Bomberman Workshop

Welcome to our Building Bomberman with RSocket workshop! The workshop is aimed at developers of microservices and distributed systems who want to get to know RSocket-Java and RSocket-JS in practice, try out different features of these technologies, and understand how to use them to build fast and fault-tolerant systems.

In this workshop, you will work with RSocket-JS on the browser side and develop some of the logic for interacting with remote players and an intermediate server. You will also create a clustering and sharding mechanism that allows players to connect in the same game space. You will have the opportunity to evaluate changing the transport to achieve more efficient client-server and server-server interaction.

Who this workshop is for

The workshop is aimed at engineers who know the basics of Spring and Reactive Programming.

Before we start

  • Install Docker

  • Install flatbuffers 1.12

MacOS
brew install flatbuffers
Linux
# When using Ubuntu 20.10+, simply use
# sudo apt install -y flatbuffers-compiler
# otherwise,
wget https://launchpad.net/ubuntu/+source/flatbuffers/1.12.1~git20200711.33e2d80+dfsg1-0.3/+build/19612152/+files/libflatbuffers1_1.12.1~git20200711.33e2d80+dfsg1-0.3_amd64.deb
wget https://launchpad.net/ubuntu/+source/flatbuffers/1.12.1~git20200711.33e2d80+dfsg1-0.3/+build/19612152/+files/flatbuffers-compiler_1.12.1~git20200711.33e2d80+dfsg1-0.3_amd64.deb
dpkg -i libflatbuffers1_1.12.1~git20200711.33e2d80+dfsg1-0.3_amd64.deb
dpkg -i flatbuffers-compiler_1.12.1~git20200711.33e2d80+dfsg1-0.3_amd64.deb

Part 1. Intro

Let’s build a client-server game. Multiple users connect to the server. Users can create rooms which other users can join. Once the room is full of people, the owner of the room can start the game! You can try it yourself by opening https://bomberman.xyz/ and asking a friend to join you for a game. If none of your friends have time to play with you, you can simply open a separate browser tab.

[Image: rooms]

Once you’ve spent a few hours playing the game, we can jump to coding and try to build it ourselves!

You can start part one by checking out the initial branch.

git checkout initial
./gradlew build

Let’s start by looking at Rooms.jsx. The whole view of the rooms is a single function that defines the state: the list of rooms that you can see on the screen, the ID of the user that the server tells us, and the room that we’ve created. You can also see the whole UI in a single React component at the bottom of the function.

To communicate with the remote server, we use an API client which is created inside the side effect of the UI component. Once the initial state is displayed, we create the client and call login(). At the moment, the login function is not implemented. The login maps perfectly to the request-response interaction model of RSocket, so let’s add this call:

// Inside login(userName); resolve and reject are assumed to come from an enclosing Promise.
this.rsocket.requestResponse({
  metadata: encodeCompositeMetadata([
    [MESSAGE_RSOCKET_ROUTING, encodeRoute('game.players.login')],
  ]),
  data: Buffer.from(userName)
}).subscribe({
  onComplete: (payload) => {
    resolve(payload.data.toString());
  },
  onError: e => reject(e)
})

Here, we employed composite metadata, an RSocket extension that allows packing various discrete metadata types within a single metadata payload. Let’s start the app and see what’s happening.
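
To make the layout of that metadata concrete, here is a minimal Java sketch of the bytes a single routing entry turns into, as we read the RSocket composite metadata and routing extension specs: a well-known MIME type is written as one byte with the high bit set (0x7E being the ID of message/x.rsocket.routing.v0), followed by a 24-bit payload length, and each route tag is prefixed with a one-byte length. The class and method names are hypothetical and only illustrate the format; real code should keep using the RSocket libraries.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of any RSocket library.
final class CompositeRouteSketch {

  // Well-known MIME type ID of message/x.rsocket.routing.v0.
  static final int ROUTING_MIME_ID = 0x7E;

  // Routing metadata: one length byte followed by the UTF-8 bytes of the route.
  static byte[] encodeRoute(String route) {
    byte[] tag = route.getBytes(StandardCharsets.UTF_8);
    if (tag.length > 255) {
      throw new IllegalArgumentException("route tag longer than 255 bytes");
    }
    return ByteBuffer.allocate(1 + tag.length)
        .put((byte) tag.length)
        .put(tag)
        .array();
  }

  // Composite entry: (0x80 | MIME id), 24-bit big-endian length, then the payload.
  static byte[] encodeRoutingEntry(String route) {
    byte[] payload = encodeRoute(route);
    return ByteBuffer.allocate(4 + payload.length)
        .put((byte) (0x80 | ROUTING_MIME_ID))
        .put((byte) (payload.length >> 16))
        .put((byte) (payload.length >> 8))
        .put((byte) payload.length)
        .put(payload)
        .array();
  }
}
```

Calling CompositeRouteSketch.encodeRoutingEntry("game.players.login") yields a 4-byte entry header followed by the length-prefixed route, which is how several metadata types can share one metadata payload.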

Note that you can observe client-side changes much faster if you use yarn watch.

cd game-ui && yarn watch

Now it’s time to start the game and see if we managed to log in. You can start it from your IDE or simply run ./gradlew bootRun from your terminal. Opening the browser at http://localhost:9000/ reveals an error which immediately makes sense:

[Image: error]

At this point, the endpoint (the handler for game.players.login) is not yet implemented, so now we can jump to the controller responsible for players and start the implementation.

@Controller
@MessageMapping("game.players")
@AllArgsConstructor
class PlayersController {

  final PlayersRepository playersRepository;

  @MessageMapping("login")
  public String login(@Payload String name, RSocketRequester requester) {
    // ...
  }
}

Once we’re able to log in, we can continue implementing rooms inside the RoomsController. While the request-response interaction is quite straightforward, the part that stands out is listening to updates of the rooms. We’re building a game, and every player expects to see the other players’ actions as soon as possible, without having to send an explicit request to ask for changes!

To achieve this, on the client we can request a stream of events and listen to the incoming rooms:

rsocket.requestStream({
  metadata: encodeCompositeMetadata([
    [MESSAGE_RSOCKET_ROUTING, encodeRoute('game.rooms')],
  ]),
}).subscribe({
  onSubscribe(s) {
    s.request(2147483642)
  },
  onNext(eventBuf) {
    roomsHandler(extractRoom(eventBuf));
  },
  onError(err) {
    reject(err)
  },
  onComplete() {
    resolve()
  }
})

On the server side, we mirror the implementation by listening to the events from the rooms repository.

@Controller
@MessageMapping("game.rooms")
@AllArgsConstructor
public class RoomsController {

  private final RoomsRepository roomsRepository;

  @MessageMapping("")
  public Flux<ByteBuffer> listAndListen() {
    return roomsRepository.listAndListen()
        .map(MessageMapper::mapToRoomEventBuffer);
  }
  // ...
}
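
The body of listAndListen in the repository is not shown here. As a rough idea of the behaviour its name suggests (first replay the rooms that already exist, then keep pushing changes), here is a simplified sketch with plain callbacks instead of a Flux. RoomsFeedSketch, its string events, and its method bodies are our assumptions, not the workshop's RoomsRepository.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a rooms repository with live updates.
final class RoomsFeedSketch {

  private final Set<String> rooms = new LinkedHashSet<>();
  private final List<Consumer<String>> listeners = new ArrayList<>();

  // Replays the current rooms to the listener, then keeps it subscribed to new events.
  synchronized void listAndListen(Consumer<String> listener) {
    rooms.forEach(room -> listener.accept("Added:" + room));
    listeners.add(listener);
  }

  // Stores the room and notifies every listener, unless the room already exists.
  synchronized void add(String room) {
    if (rooms.add(room)) {
      listeners.forEach(l -> l.accept("Added:" + room));
    }
  }

  // Removes the room and notifies every listener, unless the room is unknown.
  synchronized void remove(String room) {
    if (rooms.remove(room)) {
      listeners.forEach(l -> l.accept("Removed:" + room));
    }
  }
}
```

A listener that subscribes late still learns about rooms created earlier, which is what a freshly opened browser tab needs in order to render the full list.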

Congratulations! Once you’ve finished implementing all the methods, you’ll be able to see users joining and leaving rooms.

[Image: rooms]

Part 2. Game Service

Once every player has joined, it’s time to start the game!

package xyz.bomberman.game;

public class Game {
  // ...
  public static void create(Set<xyz.bomberman.player.Player> players) { /* ??? */ }
  // ...
}

How could we notify players that the game has started? Asking each player to periodically poll the server is not an option: while some players are waiting, others would get an advantage since they’d be able to start playing early! Instead, our server can initiate the game by sending a requestChannel request to every client. Remember, with RSocket, the requester is not necessarily the client.

First, we could generate the initial game state and the outgoing streams for every player:

var initialGameStateAsBuffer = generateGameAsBuffer(players);

var playersOutboundsMap = players.stream()
    .collect(Collectors.toMap(
        Player::id,
        // explicit type witness so the values are Many<ByteBuffer> rather than Many<Object>
        __ -> Sinks.many().multicast().<ByteBuffer>directBestEffort()));

Then, for each player, we’ll invoke p.play(), which underneath can use the requester RSocket to make a requestChannel call to game.play.

for (Player p : players) {
  var otherPlayersEvents = mergeInboundsExceptPlayer(playersOutboundsMap, p);
  var playerSink = playersOutboundsMap.get(p.id());
  p.play(otherPlayersEvents.startWith(initialGameStateAsBuffer))
      .subscribe(
          gameEvent -> playerSink.emitNext(gameEvent, RETRY_NON_SERIALIZED),
          e -> playerSink.emitError(e, RETRY_NON_SERIALIZED),
          () -> playerSink.emitComplete(RETRY_NON_SERIALIZED)
      );
}

However, we need not only to open the channels but also to broadcast the events from each player to every other player. To make this possible, the outgoing events for a player are the merged events of every other player, which is encapsulated in mergeInboundsExceptPlayer.

Flux<ByteBuffer> mergeInboundsExceptPlayer(
    Map<String, Many<ByteBuffer>> playersOutboundsMap,
    Player p
) {
  return Flux.merge( // merge all streams
      playersOutboundsMap.entrySet() // get all players' channels
          .stream()
          .filter(e -> !e.getKey().equals(p.id())) // filter out ours
          .map(e -> e.getValue().asFlux()) // take the fluxes
          .collect(Collectors.toList()));
}
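
The effect of this wiring is that an event emitted by one player reaches every other player but never the sender. A minimal sketch of that delivery rule, using plain lists instead of sinks and fluxes (BroadcastSketch and its string events are hypothetical simplifications, not the workshop code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "broadcast to everyone except the sender".
final class BroadcastSketch {

  // Player id -> events that this player should receive.
  private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

  void join(String playerId) {
    inboxes.put(playerId, new ArrayList<>());
  }

  // Delivers an event emitted by senderId to every player except the sender.
  void emit(String senderId, String event) {
    inboxes.forEach((playerId, inbox) -> {
      if (!playerId.equals(senderId)) {
        inbox.add(event);
      }
    });
  }

  // Returns a snapshot of what the given player has received so far.
  List<String> received(String playerId) {
    return List.copyOf(inboxes.get(playerId));
  }
}
```

With three players, an event from the first one shows up in the inboxes of the other two only, mirroring the filter on p.id() in mergeInboundsExceptPlayer.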

All that’s left before we can play a round or two of Bomberman is to handle the incoming requestChannel request on the client. When creating an RSocketClient, we can supply a responder which will respond to the incoming requests:

const socketClient = new RSocketClient({
    setup: {}, // ...
    responder: responder,  // <-- the responder
    transport: {},  // ...
});

Once the responder is set and the client receives a stream of events, we can start playing!

connect({
  requestChannel(flowable) {
    const game = new Game("myCanvas", 13, 13, window.assets, userName);
    return game.start(flowable)
  }
})

[Image: game]

Now that the game is up and running, it can be deployed on a remote server and welcome real users who also want to play Bomberman. However, once the game is deployed and is no longer running under a debugger on a local machine, it becomes much harder to understand what’s happening under the hood. At the very least, we want to know immediately when the rate of errors spikes so that the issues can be investigated.

Here is another area where RSocket shines. There are a number of ready-to-use metric libraries for RSocket that expose all the underlying metrics. But to understand in depth how they work, let’s implement one ourselves.

Let’s start from a customizer, the bean that can customize our RSocket server.

@Component
@AllArgsConstructor
public class MetricsRSocketServerCustomizer implements RSocketServerCustomizer {

  final MeterRegistry meterRegistry;

  @Override
  public void customize(RSocketServer rSocketServer) {
      // ...
  }
}

RSocket provides a way to intercept connections and responders using interceptors, which can be used for tracing, logging, debugging, etc. In our case, we’re creating one that can be used to collect metrics. One of the most important metrics is the rate of different frames flowing through RSocket, so a separate interceptor can collect all of this information:

rSocketServer.interceptors(registry -> {
  registry.forConnection(new MetricsConnectionInterceptor(meterRegistry));
});

Inside the newly created class, we define a counter which would be measuring the rate of the events, and wrap the connection into an inner InstrumentedRsConnection.

public class MetricsConnectionInterceptor implements DuplexConnectionInterceptor {
  public TaggedCounter frameCounter;

  public MetricsConnectionInterceptor(MeterRegistry registry) {
    this.frameCounter = new TaggedCounter("xyz.bomberman.frames", registry);
  }

  @Override
  public DuplexConnection apply(Type type, DuplexConnection connection) {
    return new InstrumentedRsConnection(type, connection);
  }

  private class InstrumentedRsConnection implements DuplexConnection {
    // ...
  }
}

The instrumented connection can mostly delegate its methods to the wrapped connection. However, two of the methods are especially important: sendFrame, which we can use to record the outgoing frames (out), and receive, where we can track all of the incoming frames (in).

Outgoing frames
@Override
public void sendFrame(int streamId, ByteBuf frame) {
  recordFrame("out", frame, connectionType);
  connection.sendFrame(streamId, frame);
}
Incoming frames
@Override
public Flux<ByteBuf> receive() {
  return Flux.from(connection.receive()) //
      .doOnNext(frame -> recordFrame("in", frame, connectionType));
}

Now that we know how to get access to the internals, the rest is simple. We only need to report these frames to the metric collector.

private void recordFrame(String flux, ByteBuf frame, Type connectionType) {
  // reading the type of the frame
  var frameType = FrameHeaderCodec.frameType(frame).name();
  // and increment the counter
  frameCounter.increment(Tags.of("flux", flux, "connectionType", connectionType.name(), "frame", frameType));
}
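
TaggedCounter is a helper from the workshop repository whose source is not shown here. The idea behind it, one counter per distinct combination of tags, can be sketched with the JDK alone. The class below is our own hypothetical stand-in (it does not use Micrometer and is not the workshop's implementation):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in that keeps one counter per distinct tag combination.
final class TaggedCounterSketch {

  private final ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();

  // Sorting the tags makes the key independent of the order in which tags are given.
  private static String key(Map<String, String> tags) {
    return new TreeMap<>(tags).toString();
  }

  // Increments the counter for this tag combination, creating it on first use.
  void increment(Map<String, String> tags) {
    counters.computeIfAbsent(key(tags), k -> new LongAdder()).increment();
  }

  // Returns the current count, or 0 if this tag combination was never seen.
  long count(Map<String, String> tags) {
    LongAdder adder = counters.get(key(tags));
    return adder == null ? 0 : adder.sum();
  }
}
```

Counting per tag combination is what later lets a dashboard break the frame rate down by direction, connection type, and frame type.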

These frame metrics give us the rate of all frames that flow through RSocket. Essentially, they give us visibility into the transport layer. However, we are also interested in application-layer metrics, which are mostly defined by the RSocket interaction models: requestResponse metrics and channel metrics. For instance, we could measure the latency of a request-response, how long a single channel stays alive, the number of signals per channel, etc.

The simplest way to collect these metrics is to define a responder interceptor.

rSocketServer.interceptors(registry -> {
  registry.forResponder(new MetricsResponderInterceptor(meterRegistry));
});

The MetricsResponderInterceptor can define a few metrics to measure rate and latency. The RSocketProxy, which wraps the rsocket instance, will help us collect those.

public class MetricsResponderInterceptor implements RSocketInterceptor {
  // ...
  public MetricsResponderInterceptor(MeterRegistry registry) {
    this.signalCounter = new TaggedCounter("xyz.bomberman.signals", registry);
    this.channelTimer = registry.timer("xyz.bomberman.channel.timer");
    this.reqResTimer = registry.timer("xyz.bomberman.reqres.timer");
  }

  @Override
  public RSocket apply(RSocket rSocket) {
    return new RSocketProxy(rSocket) { /* ... */ };
  }
}

Now the RSocket methods that we are interested in can be instrumented. For example, we can measure the latency of requestResponse:

@Override
public Mono<Payload> requestResponse(Payload payload) {
  var timer = Timer.start();
  return super.requestResponse(payload)
      .doFinally(signalType -> timer.stop(reqResTimer));
}

And the lifetime of the channels, as well as the number of signals flowing in each direction:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
  return recordSignal("out", () -> { //
    return super.requestChannel(recordSignal("in", () -> Flux.from(payloads)));
  });
}

private Flux<Payload> recordSignal(String flux, Supplier<Flux<Payload>> signalProvider) {
  var timer = Timer.start();
  return signalProvider.get().doFinally(signalType -> {
    signalCounter.increment(Tags.of("flux", flux, "signal", signalType.name()));
    timer.stop(channelTimer);
  });
}
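
Both instrumented methods follow the same pattern: take a timestamp before the call and record the elapsed time in a hook that runs when the call finishes, whether it succeeded or failed. A minimal sketch of that pattern using only the JDK, with CompletableFuture standing in for Mono and System.nanoTime standing in for the Micrometer timer (both substitutions are ours, as is the LatencySketch name):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Hypothetical sketch: records how long an asynchronous call takes.
final class LatencySketch {

  final LongAdder calls = new LongAdder();
  final LongAdder totalNanos = new LongAdder();

  // Wraps an asynchronous call and records its duration on success and on failure.
  <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> call) {
    long start = System.nanoTime();
    return call.get().whenComplete((value, error) -> {
      totalNanos.add(System.nanoTime() - start);
      calls.increment();
    });
  }
}
```

Recording in a completion hook rather than only on success matters for error-rate investigations, since failed calls are often the slow ones.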

Using these techniques and various interceptors, we can get a full view of an RSocket-based application, which is an essential part of making it production-ready.

Part 3. The Mesh

How cool would it be if our servers could connect to each other, creating a truly distributed game mesh? With RSocket, these dreams can become a reality! However, to connect to each other, two servers first need to discover each other. The solution to this problem can be a discovery service, which every game server can connect to, announce its presence, and listen to the updates from every other server.

This application could be a plain Java app listening on a port. Thanks to RSocket, it could use any supported transport, whether it’s TCP or WebSocket. For this example, let’s use WebSockets.

public class DiscoveryApplication {
  public static void main(String[] args) {
    // fall back to 8081 when PORT is not set (requireNonNull would throw instead)
    var port = Objects.requireNonNullElse(System.getenv("PORT"), "8081");
    var server = RSocketServer.create()
        .payloadDecoder(PayloadDecoder.ZERO_COPY)
        .acceptor(new DiscoverySocketAcceptor(new ServiceRegistry()))
        .bindNow(WebsocketServerTransport.create(Integer.parseInt(port)));

    System.out.println("started on " + server.address());
    server.onClose().block();
  }
}

The acceptor that accepts the incoming connections needs to store information about the connecting clients. In particular, we need to store their sendingSocket so that we can proxy the requests. Once a client disconnects for whatever reason, we need to remember to deregister it.

class DiscoverySocketAcceptor implements SocketAcceptor {
  @Override
  public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
    var flatServiceInfo = ServiceInfo.getRootAsServiceInfo(setup.getData());
    var serviceInfo = new ServiceInfo(flatServiceInfo.id(), flatServiceInfo.uri(), sendingSocket);

    return Mono.just(new DiscoveryRSocketHandler(serviceRegistry, serviceInfo))
        .doAfterTerminate(() -> {
          serviceRegistry.register(serviceInfo);
          sendingSocket.onClose()
              .doFinally(__ -> serviceRegistry.unregister(serviceInfo))
              .subscribe();
        });
  }
}
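
The ServiceRegistry itself is not shown here. The lifecycle the acceptor relies on (register on connect, unregister when the connection closes, and tell interested listeners about both) can be sketched with the JDK as follows. ServiceRegistrySketch, its string events, and the use of CompletableFuture in place of the onClose Mono are our assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of a registry tied to the lifetime of a connection.
final class ServiceRegistrySketch {

  private final Map<String, String> services = new ConcurrentHashMap<>(); // id -> uri
  private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

  void listen(Consumer<String> listener) {
    listeners.add(listener);
  }

  // Registers the service now and unregisters it once onClose completes,
  // no matter whether the connection closed normally or with an error.
  void accept(String id, String uri, CompletableFuture<Void> onClose) {
    services.put(id, uri);
    listeners.forEach(l -> l.accept("Connected:" + id));
    onClose.whenComplete((ignored, error) -> {
      services.remove(id);
      listeners.forEach(l -> l.accept("Disconnected:" + id));
    });
  }

  boolean isRegistered(String id) {
    return services.containsKey(id);
  }
}
```

Tying the removal to the close signal means a crashed game server disappears from the registry without having to send an explicit goodbye.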

Now all that’s left is to implement the handler of the incoming requests. For each incoming request, we’ll need to understand the destination, and simply proxy the request to the desired destination. For example, let’s take a look at the requestResponse interaction type.

class DiscoveryRSocketHandler implements RSocket {
  // ...
  @Override
  public Mono<Payload> requestResponse(Payload payload) {
    // First, extract the destination
    var destinationIdByteBuf = CompositeMetadataUtils
        .extract(payload.metadata(), DESTINATION_ID_MIMETYPE);

    if (destinationIdByteBuf != null) {
      // Now, find the destination service from our service registry
      var destinationId = destinationIdByteBuf.toString(CharsetUtil.UTF_8);
      var destinationServiceInfo = serviceRegistry.find(destinationId);

      // Once the destination is found, retrieve its requester and proxy the request
      if (destinationServiceInfo != null) {
        return destinationServiceInfo.getRequester().requestResponse(payload);
      }

      // If not found, handle the errors
      payload.release();
      return Mono
          .error(new IllegalStateException("Service [" + destinationId + "] is not registered"));
    }
    payload.release();
    return Mono
        .error(new IllegalStateException("Destination Id is missed"));
  }
  // ...
}
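
Stripped of RSocket types, the handler is a lookup followed by a forward. The sketch below shows that control flow with strings as payloads and CompletableFuture in place of Mono; DestinationProxySketch and its register method are hypothetical names used only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: strings stand in for payloads, CompletableFuture for Mono.
final class DestinationProxySketch {

  // Destination id -> function that forwards a request to that service.
  private final Map<String, Function<String, CompletableFuture<String>>> requesters =
      new ConcurrentHashMap<>();

  void register(String id, Function<String, CompletableFuture<String>> requester) {
    requesters.put(id, requester);
  }

  // Forwards the request to the destination, or fails if it cannot be resolved.
  CompletableFuture<String> requestResponse(String destinationId, String data) {
    if (destinationId == null) {
      return CompletableFuture.failedFuture(
          new IllegalStateException("Destination id is missing"));
    }
    var requester = requesters.get(destinationId);
    if (requester == null) {
      return CompletableFuture.failedFuture(
          new IllegalStateException("Service [" + destinationId + "] is not registered"));
    }
    return requester.apply(data);
  }
}
```

Because the proxy only reads the destination and passes the data through untouched, the same shape can be repeated for the other interaction models.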

Now that the discovery service is ready to send out events, our game servers need to listen to them. Let’s start listening by creating a separate Spring bean which is also a subscriber.

@Service
public class DiscoveryService extends BaseSubscriber<DataBuffer> implements DisposableBean {
  // ...
  public DiscoveryService(
      RSocketRequester.Builder requesterBuilder,
      RSocketStrategies strategies,
      RoomsRepository roomsRepository,
      PlayersRepository playersRepository,
      @Value("${server.port}") int port
  ) {
    // ...
    this.requester = requesterBuilder
        // extract metadata to make it injectable into controllers with @Header
        .rsocketStrategies(b -> b.metadataExtractorRegistry(mer -> {
              mer.metadataToExtract(DESTINATION_ID_MIMETYPE, String.class,
                  DESTINATION_ID_MIMETYPE.toString());
              mer.metadataToExtract(PLAYER_ID_MIMETYPE, String.class,
                  PLAYER_ID_MIMETYPE.toString());
            }
        ))
        // now, specify the destination - the set of controllers
        .rsocketConnector(
            connector -> connector.acceptor(RSocketMessageHandler
                .responder(strategies,
                    new RemoteRoomsController(roomsRepository, playersRepository),
                    new RemotePlayersController(playersRepository),
                    new RemotePlayerController(playersRepository)))
                .reconnect(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(15))))
        // talking binary
        .dataMimeType(MediaType.APPLICATION_OCTET_STREAM)
        // how we announce ourselves
        .setupData(buildServiceInfo())
        // and finally the URL of the discovery service. Can either be shared or localhost
        .websocket(URI.create("ws://discovery.bomberman.xyz"));
    this.requester.route("discovery.services")
        .retrieveFlux(DataBuffer.class)
        .subscribe(this);
  }

  @Override
  protected void hookOnNext(DataBuffer rawServiceEvent) {
    var serviceEvent = ServiceEvent.getRootAsServiceEvent(rawServiceEvent.asByteBuffer());
    if (serviceEvent.type() == EventType.Connected) {
      var session = serviceEvent.serviceInfo();
      // handle new service
      new RemotePlayersListener(requester, session.id(), playersRepository);
      new RemoteRoomsListener(requester, session.id(), playersRepository, roomsRepository);
    }
  }
  // ...
}

Now, we can start listening to the incoming players.

public class RemotePlayersListener extends BaseSubscriber<DataBuffer> {

  final RSocketRequester rSocketRequester;
  final String serviceId;
  final PlayersRepository playersRepository;
  final RemotePlayerClient remotePlayerClient;
  final HashMap<String, RemotePlayer> remotePlayers = new HashMap<>();

  public RemotePlayersListener(
      RSocketRequester rSocketRequester,
      String serviceId,
      PlayersRepository playersRepository) {
    // ...
    rSocketRequester.route("game.players")
        .metadata(serviceId, DESTINATION_ID_MIMETYPE)
        .retrieveFlux(DataBuffer.class)
        .subscribe(this);
  }

  @Override
  protected void hookOnNext(DataBuffer rawEvent) {
    var playerEvent = PlayerEvent.getRootAsPlayerEvent(rawEvent.asByteBuffer());

    // register the player when it's connected
    if (playerEvent.eventType() == Connected) {
      var player = playerEvent.event(new Player());
      var remotePlayer = new RemotePlayer(player.id(), player.name(), remotePlayerClient);

      remotePlayers.put(remotePlayer.id, remotePlayer);
      playersRepository.register(remotePlayer);
    // deregister the player when it's disconnected
    } else if (playerEvent.eventType() == Disconnected) {
      var playerId = playerEvent.event(new PlayerId());
      var id = playerId.id();

      remotePlayers.remove(id);
      playersRepository.disconnect(id);
    }
    DataBufferUtils.release(rawEvent);
  }
}

The game service can also listen to the new rooms in the same way.

public class RemoteRoomsListener extends BaseSubscriber<DataBuffer> {

  final String serviceId;
  final RemoteRoomClient remoteRoomClient;

  final PlayersRepository playersRepository;
  final RoomsRepository roomsRepository;

  final HashMap<String, RemoteRoom> remoteRooms;

  public RemoteRoomsListener(
      RSocketRequester rSocketRequester,
      String serviceId,
      PlayersRepository playersRepository,
      RoomsRepository roomsRepository
  ) {
    // ...
    rSocketRequester.route("game.rooms")
        .metadata(serviceId, DESTINATION_ID_MIMETYPE)
        .retrieveFlux(DataBuffer.class)
        .subscribe(this);
  }

  @Override
  protected void hookOnNext(DataBuffer rawEvent) {
    var roomEvent = RoomEvent.getRootAsRoomEvent(rawEvent.asByteBuffer());
    if (roomEvent.type() == Added) {
      // The room was created, need to let everyone know, find the owner and the players
      // and create a new room
      final String roomId = roomEvent.id();
      final Player owner = playersRepository.find(roomEvent.owner().id());
      final Set<Player> players = new HashSet<>();
      for (int i = 0; i < roomEvent.playersLength(); i++) {
        final Player player = playersRepository.find(roomEvent.players(i).id());
        players.add(player);
      }

      var remoteRoom = new RemoteRoom(roomId, owner, players, remoteRoomClient);

      remoteRooms.put(roomId, remoteRoom);
      roomsRepository.add(remoteRoom);
    } else if (roomEvent.type() == Updated) {
      // The room was updated, find the owner and the players in the same way and
      // update it.
      // ...
      var remoteRoom = new RemoteRoom(roomId, owner, players, remoteRoomClient);

      remoteRooms.replace(roomId, remoteRoom);
      roomsRepository.update(remoteRoom);
    } else if (roomEvent.type() == EventType.Removed) {
      // the room was removed.
      final String roomId = roomEvent.id();
      remoteRooms.remove(roomId);
      roomsRepository.remove(roomId);
    }
    DataBufferUtils.release(rawEvent);
  }

  @Override
  protected void hookFinally(SignalType type) {
    remoteRooms.keySet().forEach(roomsRepository::remove);
  }
}

Now that the discovery service is up and running, you and your friends can launch a local server and start playing even if you’re behind NAT. The Bomberman game is now a server mesh. Happy playing!