Русский
Русский
English
Статистика
Реклама

Реактивный масштабируемый чат на Kotlin Spring WebSockets

Содержание

  1. Конфигурация проекта

    1. Логгер

    2. Домен

    3. Маппер

  2. Настройка Spring Security

  3. Конфигурация веб-сокетов

  4. Архитектура решения

  5. Реализация

    1. Интеграция с Redis

    2. Импелементация сервиса

  6. Заключение

Предисловие

В данном туториале будет рассмотрено создание масштабируемого приложения, подключение и общение с котором происходит по веб-сокетам. Рассмотрим и мужественно преодолеем проблему передачи сообщений между инстансами с помощью месседж брокера. В качестве месседж брокера будет использован Redis.

Конфигурация проекта

Начнём с самого важного, конфигурации логгера!

Конфигурация логгера состоит из того, что нам нужно создать prototype bean, который будет конфигурировать логгер для класса, в которой этот бин инжектится.

@Configurationclass LoggingConfig {    @Bean    @Scope("prototype")    fun logger(injectionPoint: InjectionPoint): Logger {        return LoggerFactory.getLogger(                injectionPoint.methodParameter?.containingClass                        ?: injectionPoint.field?.declaringClass        )    }}

Теперь, чтобы получить сконфигурированный логгер, нам достаточно внедрить его.

@Componentclass ChatWebSocketHandlerService(    private val logger: Logger) 

Далее создадим доменку и сконфигурируем маппер для неё

Класс чата содержит базовую информацию, включая участников чата.

data class Chat(    val chatId: UUID,    val chatMembers: List<ChatMember>,    @JsonSerialize(using = LocalDateTimeSerializer::class)    @JsonDeserialize(using = LocalDateTimeDeserializer::class)    val createdDate: LocalDateTime,    var lastMessage: CommonMessage?)

Класс ChatMember описывает участника чата. Из интересного тут - это флаг deletedChat. Его назначение - убрать чат из выборки списка чатов для пользователя с userId.

data class ChatMember(        val userId: UUID,        var fullName: String,        var avatar: String,        var deletedChat: Boolean)

Ниже представлен базовый класс для всех сообщений в чате. Аннотация @JsonTypeInfo тут нужна для того, чтобы классам-наследникам при заворачивании в JSON проставлялось поле @type с указанием типа сообщения, а при разворачивании были проставлены поля базового класса.

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)open class CommonMessage(    val messageId: UUID,    val chatId: UUID,    val sender: ChatMember,    @field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class)    val messageDate: LocalDateTime,    var seen: Boolean)

Пример конкретного класса сообщения TextMessage - текстового сообщения

class TextMessage(    messageId: UUID,    chatId: UUID,    sender: ChatMember,    var content: String,    messageDate: LocalDateTime,    seen: Boolean) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen)

Сконфигурируем ObjectMapper

В registerSubtypes добавляются классы-наследники, которые будут сериализоваться и десериализоваться в JSON. Это необходимо для того, чтобы после десериализации определить тип конкретный тип и передать на обработку в нужный метод

@Configurationclass ObjectMapperConfig {    @Bean    fun objectMapper(): ObjectMapper = ObjectMapper()        .registerModule(JavaTimeModule())        .registerModule(Jdk8Module())        .registerModule(ParameterNamesModule())        .registerModule(KotlinModule())        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)        .apply {            registerSubtypes(                NamedType(NewMessageEvent::class.java, "NewMessageEvent"),                NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"),                NamedType(TextMessage::class.java, "TextMessage"),                NamedType(ImageMessage::class.java, "ImageMessage")            )        }}

Конфигурация Spring Security

Для начала нам понадобится ReactiveAuthenticationManager и SecurityContextRepository. Для аутентификации будем использовать JWT, поэтому создаем класс JwtAuthenticationManager со следующим содержанием:

@Componentclass JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager {    override fun authenticate(authentication: Authentication): Mono<Authentication> {        val token = authentication.credentials.toString()        val validateToken = jwtUtil.validateToken(token)        var username: String?        try {            username = jwtUtil.extractUsername(token)        } catch (e: Exception) {            username = null            println(e)        }        return if (username != null && validateToken) {            val claims = jwtUtil.getClaimsFromToken(token)            val role: List<String> = claims["roles"] as List<String>            val authorities = role.stream()                    .map { role: String? -> SimpleGrantedAuthority(role) }                    .collect(Collectors.toList())            val authenticationToken = UsernamePasswordAuthenticationToken(                    username,                    null,                    authorities            )            authenticationToken.details = claims            Mono.just(authenticationToken)        } else {            Mono.empty()        }    }}

Чтобы везде, где необходимо, иметь возможность извлечь информацию из seucirty context, заносим claims в details токена (строка 25).

Для извлечения токена из запроса создаем класс SecurityContextRepository. Извлекать токен будем двумя способами:

  1. Заголовок Authorization: Bearer ${JWT_TOKEN}

  2. GET параметр ?access_token=${JWT_TOKEN}

@Componentclass SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository {    override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> {        return Mono.error { IllegalStateException("Save method not supported") }    }    override fun load(exchange: ServerWebExchange): Mono<SecurityContext> {        val authHeader = exchange.request            .headers            .getFirst(HttpHeaders.AUTHORIZATION)        val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) {            authHeader.substring(7)        } else exchange.request            .queryParams            .getFirst("access_token") ?: return Mono.empty()        val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken)        return authenticationManager            .authenticate(auth)            .map { authentication: Authentication -> SecurityContextImpl(authentication) }    }}

Теперь имея два необходимых класса мы можем сконфигурировать сам Spring Security.

@EnableWebFluxSecurity@EnableReactiveMethodSecurityclass SecurityConfig(    val reactiveAuthenticationManager: ReactiveAuthenticationManager,    val securityContextRepository: SecurityContextRepository) {    @Bean    fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain {        return httpSecurity            .exceptionHandling()            .authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException ->                Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED }            }            .accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException ->                Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN }            }            .and()            .csrf().disable()            .cors().disable()            .formLogin().disable()            .httpBasic().disable()            .authenticationManager(reactiveAuthenticationManager)            .securityContextRepository(securityContextRepository)            .authorizeExchange()            .pathMatchers("/actuator/**").permitAll()            .pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER")            .anyExchange().authenticated()            .and()            .build()    }}

Здесь из интересного: конфигурация позволяет подключиться по путям начинающимся с /ws только аутентифицированным пользователям, у которых есть роль ROLE_USER.

С конфигурацией Security закончили, теперь необходимо сконфигурировать подключение по сокетам.

Конфигурация веб-сокетов

В первую очередь нам необходимо задать маппинг между запросом и обработчиком. Чтобы добавить обработчик сокетов по определенному адресу, мы делаем следующее:

  1. Создаем мапу, где ключ - uri, а значение - обработчик. В этом конкретном случае WebSocketHandler.

  2. Создаем обработчик для ранее определенного маппинга и cors.

@Configurationclass ReactiveWebSocketConfig {    @Bean    fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping {        val map: MutableMap<String, WebSocketHandler> = HashMap()        map["/ws/chat"] = chatWebSocketHandler        val handlerMapping = SimpleUrlHandlerMapping()        handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues()))        handlerMapping.order = 1        handlerMapping.urlMap = map        return handlerMapping    }    @Bean    fun handlerAdapter(): WebSocketHandlerAdapter {        return WebSocketHandlerAdapter()    }}

Здесь в качестве обработчика для uri /ws/chat указываем chatWebSocketHandler, его вид представлен ниже, имплементацией займемся позднее. Этот класс реализует интерфейс WebSocketHandler, содержащий один метод handle(session: WebSocketSession): Mono<Void>

@Componentclass ChatWebSocketHandler : WebSocketHandler {    override fun handle(session: WebSocketSession): Mono<Void> {        TODO("Not yet implemented")    }}

С базовой конфигурацией закончили.

Поговорим об архитектуре решения

На данным момент наш чат нельзя масштабировать. Так как подключение по веб-сокету дуплексное и, установив соединение с одним инстансом, все запросы по этому сокету будут направляться на этот инстанс, так при запуске двух и более инстансов у нас не будет доступа до всех имеющихся сессий. Становится возможной ситуация, когда собеседники из одного чата подключены к разным инстансам и нет возможности сразу отправить сообщение по сокетам всем участникам. Для решения необходим Message Broker, который будет получать на вход сообщение для определенного чата и рассылать его на все инстансы. Инстансы посмотрят на сообщение, возьмут информацию об участниках чата из БД, и будут искать этих участников среди своих подключенных пользователей.

Представим, что участники одного чата User 1 и User 2 подключены к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User 2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в чат и будет отправлено в Message broker, оттуда разослано по всем инстансам. Chat-Instance-1 получит это сообщение и увидит, что у него есть User 2, который относится к этому чату и ему необходимо отправить это сообщение.

Реализация

Теперь займемся имплементацией нашего обработчика ChatWebSocketHandler

Нам понадобится мапа userId => session, для того, чтобы хранить открытые сессии и иметь возможность достать их по userId. Для поддержки одновременной работы с несколькими сессиями из под одного userId интерфейс мапы будет следующим: MutableMap<UUID, LinkedList<WebSocketSession>>.

Добавлять в мапу запись мы будем при подписке на стрим session.receive, а подчищать будем в doFinally.

В методе getReceiverStream создается стрим-обработчик сообщений, пришедших от клиента. Мы получаем payload как строку и преобразуем его к базовому WebSocketEvent, после чего в зависимости от типа event'a передаем его на обработку в слой сервисов.

В методе getSenderStream происходит конфигурация стрима, который занимается отправкой сообщений по сокету клиенту

@Componentclass ChatWebSocketHandler(    val objectMapper: ObjectMapper,    val logger: Logger,    val chatService: ChatService,    val objectStringConverter: ObjectStringConverter,    val sinkWrapper: SinkWrapper) : WebSocketHandler {    private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap()    override fun handle(session: WebSocketSession): Mono<Void> {        return ReactiveSecurityContextHolder.getContext()            .flatMap { ctx ->                val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString())                val sender = getSenderStream(session, userId)                val receiver = getReceiverStream(session, userId)                return@flatMap Mono.zip(sender, receiver).then()            }    }    private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> {        return session.receive()            .filter { it.type == WebSocketMessage.Type.TEXT }            .map(WebSocketMessage::getPayloadAsText)            .flatMap {                objectStringConverter.stringToObject(it, WebSocketEvent::class.java)            }            .flatMap { convertedEvent ->                when (convertedEvent) {                    is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent)                    is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId)                    else -> Mono.error(RuntimeException())                }            }            .onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) }            .doOnSubscribe {                val userSession = userIdToSession[userId]                if (userSession == null) {                    val newUserSessions = LinkedList<WebSocketSession>()                    userIdToSession[userId] = newUserSessions                }                userIdToSession[userId]?.add(session)            }            .doFinally {                val userSessions = userIdToSession[userId]                userSessions?.remove(session)            }            .then()    }    private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> {        val sendMessage = sinkWrapper.sinks.asFlux()            .filter { sendTo -> sendTo.userId == userId }            .map { sendTo -> objectMapper.writeValueAsString(sendTo.event) }            .map { stringObject -> session.textMessage(stringObject) }            .doOnError { logger.error("", it) }        return session.send(sendMessage)    }}

Для того чтобы писать в websocket нам необходимо создать поток данных, в который мы сможем добавлять данные. С reactora 3.4 для этого рекомендуется использовать Sinks.Many. Создадим такой поток в классе SinkWrapper.

@Componentclass SinkWrapper {    val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer()}

Теперь, отправив данные в этот поток, они будут обработаны в потоке, сформированном в getSenderStream.

Интеграция с Redis

У Redis есть PUB/SUB модель общения, которая прекрасно решает задачу транслирования сообщений между инстансами.

Итак, для приготовления данного блюда нам понадобится:

  1. RedisChatMessageListener - подписка на топики и перенаправление сообщение в слой сервисов

  2. RedisChatMessagePublisher - публикация сообщений в топики

  3. RedisConfig - конфигурация редиса

  4. RedisListenerStarter - старт листенеров при старте инстанса

Реализация:

RedisConfig стандартный, ничего особенного

@Configurationclass RedisConfig {    @Bean    fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory {        val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port)        redisStandaloneConfiguration.setPassword(redisProperties.password)        return LettuceConnectionFactory(redisStandaloneConfiguration)    }    @Bean    fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate {        return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory)    }}

RedisChatMessageListener

Здесь мы создаем подписку на топик по имени базового класса (обычно название топиков выносят в проперти). Получив сообщение из канала преобразуем его в объект (строка 13) и дальше передаем в sendMessage, который достанет участников чата и попробует разослать им это сообщение, если таковы имеются среди подключенных к инстансу.

@Componentclass RedisChatMessageListener(    private val logger: Logger,    private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,    private val objectStringConverter: ObjectStringConverter,    private val chatService: ChatService) {    fun subscribeOnCommonMessageTopic(): Mono<Void> {        return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name))            .map { message -> message.message }            .doOnNext { logger.info("Receive new message: $it") }            .flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) }            .flatMap { message ->                when (message) {                    is TextMessage -> chatService.sendMessage(message)                    is ImageMessage -> chatService.sendMessage(message)                    else -> Mono.error(RuntimeException())                }            }            .then()    }}

RedisChatMessagePublisher

Паблишер имеет один метод для транслирования CommonMessage на все инстансы. Объект сообщения приводится к строке и публикуется в топик по имени базового класса.

@Componentclass RedisChatMessagePublisher(    val logger: Logger,    val reactiveStringRedisTemplate: ReactiveStringRedisTemplate,    val objectStringConverter: ObjectStringConverter) {    fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {        return objectStringConverter.objectToString(commonMessage)            .flatMap {                logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}")                reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it)            }            .then()    }}

RedisListenerStarter

В этом классе стартуются все листенеры из RedisChatMessageListener. В нашем случае - единственный листенер subscribeOnCommonMessageTopic

@Componentclass RedisListenerStarter(    val logger: Logger,    val redisChatMessageListener: RedisChatMessageListener) {    @Bean    fun newMessageEventChannelListenerStarter(): ApplicationRunner {        return ApplicationRunner { args: ApplicationArguments ->            redisChatMessageListener.subscribeOnCommonMessageTopic()                .doOnSubscribe { logger.info("Start NewMessageEvent channel listener") }                .onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) }                .subscribe()        }    }}

Импелементация сервиса

Упрощенная реализация, без сохранения сообщений в БД с замоканным chatRepository. В виду того, что статья выходит и так больше, чем я рассчитывал.

Метод handleNewMessageEvent вызывается из WebSocketHandler и получает на вход userId отправителя и NewMessageEvent - простое текстовое сообщение. В методе происходит проверка на то, что отправитель действительно является участником чата и дальше это сообщение транслируется между инстансами.

@Serviceclass DefaultChatService(    val logger: Logger,    val sinkWrapper: SinkWrapper,    val chatRepository: ChatRepository,    val redisChatPublisher: RedisChatMessagePublisher) : ChatService {    override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> {        logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent")        return chatRepository.findById(newMessageEvent.chatId)            .filter { it.chatMembers.map(ChatMember::userId).contains(senderId) }            .flatMap { chat ->                val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false)                chat.lastMessage = textMessage                return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage))            }            .flatMap { broadcastMessage(it.t2) }    }    /**     * Broadcast the message between instances     */    override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> {        return redisChatPublisher.broadcastMessage(commonMessage)    }    /**     * Send the message to all of chatMembers of message chat direct     */    override fun sendMessage(message: CommonMessage): Mono<Void> {        return chatRepository.findById(message.chatId)            .map { it.chatMembers }            .flatMapMany { Flux.fromIterable(it) }            .flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) }            .then()    }    override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> {        return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) }            .then()    }}

Заключение

В качестве дальнейших доработок можно произвести разделение получаемых и отправляемых ивентов на отдельные классы. Также в месте, где происходит получение сообщения по сокетам от клиента, его приведение к WebSocketEvent и передача в обработчик, можно попробовать избавиться от хардкодного маппинка event => handler. Пока не думал, как это можно сделать красивее, но уверен, что решение есть.

Проект на GitHub

Источник: habr.com
К списку статей
Опубликовано: 13.04.2021 18:11:50
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Java

Kotlin

Spring

Microservice

Redis

Chat

Категории

Последние комментарии

© 2006-2021, personeltest.ru