Русский
Русский
English
Статистика
Реклама

Изучаю Scala Часть 4 WebSocket


Привет, Хабр! На этот раз я по пробовал сделать простенький чат через ВебСокеты. За подробностями добро пожаловать под кат.

Содержание



Ссылки


  1. Исходники
  2. Образы docker image
  3. Tapir
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


Собственно весь код находиться в одном объект ChatHub
class ChatHub[F[_]] private(                             val topic: Topic[F, WebSocketFrame],                             private val ref: Ref[F, Int]                           )                           (                             implicit concurrent: Concurrent[F],                             timer: Timer[F]                           ) extends Http4sDsl[F] {  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint    .get    .in("chat")    .tag("WebSockets")    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")    .description("Подключает к общему чату")    .in(      stringBody        .description("Сообщение которое будет отправлено пользователям в чате")        .example("Привет!")    )    .out(      stringBody        .description("Сообщение которое кто-то написал в чат")        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")    )    //Заглушка которая всегда отвечает ошибкой.     .serverLogic(_ => IO(Left(()): Either[Unit, String]))  def routeWs: HttpRoutes[F] = {    HttpRoutes.of[F] {      case GET -> Root / "chat" => logic()    }  }  private def logic(): F[Response[F]] = {    val toClient: Stream[F, WebSocketFrame] =      topic.subscribe(1000)    val fromClient: Pipe[F, WebSocketFrame, Unit] =      handle    WebSocketBuilder[F].build(toClient, fromClient)  }  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s    .collect({      case WebSocketFrame.Text(text, _) => text    })    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))    .through(topic.publish)}object ChatHub {  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {    ref <- Ref.of[F, Int](0)    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))  } yield new ChatHub(topic, ref)}

Тут надо сразу сказать про Topic примитив синхронизации из Fs2 который позволяет сделать модель Publisher Subscriber причем у вас может быть много Publisher и одновременно много Subscriber. Вообще в него лучшее отправлять сообщения через какой-то буфер вроде Queue потому что у него есть ограничения на количество сообщения в очереди и Publisher ждет пока все Subscriber не получат сообщения в свою очередь сообщений и если она переполнена то может и зависнуть.
val topic: Topic[F, WebSocketFrame],

Тут еще я считаю количество сообщений которые были переданы в чат как номер каждого сообщения. Так как это мне нужно делать из разных потоков я использовал аналог Atomic который тут называется Ref и гарантирует атомарность операции.
  private val ref: Ref[F, Int]

Обработка потока сообщений от пользователей.
  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =     stream//Достаем из фрейма текстовое сообщение и фильтруем фреймы.     .collect({      case WebSocketFrame.Text(text, _) => text    })//Атомарно увеличиваем наш счетчик с сохранением нового значения и добавления его значения к тексту сообщения пользователя.    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))//Каждое пришедшее сообщение отправляем в топик    .through(topic.publish)

Собственно сама логика создания сокета.
private def logic(): F[Response[F]] = {//Откуда получать данные для клиента.    val toClient: Stream[F, WebSocketFrame] =//Просто подписываемся на данные которые будут приходить в топик      topic.subscribe(1000)//Что будем делать с данными которые приходить от клиента    val fromClient: Pipe[F, WebSocketFrame, Unit] =//Просто отправляем данные в топик после обработки      handle//Создаем веб сокет с созданными ранее генератором и потребителем данных.    WebSocketBuilder[F].build(toClient, fromClient)  }

Связываем наш сокет с роутом на сервере (ws://localhost:8080/chat)
def routeWs: HttpRoutes[F] = {    HttpRoutes.of[F] {      case GET -> Root / "chat" => logic()    }  }

Собственно на этом все. Дальше уже можно запускать сервер с этим роутом. Мне еще захотелось какую ни какую документацию сделать. Вообще для документирования WebSocket и прочего основанного на событиях взаимодействия вроде RabbitMQ AMPQ есть AsynAPI но под Tapir там нет ничего поэтому просто сделал для Swagger описание эндпойнта как GET запрос. Работать он конечно не будет. Точнее 501 ошибку будет возвращать зато будет отображаться в Swagger
  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint    .get    .in("chat")    .tag("WebSockets")    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")    .description("Подключает к общему чату")    .in(      stringBody        .description("Сообщение которое будет отправлено пользователям в чате")        .example("Привет!")    )    .out(      stringBody        .description("Сообщение которое кто-то написал в чат")        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")    )

В самом сваггере это выглядит вот так

Подключаем наш чат к нашему серверу API
    todosController = new TodosController()    imagesController = new ImagesController()//Создаем объект нашего чата    chatHub <- Resource.liftF(ChatHub[IO]())    endpoints = todosController.endpoints ::: imagesController.endpoints//Добавляем его эндпойнт в документацию Swagger    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")    yml: String = docs.toYaml//Добавляем его маршрут в список маршрутов приложения    routes = chatHub.routeWs <+>      endpoints.toRoutes <+>      new SwaggerHttp4s(yml, "swagger").routes[IO]    httpApp = Router(      "/" -> routes    ).orNotFound    blazeServer <- BlazeServerBuilder[IO](serverEc)      .bindHttp(settings.host.port, settings.host.host)      .withHttpApp(httpApp)      .resource

Подключаемся к чату крайне простым скриптом.
    <script>        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;        const webSocket = new WebSocket('ws://localhost:8080/chat');        webSocket.onopen = event => {            alert('onopen ');        };        webSocket.onmessage = event => {            console.log(event);            receive(event.data);        };        webSocket.onclose = event => {            alert('onclose ');        };        function send() {            let text = document.getElementById("message");            webSocket.send(`Сообщение от клиента с Id подключения ${id}: ${text.value}`);            text.value = '';        }        function receive(m) {            let text = document.getElementById("chat");            text.value = text.value + '\n\r' + m;        }    </script>

На этом собственно все. Надеюсь кому-то кто тоже изучает скала будет интересна эта статья а может даже полезна.
Источник: habr.com
К списку статей
Опубликовано: 30.08.2020 02:18:14
0

Сейчас читают

Комментариев (0)
Имя
Электронная почта

Разработка веб-сайтов

Системы обмена сообщениями

Scala

Ооп

Функциональное программирование

Functional programming

Websocket

Websockets

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru