Содержание
-
Конфигурация проекта
-
Логгер
-
Домен
-
Маппер
-
Настройка Spring Security
-
Конфигурация веб-сокетов
-
Архитектура решения
-
Реализация
-
Интеграция с Redis
-
Импелементация сервиса
-
Заключение
Предисловие
В данном туториале будет рассмотрено создание масштабируемого
приложения, подключение и общение с котором происходит по
веб-сокетам. Рассмотрим и мужественно преодолеем проблему передачи
сообщений между инстансами с помощью месседж брокера. В качестве
месседж брокера будет использован Redis.
Конфигурация проекта
Начнём с самого важного, конфигурации логгера!
Конфигурация логгера состоит из того, что нам нужно создать
prototype bean, который будет конфигурировать логгер для класса, в
которой этот бин инжектится.
@Configurationclass LoggingConfig { @Bean @Scope("prototype") fun logger(injectionPoint: InjectionPoint): Logger { return LoggerFactory.getLogger( injectionPoint.methodParameter?.containingClass ?: injectionPoint.field?.declaringClass ) }}
Теперь, чтобы получить сконфигурированный логгер, нам достаточно
внедрить его.
@Componentclass ChatWebSocketHandlerService( private val logger: Logger)
Далее создадим доменку и сконфигурируем маппер для неё
Класс чата содержит базовую информацию, включая участников
чата.
data class Chat( val chatId: UUID, val chatMembers: List<ChatMember>, @JsonSerialize(using = LocalDateTimeSerializer::class) @JsonDeserialize(using = LocalDateTimeDeserializer::class) val createdDate: LocalDateTime, var lastMessage: CommonMessage?)
Класс ChatMember описывает участника чата. Из интересного тут -
это флаг deletedChat. Его назначение - убрать чат из выборки списка
чатов для пользователя с userId.
data class ChatMember( val userId: UUID, var fullName: String, var avatar: String, var deletedChat: Boolean)
Ниже представлен базовый класс для всех сообщений в чате.
Аннотация @JsonTypeInfo тут нужна для того, чтобы
классам-наследникам при заворачивании в JSON проставлялось поле
@type с указанием типа сообщения, а при
разворачивании были проставлены поля базового класса.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)open class CommonMessage( val messageId: UUID, val chatId: UUID, val sender: ChatMember, @field:JsonSerialize(using = LocalDateTimeSerializer::class) @field:JsonDeserialize(using = LocalDateTimeDeserializer::class) val messageDate: LocalDateTime, var seen: Boolean)
Пример конкретного класса сообщения TextMessage
- текстового сообщения
class TextMessage( messageId: UUID, chatId: UUID, sender: ChatMember, var content: String, messageDate: LocalDateTime, seen: Boolean) : CommonMessage(messageId, chatId, sender, messageDate, messageType, seen)
Сконфигурируем ObjectMapper
В registerSubtypes добавляются классы-наследники, которые будут
сериализоваться и десериализоваться в JSON. Это необходимо для
того, чтобы после десериализации определить тип конкретный тип и
передать на обработку в нужный метод
@Configurationclass ObjectMapperConfig { @Bean fun objectMapper(): ObjectMapper = ObjectMapper() .registerModule(JavaTimeModule()) .registerModule(Jdk8Module()) .registerModule(ParameterNamesModule()) .registerModule(KotlinModule()) .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) .apply { registerSubtypes( NamedType(NewMessageEvent::class.java, "NewMessageEvent"), NamedType(MarkMessageAsRead::class.java, "MarkMessageAsRead"), NamedType(TextMessage::class.java, "TextMessage"), NamedType(ImageMessage::class.java, "ImageMessage") ) }}
Конфигурация Spring Security
Для начала нам понадобится ReactiveAuthenticationManager и
SecurityContextRepository. Для аутентификации будем использовать
JWT, поэтому создаем класс JwtAuthenticationManager со следующим
содержанием:
@Componentclass JwtAuthenticationManager(val jwtUtil: JwtUtil) : ReactiveAuthenticationManager { override fun authenticate(authentication: Authentication): Mono<Authentication> { val token = authentication.credentials.toString() val validateToken = jwtUtil.validateToken(token) var username: String? try { username = jwtUtil.extractUsername(token) } catch (e: Exception) { username = null println(e) } return if (username != null && validateToken) { val claims = jwtUtil.getClaimsFromToken(token) val role: List<String> = claims["roles"] as List<String> val authorities = role.stream() .map { role: String? -> SimpleGrantedAuthority(role) } .collect(Collectors.toList()) val authenticationToken = UsernamePasswordAuthenticationToken( username, null, authorities ) authenticationToken.details = claims Mono.just(authenticationToken) } else { Mono.empty() } }}
Чтобы везде, где необходимо, иметь возможность извлечь
информацию из seucirty context, заносим claims в details токена
(строка 25).
Для извлечения токена из запроса создаем класс
SecurityContextRepository. Извлекать токен будем двумя
способами:
-
Заголовок Authorization: Bearer ${JWT_TOKEN}
-
GET параметр ?access_token=${JWT_TOKEN}
@Componentclass SecurityContextRepository(val authenticationManager: ReactiveAuthenticationManager) : ServerSecurityContextRepository { override fun save(exchange: ServerWebExchange, context: SecurityContext): Mono<Void> { return Mono.error { IllegalStateException("Save method not supported") } } override fun load(exchange: ServerWebExchange): Mono<SecurityContext> { val authHeader = exchange.request .headers .getFirst(HttpHeaders.AUTHORIZATION) val accessToken: String = if (authHeader != null && authHeader.startsWith("Bearer ")) { authHeader.substring(7) } else exchange.request .queryParams .getFirst("access_token") ?: return Mono.empty() val auth = UsernamePasswordAuthenticationToken(accessToken, accessToken) return authenticationManager .authenticate(auth) .map { authentication: Authentication -> SecurityContextImpl(authentication) } }}
Теперь имея два необходимых класса мы можем сконфигурировать сам
Spring Security.
@EnableWebFluxSecurity@EnableReactiveMethodSecurityclass SecurityConfig( val reactiveAuthenticationManager: ReactiveAuthenticationManager, val securityContextRepository: SecurityContextRepository) { @Bean fun securityWebFilterChain(httpSecurity: ServerHttpSecurity): SecurityWebFilterChain { return httpSecurity .exceptionHandling() .authenticationEntryPoint { swe: ServerWebExchange, e: AuthenticationException -> Mono.fromRunnable { swe.response.statusCode = HttpStatus.UNAUTHORIZED } } .accessDeniedHandler { swe: ServerWebExchange, e: AccessDeniedException -> Mono.fromRunnable { swe.response.statusCode = HttpStatus.FORBIDDEN } } .and() .csrf().disable() .cors().disable() .formLogin().disable() .httpBasic().disable() .authenticationManager(reactiveAuthenticationManager) .securityContextRepository(securityContextRepository) .authorizeExchange() .pathMatchers("/actuator/**").permitAll() .pathMatchers(HttpMethod.GET, "/ws/**").hasAuthority("ROLE_USER") .anyExchange().authenticated() .and() .build() }}
Здесь из интересного: конфигурация позволяет подключиться по
путям начинающимся с /ws только аутентифицированным пользователям,
у которых есть роль ROLE_USER.
С конфигурацией Security закончили, теперь необходимо
сконфигурировать подключение по сокетам.
Конфигурация веб-сокетов
В первую очередь нам необходимо задать маппинг между запросом и
обработчиком. Чтобы добавить обработчик сокетов по определенному
адресу, мы делаем следующее:
-
Создаем мапу, где ключ - uri, а значение - обработчик. В этом
конкретном случае WebSocketHandler.
-
Создаем обработчик для ранее определенного маппинга и cors.
@Configurationclass ReactiveWebSocketConfig { @Bean fun webSocketHandlerMapping(chatWebSocketHandler: ChatWebSocketHandler): HandlerMapping { val map: MutableMap<String, WebSocketHandler> = HashMap() map["/ws/chat"] = chatWebSocketHandler val handlerMapping = SimpleUrlHandlerMapping() handlerMapping.setCorsConfigurations(Collections.singletonMap("*", CorsConfiguration().applyPermitDefaultValues())) handlerMapping.order = 1 handlerMapping.urlMap = map return handlerMapping } @Bean fun handlerAdapter(): WebSocketHandlerAdapter { return WebSocketHandlerAdapter() }}
Здесь в качестве обработчика для uri /ws/chat указываем
chatWebSocketHandler, его вид представлен ниже, имплементацией
займемся позднее. Этот класс реализует интерфейс WebSocketHandler,
содержащий один метод handle(session: WebSocketSession):
Mono<Void>
@Componentclass ChatWebSocketHandler : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { TODO("Not yet implemented") }}
С базовой конфигурацией закончили.
Поговорим об архитектуре решения
На данным момент наш чат нельзя масштабировать. Так как
подключение по веб-сокету дуплексное и, установив соединение с
одним инстансом, все запросы по этому сокету будут направляться на
этот инстанс, так при запуске двух и более инстансов у нас не будет
доступа до всех имеющихся сессий. Становится возможной ситуация,
когда собеседники из одного чата подключены к разным инстансам и
нет возможности сразу отправить сообщение по сокетам всем
участникам. Для решения необходим Message Broker, который будет
получать на вход сообщение для определенного чата и рассылать его
на все инстансы. Инстансы посмотрят на сообщение, возьмут
информацию об участниках чата из БД, и будут искать этих участников
среди своих подключенных пользователей.
Представим, что участники одного чата User 1 и User 2 подключены
к разным инстансам чата. User 1 подключен к Chat-Instance-0, а User
2 к Chat-Instance-1. Тогда, когда User 1 отправит сообщение в
Chat-Instance-0 (зеленая пунктирная линия), это сообщение попадёт в
чат и будет отправлено в Message broker, оттуда разослано по всем
инстансам. Chat-Instance-1 получит это сообщение и увидит, что у
него есть User 2, который относится к этому чату и ему необходимо
отправить это сообщение.
Реализация
Теперь займемся имплементацией нашего обработчика
ChatWebSocketHandler
Нам понадобится мапа userId => session, для того, чтобы
хранить открытые сессии и иметь возможность достать их по userId.
Для поддержки одновременной работы с несколькими сессиями из под
одного userId интерфейс мапы будет следующим: MutableMap<UUID,
LinkedList<WebSocketSession>>.
Добавлять в мапу запись мы будем при подписке на стрим
session.receive, а подчищать будем в doFinally.
В методе getReceiverStream создается стрим-обработчик сообщений,
пришедших от клиента. Мы получаем payload как строку и преобразуем
его к базовому WebSocketEvent, после чего в зависимости от типа
event'a передаем его на обработку в слой сервисов.
В методе getSenderStream происходит конфигурация стрима, который
занимается отправкой сообщений по сокету клиенту
@Componentclass ChatWebSocketHandler( val objectMapper: ObjectMapper, val logger: Logger, val chatService: ChatService, val objectStringConverter: ObjectStringConverter, val sinkWrapper: SinkWrapper) : WebSocketHandler { private val userIdToSession: MutableMap<UUID, LinkedList<WebSocketSession>> = ConcurrentHashMap() override fun handle(session: WebSocketSession): Mono<Void> { return ReactiveSecurityContextHolder.getContext() .flatMap { ctx -> val userId = UUID.fromString((ctx.authentication.details as Claims)["id"].toString()) val sender = getSenderStream(session, userId) val receiver = getReceiverStream(session, userId) return@flatMap Mono.zip(sender, receiver).then() } } private fun getReceiverStream(session: WebSocketSession, userId: UUID): Mono<Void> { return session.receive() .filter { it.type == WebSocketMessage.Type.TEXT } .map(WebSocketMessage::getPayloadAsText) .flatMap { objectStringConverter.stringToObject(it, WebSocketEvent::class.java) } .flatMap { convertedEvent -> when (convertedEvent) { is NewMessageEvent -> chatService.handleNewMessageEvent(userId, convertedEvent) is MarkMessageAsRead -> chatService.markPreviousMessagesAsRead(convertedEvent.messageId) else -> Mono.error(RuntimeException()) } } .onErrorContinue { t, _ -> logger.error("Error occurred with receiver stream", t) } .doOnSubscribe { val userSession = userIdToSession[userId] if (userSession == null) { val newUserSessions = LinkedList<WebSocketSession>() userIdToSession[userId] = newUserSessions } userIdToSession[userId]?.add(session) } .doFinally { val userSessions = userIdToSession[userId] userSessions?.remove(session) } .then() } private fun getSenderStream(session: WebSocketSession, userId: UUID): Mono<Void> { val sendMessage = sinkWrapper.sinks.asFlux() .filter { sendTo -> sendTo.userId == userId } .map { sendTo -> objectMapper.writeValueAsString(sendTo.event) } .map { stringObject -> session.textMessage(stringObject) } .doOnError { logger.error("", it) } return session.send(sendMessage) }}
Для того чтобы писать в websocket нам необходимо создать поток
данных, в который мы сможем добавлять данные. С reactora 3.4 для
этого рекомендуется использовать Sinks.Many. Создадим такой поток в
классе SinkWrapper.
@Componentclass SinkWrapper { val sinks: Sinks.Many<SendTo> = Sinks.many().multicast().onBackpressureBuffer()}
Теперь, отправив данные в этот поток, они будут обработаны в
потоке, сформированном в getSenderStream.
Интеграция с Redis
У Redis есть PUB/SUB модель общения, которая прекрасно решает
задачу транслирования сообщений между инстансами.
Итак, для приготовления данного блюда нам понадобится:
-
RedisChatMessageListener - подписка на топики и перенаправление
сообщение в слой сервисов
-
RedisChatMessagePublisher - публикация сообщений в топики
-
RedisConfig - конфигурация редиса
-
RedisListenerStarter - старт листенеров при старте инстанса
Реализация:
RedisConfig стандартный, ничего особенного
@Configurationclass RedisConfig { @Bean fun reactiveRedisConnectionFactory(redisProperties: RedisProperties): ReactiveRedisConnectionFactory { val redisStandaloneConfiguration = RedisStandaloneConfiguration(redisProperties.host, redisProperties.port) redisStandaloneConfiguration.setPassword(redisProperties.password) return LettuceConnectionFactory(redisStandaloneConfiguration) } @Bean fun template(reactiveRedisConnectionFactory: ReactiveRedisConnectionFactory): ReactiveStringRedisTemplate { return ReactiveStringRedisTemplate(reactiveRedisConnectionFactory) }}
RedisChatMessageListener
Здесь мы создаем подписку на топик по имени базового класса
(обычно название топиков выносят в проперти). Получив сообщение из
канала преобразуем его в объект (строка 13) и дальше передаем в
sendMessage, который достанет участников чата и попробует разослать
им это сообщение, если таковы имеются среди подключенных к
инстансу.
@Componentclass RedisChatMessageListener( private val logger: Logger, private val reactiveStringRedisTemplate: ReactiveStringRedisTemplate, private val objectStringConverter: ObjectStringConverter, private val chatService: ChatService) { fun subscribeOnCommonMessageTopic(): Mono<Void> { return reactiveStringRedisTemplate.listenTo(PatternTopic(CommonMessage::class.java.name)) .map { message -> message.message } .doOnNext { logger.info("Receive new message: $it") } .flatMap { objectStringConverter.stringToObject(it, CommonMessage::class.java) } .flatMap { message -> when (message) { is TextMessage -> chatService.sendMessage(message) is ImageMessage -> chatService.sendMessage(message) else -> Mono.error(RuntimeException()) } } .then() }}
RedisChatMessagePublisher
Паблишер имеет один метод для транслирования CommonMessage на
все инстансы. Объект сообщения приводится к строке и публикуется в
топик по имени базового класса.
@Componentclass RedisChatMessagePublisher( val logger: Logger, val reactiveStringRedisTemplate: ReactiveStringRedisTemplate, val objectStringConverter: ObjectStringConverter) { fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> { return objectStringConverter.objectToString(commonMessage) .flatMap { logger.info("Broadcast message $it to channel ${CommonMessage::class.java.name}") reactiveStringRedisTemplate.convertAndSend(CommonMessage::class.java.name, it) } .then() }}
RedisListenerStarter
В этом классе стартуются все листенеры из
RedisChatMessageListener. В нашем случае -
единственный листенер
subscribeOnCommonMessageTopic
@Componentclass RedisListenerStarter( val logger: Logger, val redisChatMessageListener: RedisChatMessageListener) { @Bean fun newMessageEventChannelListenerStarter(): ApplicationRunner { return ApplicationRunner { args: ApplicationArguments -> redisChatMessageListener.subscribeOnCommonMessageTopic() .doOnSubscribe { logger.info("Start NewMessageEvent channel listener") } .onErrorContinue { throwable, _ -> logger.error("Error occurred while listening NewMessageEvent channel", throwable) } .subscribe() } }}
Импелементация сервиса
Упрощенная реализация, без сохранения сообщений в БД с
замоканным chatRepository. В виду того, что статья выходит и так
больше, чем я рассчитывал.
Метод handleNewMessageEvent вызывается из
WebSocketHandler и получает на вход
userId отправителя и
NewMessageEvent - простое текстовое сообщение. В
методе происходит проверка на то, что отправитель действительно
является участником чата и дальше это сообщение транслируется между
инстансами.
@Serviceclass DefaultChatService( val logger: Logger, val sinkWrapper: SinkWrapper, val chatRepository: ChatRepository, val redisChatPublisher: RedisChatMessagePublisher) : ChatService { override fun handleNewMessageEvent(senderId: UUID, newMessageEvent: NewMessageEvent): Mono<Void> { logger.info("Receive NewMessageEvent from $senderId: $newMessageEvent") return chatRepository.findById(newMessageEvent.chatId) .filter { it.chatMembers.map(ChatMember::userId).contains(senderId) } .flatMap { chat -> val textMessage = TextMessage(UUID.randomUUID(), chat.chatId, chat.chatMembers.first { it.userId == senderId }, newMessageEvent.content, LocalDateTime.now(), false) chat.lastMessage = textMessage return@flatMap Mono.zip(chatRepository.save(chat), Mono.just(textMessage)) } .flatMap { broadcastMessage(it.t2) } } /** * Broadcast the message between instances */ override fun broadcastMessage(commonMessage: CommonMessage): Mono<Void> { return redisChatPublisher.broadcastMessage(commonMessage) } /** * Send the message to all of chatMembers of message chat direct */ override fun sendMessage(message: CommonMessage): Mono<Void> { return chatRepository.findById(message.chatId) .map { it.chatMembers } .flatMapMany { Flux.fromIterable(it) } .flatMap { member -> sendEventToUserId(member.userId, ChatMessageEvent(message.chatId, message)) } .then() } override fun sendEventToUserId(userId: UUID, webSocketEvent: WebSocketEvent): Mono<Void> { return Mono.fromCallable { sinkWrapper.sinks.emitNext(SendTo(userId, webSocketEvent), Sinks.EmitFailureHandler.FAIL_FAST) } .then() }}
Заключение
В качестве дальнейших доработок можно произвести разделение
получаемых и отправляемых ивентов на отдельные классы. Также в
месте, где происходит получение сообщения по сокетам от клиента,
его приведение к WebSocketEvent и передача в обработчик, можно
попробовать избавиться от хардкодного маппинка event => handler.
Пока не думал, как это можно сделать красивее, но уверен, что
решение есть.
Проект на GitHub