Русский
Русский
English
Статистика
Реклама

Андройд

Чаты на вебсокетах, когда на бэкенде WAMP. Теперь про Android

13.01.2021 22:20:55 | Автор: admin

Мой коллега уже писал про наш опыт разработки чатов на вебсокетах для iOS, поэтому часть про особенности бэкенда с точки зрения клиента у нас общая. А вот реализация на Android, конечно, отличается. И ещё мне не приходилось, как в первой статье, искать библиотеку для поддержки старых версий операционной системы, потому что на Android каких-то глобальных изменений в сетевой части не было, всё работало и так.

К реализации вернёмся чуть ниже, а начнём с ответов на вопросы про бэкенд, которые появились после первой статьи: почему WAMP, какой брокер используем и некоторые другие моменты.

На время передам слово нашему бэкенд-разработчику @antoha-gs, а если хочется сразу почитать про клиент-серверное общение и декодирование, то первый раздел можно пропустить.

Что там на бэкенде

Почему WAMP. Изначально искал открытый протокол, который мог бы работать поверх WebSocket с поддержкой функционала PubSub и RPC и с потенциалом масштабирования. Лучше всего подошёл WAMP одни плюсы, разве что не нашёл реализации протокола на Java/Kotlin, которая бы меня устраивала.

Какой брокер мы используем. Продолжая предыдущий пункт, это, собственно, и послужило написанию собственной реализации протокола. Плюсы экспертиза в своём коде и гибкость, то есть при надобности всегда можно отойти от стандарта в нужную сторону. Каких-то серьёзных минусов не выявил.

К небольшим проблемам реализации проекта чатов можно отнести то, что нужно было ресёрчить и ревёрсить то, как работает реализация на Sendbird сервисе, который мы использовали. То есть какими возможностями мы там пользовались и какой дополнительный функционал был реализован поверх. Также к сложностям отнёс бы перенос данных из Sendbird в свою базу.

Ещё был такой момент в комментариях:

Правильно, что не стали использовать Socket.IO, так как рано или поздно столкнулись бы с двумя проблемами: 1) Пропуск сообщений. 2) Дублирование сообщений. WAMP к сожалению также не решает эти вопросы. Поэтому для чатов лучше использовать что-то вроде MQTT.

Насколько я могу судить, протокол не решает таких проблем магическим образом, всё упирается в реализацию. Да, на уровне протокола может поддерживаться дополнительная информация/настройки для указания уровня обслуживания (at most/at least/exactly), но ответственность за её реализацию всё равно лежит на конкретной имплементации. В нашем случае, учитывая специфику, достаточно гарантировать надёжную запись в базу и доставку на клиенты at most once, что WAMP вполне позволяет реализовать. Также он легко расширяем.

MQTT отличный протокол, никаких вопросов, но в данном сравнении у него меньше фич, чем у WAMP, которые могли бы пригодиться нам для сервиса чатов. В качестве альтернативы можно было бы рассмотреть XMPP (aka Jabber), потому что, в отличие от MQTT и WAMP, он предназначен для мессенджеров, но и там без допилов бы не обошлось. Ещё можно создать свой собственный протокол, что нередко делают в компаниях, но это, в том числе, дополнительные временные затраты.

Это были основные вопросы касательно бэкенда после предыдущей статьи, и, думаю, мы ещё вернёмся к нашей реализации в отдельном материале. А сейчас возвращаю слово Сергею.

Клиент-сервер

Начну с того, что WAMP означает для клиента.

  • В целом протокол предусматривает почти всё. Это облегчает взаимодействие разработчиков клиентской части и бэка.

  • Кодирование всех типов событий в числах (PUBLISH это 16, SUBSCRIBE 32 и так далее). Это усложняет чтение логов разработчику и QA (сразу не догадаться, что значит прилетевшее сообщение [33,11,5862354]).

  • Механизм подписок на события (например, новые сообщения в чат или обновление количества участников) реализован через получение от бэкенда уникального id подписки. Его надо где-то хранить и ни в коем случае не терять во избежание утечек. Как это сделано (было бы сильно проще и подписываться и отписываться просто по id чата):client подписываемся на новые сообщения в чате [32,18,{},"co.fun.chat.testChatId"]backend [33,18,5868752 (id подписки)]client после выхода из чата отписываемся по id [34,20,5868752]

Для работы с сокетом использовали OkHttp (стильно, надёжно, современно, реализация ping-pong таймаутов из коробки) и RxJava, потому что сама концепция чата практически идеальный пример того самого event-based programming, ради которого Rx, в общем, и задумывался.

Теперь рассмотрим пример коннекта к серверу, использующему WAMP-протокол через OkHttpClient:

val request = Request.Builder()    .url(ChatsConfig.SOCKETURL)    .addHeader("Connection", "Upgrade")    .addHeader("Sec-WebSocket-Protocol", "wamp.json")    .addHeader("Authorization", authToken)    .build()val listener = ChatWebSocketListener()webSocket = okHttpClient.newWebSocket(request, listener)

Пример реализации ChatWebSocketListener:

private inner class ChatWebSocketListener : WebSocketListener() {override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { connectionStatusSubject.onNext(ChatConnectionStatuses.NOTCONNECTED) //subject, оповещающий пользователей о состоянии коннекта (в UI нужен для отображения лоадеров, оффлайн-стейтов и так далее)}override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { webSocket.close(1000, null)}override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { onConnectionError("${t.message} ${response?.body}")}override fun onMessage(webSocket: WebSocket, text: String) { socketMessagesSubject.onNext(serverMessageFactory.processMessage(text)) //subject, через который идут все сообщения, которые в дальнейшем фильтруются для конкретных получателей (см. ниже)}override fun onOpen(webSocket: WebSocket, response: Response) { authorize() }}

Здесь мы видим, что все сообщения от сокета приходят в виде обычного String, представляющего собой JSON, закодированный по правилам WAMP протокола и имеющий структуру:

[ResultCode: Int, RequestId: Long, ArgumentsMap: JsonObject ]

Например:

[50, 7, {"type":100, "chats":[список чатов]}]

Декодирование и отправка сообщений

Для декодинга сообщений в объекты мы использовали библиотеку Gson. Все модели ответа отписываются обычными data-классами вида:

@DontObfuscatedata class ChatListResponse(@SerializedName("chats") val chatList: List<Chat>)

А декодирование происходит с помощью следующего кода:

private fun chatListUpdateInternal(jsonChatsResponse: JSONObject):ChatsListUpdatesEvent { return gson.fromJson(jsonChatsResponse.toString(), ChatsListUpdatesEvent::class.java)}

Теперь рассмотрим базовый пример отправки сообщения по сокету. Для удобства мы сделали обёртку для всех базовых типов WAMP сообщений:

sealed class WampMessage { class BaseMessage(val wampId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage()  class ErrorMessage(val procedureId: Int, val seq: Long, val jsonData: JSONArray) : WampMessage() object WelcomeMessage : WampMessage() class AbortMessage(val jsonData: JSONArray) : WampMessage()}

А также добавили фабрику для формирования этих сообщений:

fun getCallMessage(rpc: String,         options: Map<String, Any> = emptyMap(),         arguments: List<Any?> = emptyList(),         argumentsDict: Map<String, Any?> = emptyMap()):WampMessage.BaseMessage { //[CALL, Request|id, Options|dict, Procedure|uri, Arguments|list] val seq = nextSeq.getAndIncrement() return WampMessage.BaseMessage(WAMP.MessageIds.CALL,               seq,               JSONArray(listOfNotNull(WAMP.MessageIds.CALL,               seq,               options,               rpc,               arguments,               argumentsDict)))}

Пример отправки сообщений:

val messages: Observable<WampMessage> = socketMessagesSubjectfun sendMessage(msgToSend: WampMessage.BaseMessage): Observable<WampMessage> { return messages.filter {   it is WampMessage.BaseMessage && it.seq == msgToSend.seq}    .take(1)    .doOnSubscribe {     webSocket.send(msgToSend.jsonData.toString())    }}

Сопоставление отправленного сообщения и ответа на него в WAMP происходит с помощью уникального идентификатора seq, отправляемого клиентом, который потом кладётся в ответ.

В клиенте генерация идентификатора делается следующим образом:

companion object { private val nextSeq: AtomicLong = AtomicLong(1)}fun getNextSeq() = nextSeq.getAndIncrement()

Взаимодействие с WAMP Subscriptions

Подписки в протоколе WAMP концепт, по которому подписчик (клиент) подписывается на какие-либо события, приходящие от бэкенда. В нашей реализации мы использовали:

  • обновление списка чатов;

  • новые сообщения в конкретном чате;

  • изменение онлайн-статуса собеседника;

  • изменение в составе участников чата;

  • смена роли юзера (например, когда его назначают модератором);

  • и так далее.

Клиент сообщает серверу о желании получать события с помощью следующего сообщения:

[SUBSCRIBE: Int, RequestId: Long, Options: Map, Topic: String]

Где topic это скоуп событий, которые нужны подписчику.

Для формирования базового события подписки используется код:

fun getSubscribeMessage(topic: String, options: Map<String, Any> = emptyMap()): WampMessage.BaseMessage { val seq = nextSeq.getAndIncrement() return WampMessage.BaseMessage(WAMP.MessageIds.SUBSCRIBE,                 seq,                JSONArray(listOfNotNull(WAMP.MessageIds.SUBSCRIBE,                                seq,                                options,                                topic)))}

Разумеется, при выходе с экрана (например, списка чатов), необходимо соответствующую подписку корректно отменять. И вот тут выявляется одно из свойств протокола WAMP: при отправке subscribe-сообщения бэкенд возвращает числовой id подписки, и выходит, что отписаться от конкретного топика нельзя нужно запоминать и хранить этот id, чтобы использовать его при необходимости.

А так как хочется оградить пользователей API подписок от лишнего менеджмента айдишников, было сделано следующее:

private val subscriptionsMap = ArrayMap<String, Long>()private fun getBaseSubscription(topic: String): Observable<WampMessage> { val msg = wampClientMessageFactory.getSubscribeMessage(topic) return send(msg).map {   val subscriptionId = converter.getSubscriptionId((it.asBaseMessage()).jsonData)   subscriptionsMap[topic] = subscriptionId   subscriptionId}    .switchMap { subscriptionId ->      chatClient.messages.filter {       it.isMessageFromSubscription(subscriptionId)     }    }}

Так клиент ничего не будет знать об id, и для отписки ему будет достаточно указать имя подписки, которую необходимо отменить:

fun unsubscribeFromTopic(topic: String) { if (!subscriptionsMap.contains(topic)) {    return } val msg = wampClientMessageFactory.getUnsubscribeMessage(subscriptionsMap[topic]) send(msg, true).exSubscribe() subscriptionsMap.remove(topic)}

Это то, что я хотел рассказать про реализацию на Android, но если есть вопросы постараюсь ответить на них в комментариях. Напомню, что про чаты на вебсокетах в iOS мы уже писали вот здесь, а также готовим отдельную часть про бэкенд.

Подробнее..

Как мы просто сократили объем входящего в дата-центр трафика на 70

03.02.2021 22:16:57 | Автор: admin

Хочу рассказать о том, как довольно простым лайфхаком мы радикально сократили объем входящего в дата-центр трафика, одновременно сделав жизнь пользователей нашего мобильного приложения чуть лучше и даже уменьшив расход заряда их батареи.

Единственное, о чем мы пожалели что не применили это решение раньше.

Наша команда придерживается принципов Data-Driven, то есть решения о развитии продуктов принимаются на основе метрик, а внедрение любых новых фич начинается с проверки гипотез. Для этого мы собираем и агрегируем колоссальное количество данных о том, что происходит внутри.

Два года назад, когда мы переходили с RedShift на ClickHouse, количество собираемых аналитических событий (приложение открылось, приложение запросило ленту контента, пользователь просмотрел контент, пользователь поставил смайл (лайк) и так далее) составляло около 5 млрд в сутки. Сегодня это число приближается к 14 млрд.

Это огромный объём данных, для хранения и обработки которого мы постоянно придумываем решения и лайфхаки, а сервис, их агрегирующий один из самых сложных среди всех наших инфраструктурных сервисов.

Но перед тем, как агрегировать, сохранить и обработать столько данных, их надо сначала принять и с этим есть свои проблемы. Часть описана в статье о переходе на ClickHouse (ссылка на неё была выше), но есть и другие.

Например, несколько раз, когда агрегирующий сервис начинал работать с перебоями, клиентские приложения начинали накапливать события и пытаться повторять отправку. Это приводило к эффекту снежного кома, в итоге практически блокируя нам входящие каналы в дата-центре.

Во время брейншторма по поводу одного из таких инцидентов прозвучала идея: раз мы создаём этими событиями такой колоссальный объём трафика, может быть нам начать его сжимать на клиентских устройствах? Отличная идея, но она была отвергнута как неконструктивная. Ведь это не избавит нас от катастрофы, а лишь оттянет её на какое-то время. Поэтому идея отправилась в специальный ящичек для неплохих идей.

Но ближе к лету непростого 2020 года ей нашлось применение.

Протокол HTTP, помимо сжатия ответов (о котором знают все, кто когда-либо оптимизировал скорость работы сайтов), позволяет использовать аналогичный механизм для сжатия тела POST/PUT-запросов, объявив об этом в заголовке Content-Encoding. В качестве входящего обратного прокси и балансировщика нагрузки мы используем nginx, проверенное и надёжное решение. Мы настолько были уверены, что он сумеет ко всему прочему ещё и на лету распаковать тело POST-запроса, что поначалу даже не поверили, что из коробки он этого не умеет. И нет, готовых модулей для этого тоже нет, надо было как-то решать проблему самостоятельно или использовать скрипт на Lua. Идея с Lua нам особенно не понравилась, зато это знание развязало руки в части выбора алгоритма компрессии.

Дело в том, что давно стандартизированные алгоритмы сжатия типа gzip, deflate или LZW были изобретены в 70-х годах XX века, когда каналы связи и носители были узким горлышком, и коэффициент сжатия был важнее, чем потраченное на сжатие время. Сегодня же в кармане каждого из нас лежит универсальный микрокомпьютер первой четверти XXI века, оборудованный подчас четырёх- и более ядерным процессором, способный на куда большее, а значит алгоритм можно выбрать более современный.

Выбор алгоритма

Требования к алгоритму были простыми:

  1. Высокая скорость сжатия. Мы не хотим, чтобы приложения тормозили из-за второстепенной функции.

  2. Небольшое потребление процессорной мощности. Не хотим, чтобы телефоны грелись в руках пользователей.

  3. Хорошая поддержка, доступность для основных языков программирования.

  4. Permissive лицензия.

Сразу отбросили Brotli, предназначенный для одноразового сжатия статического контента на стороне сервера. По схожей причине решили отказаться от bzip, также ориентированного на статику и большие архивы.

В итоге остановились на алгоритме Zstandard, по следующим причинам:

  • Высокая скорость сжатия (на порядок больше, чем у zlib), заточенность на небольшие объёмы данных.

  • Хороший коэффициент сжатия при щадящем уровне потребления CPU.

  • За алгоритмом стоит Facebook, разрабатывавший его для себя.

  • Открытый исходный код, двойная лицензия GPLv2/BSD.

Когда мы увидели первым же в списке поддерживаемых языков JNI, интерфейс вызова нативного кода для JVM, доступный из Kotlin мы поняли, что это судьба. Ведь Kotlin является у нас основным языком разработки как на Android, так и бэкенде. Обёртка для Swift (наш основной язык разработки на iOS) завершила процесс выбора.

Решение на бэкенде

На стороне бэкенда задача была тривиальная: увидев заголовок Content-encoding: zstd, сервис должен получить поток, содержащий сжатое тело запроса, отправить его в декомпрессор zstd, и получить в ответ поток с распакованными данными. То есть буквально (используя JAX-RS container):

// Обёртка над Zstd JNIimport org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;// ...if (  containerRequestContext    .getHeaders()    .getFirst("Content-Encoding")    .equals("zstd")) {  containerRequestContext    .setEntityStream(ZstdCompressorInputStream(      containerRequestContext.getEntityStream()    ))}

Решение на iOS

Решили сначала попробовать сжатие аналитических событий на iOS. Команда разработки была свободнее, ну и клиентов на iOS у нас несколько меньше. На всякий случай закрыли этот функционал при помощи feature toggle с возможностью плавной раскатки.

import Foundationimport ZSTDfinal class ZSTDRequestSerializer {    private let compressionLevel: Int32    init(compressionLevel: Int32) {        self.compressionLevel = compressionLevel    }    func requestBySerializing(request: URLRequest, parameters: [String: Any]?) throws -> URLRequest? {        guard let mutableRequest = (request as NSURLRequest).mutableCopy() as? NSMutableURLRequest else {            return nil        }        // ...        mutableRequest.addValue("zstd", forHTTPHeaderField: "Content-Encoding")        if let parameters = parameters {            let jsonData = try JSONSerialization.data(withJSONObject: parameters, options: [])            let processor = ZSTDProcessor(useContext: true)            let compressedData = try processor.compressBuffer(jsonData, compressionLevel: compressionLevel)            mutableRequest.httpBody = compressedData        }        return mutableRequest as URLRequest    }}

Осторожно включив фичу на 10% клиентов прямо в процессе раскатки очередной версии, затаив дыхание, стали смотреть в логи, метрики, индикатор крашей и прочие инструменты. Проблем не обнаружилось, полёт был нормальный.

Впрочем, и снижение объёма трафика было не сильно заметно. Дождавшись, пока новая версия клиента раскатится пошире, мы врубили сжатие на 100% аудитории.

Результат нас, мягко говоря, удовлетворил:

График падения трафика на iOSГрафик падения трафика на iOS

Входящий трафик упал аж на 25%. На графике представлен весь входящий в наш дата-центр трафик, включающий и штатные запросы клиентов, и закачиваемые ими картинки и видео.

То есть мы на четверть сократили весь объём.

Решение на Android

Воодушевлённые, мы запилили сжатие для второй платформы.

// Тут перехватываем отправку события через interceptor и подменяем оригинальный body на сжатый если это запрос к eventsoverride fun intercept(chain: Interceptor.Chain): Response {   val originalRequest = chain.request()   return if (originalRequest.url.toString()               .endsWith("/events")) {      val compressed = originalRequest.newBuilder()            .header("Content-Encoding", "zstd")            .method(originalRequest.method, zstd(originalRequest.body))            .build()      chain.proceed(compressed)   } else {      chain.proceed(chain.request())   }}// Метод сжатия, берет requestBody и возвращает сжатыйprivate fun zstd(requestBody: RequestBody?): RequestBody {   return object : RequestBody() {      override fun contentType(): MediaType? = requestBody?.contentType()      override fun contentLength(): Long = -1 //We don't know the compressed length in advance!      override fun writeTo(sink: BufferedSink) {         val buffer = Buffer()         requestBody?.writeTo(buffer)         sink.write(Zstd.compress(buffer.readByteArray(), compressLevel))      }   }}

И тут нас ждал шок:

График падения на AndroidГрафик падения на Android

Так как доля Android среди нашей аудитории больше, чем iOS, падение составило ещё 45%. Итого, если считать от исходного уровня, мы выиграли суммарно 70% от, напомню, всего входящего трафика в ДЦ.

К сожалению метрики отдельных хостов у нас не хранятся на такую глубину, но по грубой прикидке именно на отправке аналитических событий выигрыш составил процентов 80, поскольку они представляют из себя довольно монотонный, а значит хорошо сжимаемый JSON.

В этот момент мы только могли пожалеть, что не нашли время на внедрение такой рационализации раньше.

Также стало видно, что наши опасения относительно батарейки не оправдались. Наоборот, потратив немного процессорной мощности телефона на сжатие данных, мы экономим намного больше электричества на передаче этих данных в эфир, как на Wi-Fi, так и по сотовой сети.

Два слова, что ещё можно улучшить

У Zstandard есть очень интересная возможность использования предобученного словаря. Если взять достаточно большой объём типичных данных и прогнать по нему сжатие, то можно получить на выходе словарь наиболее высокочастотных вхождений, который затем использовать и при сжатии, и при распаковке.

При этом коэффициент сжатия увеличивается от 10-15% на текстах до 50% на однообразных наборах строк, как у нас. А скорость сжатия даже несколько увеличивается при размере словаря порядка 16 килобайт. Это, конечно, уже не приведёт к такому впечатляющему результату, но всё равно будет приятно и полезно.

Подробнее..

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru