From 3429097d88af2fc3422f60f11df0326cdf74a54b Mon Sep 17 00:00:00 2001 From: Xerus <27jf@web.de> Date: Wed, 6 Feb 2019 11:07:09 +0100 Subject: [PATCH 1/2] Improve Lobby & ClientManager --- server/src/sc/server/Lobby.kt | 7 +- .../src/sc/server/network/ClientManager.java | 160 ------------------ server/src/sc/server/network/ClientManager.kt | 137 +++++++++++++++ .../sc/server/network/NewClientListener.java | 2 - 4 files changed, 141 insertions(+), 165 deletions(-) delete mode 100644 server/src/sc/server/network/ClientManager.java create mode 100644 server/src/sc/server/network/ClientManager.kt diff --git a/server/src/sc/server/Lobby.kt b/server/src/sc/server/Lobby.kt index 16f68abd8..688a3763b 100644 --- a/server/src/sc/server/Lobby.kt +++ b/server/src/sc/server/Lobby.kt @@ -25,8 +25,10 @@ import java.io.IOException class Lobby : IClientListener { private val logger = LoggerFactory.getLogger(Lobby::class.java) - val gameManager: GameRoomManager = GameRoomManager() - val clientManager: ClientManager = ClientManager(this) + val gameManager = GameRoomManager() + val clientManager = ClientManager().also { + it.setOnClientConnected(this::onClientConnected) + } /** * Starts the ClientManager in it's own daemon thread. This method should be used only once. @@ -63,7 +65,6 @@ class Lobby : IClientListener { is JoinPreparedRoomRequest -> ReservationManager.redeemReservationCode(source, packet.reservationCode) is JoinRoomRequest -> { val gameRoomMessage = this.gameManager.joinOrCreateGame(source, packet.gameType) - // null is returned if join was unsuccessful if (gameRoomMessage != null) { for (admin in clientManager.clients) { if (admin.isAdministrator) { diff --git a/server/src/sc/server/network/ClientManager.java b/server/src/sc/server/network/ClientManager.java deleted file mode 100644 index dd95cc41c..000000000 --- a/server/src/sc/server/network/ClientManager.java +++ /dev/null @@ -1,160 +0,0 @@ -package sc.server.network; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sc.api.plugins.exceptions.RescuableClientException; -import sc.protocol.responses.ProtocolErrorMessage; -import sc.server.Lobby; -import sc.server.ServiceManager; - -import java.io.IOException; -import java.util.*; - -/** The ClientManager serves as a lookup table for all active connections. */ -public class ClientManager implements Runnable, IClientListener { - private static final Logger logger = LoggerFactory.getLogger(ClientManager.class); - - // Lobby which we are connected to - private Lobby lobby; - private boolean running; - private Thread thread; - - // List of all XStreamClients - protected final List clients; - - // Listener waits for new clients to connect - private final NewClientListener clientListener; - - /** - * Create manager from {@link Lobby lobby} - * - * @param lobby from which the manager is created - */ - public ClientManager(Lobby lobby) { - this.clientListener = new NewClientListener(); - this.lobby = lobby; - this.clients = new ArrayList<>(); - this.running = false; - this.thread = null; - } - - /** - * Adds the given newClient and notifies all listeners by - * invoking onClientConnected.
- * (only used by tests and addAll()) - */ - public void add(Client newClient) { - this.clients.add(newClient); - newClient.addClientListener(this); - this.lobby.onClientConnected(newClient); - } - - /** Used for testing */ - public List getClients() { - return this.clients; - } - - /** Fetch new clients */ - @Override - public void run() { - this.running = true; - - logger.info("ClientManager running."); - - while (this.running && !Thread.interrupted()) { - try { - // Waits blocking for new Client - Client client = this.clientListener.fetchNewSingleClient(); - - logger.info("Delegating new client to ClientManager..."); - this.add(client); - logger.info("Delegation done."); - } catch (InterruptedException e) { - if (this.running) { - logger.error("Interrupted while waiting for a new client.", e); - } else { - logger.error("Client manager is shutting down"); - } - // TODO should it be handled? - } - - } - - this.running = false; - logger.info("ClientManager closed."); - } - - /** - * Starts the ClientManager in it's own daemon thread. This method should be used only once. - * clientListener starts SocketListener on defined port to watch for new connecting clients - */ - public void start() throws IOException { - this.clientListener.start(); - if (this.thread == null) { - this.thread = ServiceManager.createService(this.getClass().getSimpleName(), this); - this.thread.start(); - } - } - - /** - * Set the {@link Lobby lobby}. - * - * @param lobby to be set - */ - public void setLobby(Lobby lobby) { - this.lobby = lobby; - } - - public void close() { - this.running = false; - - if (this.thread != null) { - this.thread.interrupt(); - } - - this.clientListener.close(); - - for (int i = 0; i < this.clients.size(); i++) { - Client client = this.clients.get(i); - client.stop(); - } - } - - /** - * On client disconnect remove it from the list - * - * @param source client which disconnected - */ - @Override - public void onClientDisconnected(Client source) { - logger.info("Removing client {} from client manager", source); - clients.remove(source); - } - - /** - * Do nothing on error - * - * @param source client, which rose the error - * @param packet which contains the error - */ - @Override - public void onError(Client source, ProtocolErrorMessage packet) { - // TODO Error handling needs to happen - } - - /** - * Ignore any request - * - * @param source client, which send the package - * @param packet to be handled - * - * @throws RescuableClientException never - */ - @Override - public void onRequest(Client source, PacketCallback packet) - throws RescuableClientException { - // XXX Handle Request? - - } - -} diff --git a/server/src/sc/server/network/ClientManager.kt b/server/src/sc/server/network/ClientManager.kt new file mode 100644 index 000000000..cb44c0c4c --- /dev/null +++ b/server/src/sc/server/network/ClientManager.kt @@ -0,0 +1,137 @@ +package sc.server.network + +import org.slf4j.LoggerFactory +import sc.api.plugins.exceptions.RescuableClientException +import sc.protocol.responses.ProtocolErrorMessage +import sc.server.ServiceManager +import java.io.IOException +import java.util.* + +/** The ClientManager serves as a lookup table for all active connections. */ +class ClientManager : Runnable, IClientListener { + + /** List of all XStreamClients */ + val clients = ArrayList() + + /** Listener waits for new clients to connect */ + private val clientListener = NewClientListener() + + private var running: Boolean = false + private var thread: Thread? = null + + private var onClientConnected: ((Client) -> Unit)? = null + + init { + this.running = false + this.thread = null + } + + /** + * Adds the given `newClient` and notifies all listeners by + * invoking `onClientConnected`.

+ * *(only used by tests and addAll())* + */ + fun add(newClient: Client) { + this.clients.add(newClient) + newClient.addClientListener(this) + onClientConnected?.invoke(newClient) + } + + fun setOnClientConnected(consumer: (Client) -> Unit) { + onClientConnected = consumer + } + + /** Fetch new clients */ + override fun run() { + this.running = true + + logger.info("ClientManager running") + + while(this.running && !Thread.interrupted()) { + try { + // Waits blocking for new Client + val client = this.clientListener.fetchNewSingleClient() + + logger.info("Delegating new client to ClientManager...") + this.add(client) + logger.info("Delegation done") + } catch(e: InterruptedException) { + if(this.running) { + logger.warn("Interrupted while waiting for a new client", e) + } else { + logger.warn("Client manager is shutting down") + } + } + } + + this.running = false + logger.info("ClientManager closed") + } + + /** + * Starts the ClientManager in it's own daemon thread. This method should be used only once. + * clientListener starts SocketListener on defined port to watch for new connecting clients + */ + @Throws(IOException::class) + fun start() { + this.clientListener.start() + if(this.thread == null) { + this.thread = ServiceManager.createService(this.javaClass.simpleName, this) + this.thread!!.start() + } + } + + fun close() { + this.running = false + + if(this.thread != null) { + this.thread!!.interrupt() + } + + this.clientListener.close() + + for(i in this.clients.indices) { + val client = this.clients[i] + client.stop() + } + } + + /** + * On client disconnect remove it from the list + * + * @param source client which disconnected + */ + override fun onClientDisconnected(source: Client) { + logger.info("Removing client {} from client manager", source) + clients.remove(source) + } + + /** + * Do nothing on error + * + * @param source client, which rose the error + * @param packet which contains the error + */ + override fun onError(source: Client, packet: ProtocolErrorMessage) { + // TODO Error handling needs to happen + } + + /** + * Ignore any request + * + * @param source client, which send the package + * @param packet to be handled + * + * @throws RescuableClientException never + */ + @Throws(RescuableClientException::class) + override fun onRequest(source: Client, packet: PacketCallback) { + // XXX Handle Request? + + } + + companion object { + private val logger = LoggerFactory.getLogger(ClientManager::class.java) + } + +} diff --git a/server/src/sc/server/network/NewClientListener.java b/server/src/sc/server/network/NewClientListener.java index 762f881a3..4a24b9a32 100644 --- a/server/src/sc/server/network/NewClientListener.java +++ b/server/src/sc/server/network/NewClientListener.java @@ -83,8 +83,6 @@ public void run() { /** * Start the listener and create a daemon thread from this object - * - * @throws IOException */ public void start() throws IOException { startSocketListener(); From 9fee3b44d91e20b7e61428f014a30defc262791e Mon Sep 17 00:00:00 2001 From: Xerus <27jf@web.de> Date: Wed, 20 Feb 2019 19:34:39 +0100 Subject: [PATCH 2/2] Create actor --- server/build.gradle.kts | 1 + server/src/sc/server/GameRoomActor.kt | 114 ++++++++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 server/src/sc/server/GameRoomActor.kt diff --git a/server/build.gradle.kts b/server/build.gradle.kts index 4fb5cd46d..38fe41b99 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -17,6 +17,7 @@ application { dependencies { implementation(project(":sdk")) + implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.1.1") testImplementation("junit", "junit", "4.12") } diff --git a/server/src/sc/server/GameRoomActor.kt b/server/src/sc/server/GameRoomActor.kt new file mode 100644 index 000000000..6781a7284 --- /dev/null +++ b/server/src/sc/server/GameRoomActor.kt @@ -0,0 +1,114 @@ +package sc.server + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.actor +import sc.api.plugins.exceptions.RescuableClientException +import sc.protocol.requests.* +import sc.protocol.responses.PlayerScorePacket +import sc.protocol.responses.ProtocolMessage +import sc.protocol.responses.RoomPacket +import sc.protocol.responses.TestModeMessage +import sc.server.gaming.ReservationManager +import kotlin.coroutines.CoroutineContext + +fun CoroutineScope.createLobby(output: SendChannel, context: CoroutineContext = this.coroutineContext): + SendChannel = actor(context = context, capacity = Channel.UNLIMITED, start = CoroutineStart.LAZY) { + LobbyActor(this, channel, output).start() +} + + +/** + * RPC Actor implementation. + * + * @param scope the scope for this actor to act in. + * @param input the actor's input channel. + * @param output the actor's output channel. + */ +private class LobbyActor( + private val scope: CoroutineScope, + private val input: ReceiveChannel, + private val output: SendChannel) { + + /** + * Start the actor. + */ + suspend fun start() { + for (m in input) onReceive(m) + } + + private suspend fun onReceive(request: ILobbyRequest) { + when (request) { + is JoinPreparedRoomRequest -> ReservationManager.redeemReservationCode(source, request.reservationCode) + is JoinRoomRequest -> { + val gameRoomMessage = this.gameManager.joinOrCreateGame(source, request.gameType) + if (gameRoomMessage != null) { + for (admin in clientManager.clients) { + if (admin.isAdministrator) { + admin.send(gameRoomMessage) + } + } + } + } + is AuthenticateRequest -> source.authenticate(request.password) + is PrepareGameRequest -> if (source.isAdministrator) { + source.send(this.gameManager.prepareGame(request)) + } + is FreeReservationRequest -> if (source.isAdministrator) { + ReservationManager.freeReservation(request.reservation) + } + is RoomPacket -> { + // i.e. new move + val room = this.gameManager.findRoom(request.roomId) + room.onEvent(source, request.data) + } + is ObservationRequest -> if (source.isAdministrator) { + val room = this.gameManager.findRoom(request.roomId) + room.addObserver(source) + } + is PauseGameRequest -> if (source.isAdministrator) { + try { + val room = this.gameManager.findRoom(request.roomId) + room.pause(request.pause) + } catch (e: RescuableClientException) { + this.logger.error("Got exception on pause: {}", e) + } + + } + is ControlTimeoutRequest -> if (source.isAdministrator) { + val room = this.gameManager.findRoom(request.roomId) + val slot = room.slots[request.slot] + slot.role.player.isCanTimeout = request.activate + + } + is StepRequest -> // It is not checked whether there is a prior pending StepRequest + if (source.isAdministrator) { + val room = this.gameManager.findRoom(request.roomId) + room.step(request.forced) + } + is CancelRequest -> if (source.isAdministrator) { + val room = this.gameManager.findRoom(request.roomId) + room.cancel() + // TODO check whether all clients receive game over message + this.gameManager.games.remove(room) + } + is TestModeRequest -> if (source.isAdministrator) { + val testMode = request.testMode + logger.info("Test mode is set to {}", testMode) + Configuration.set(Configuration.TEST_MODE, java.lang.Boolean.toString(testMode)) + source.send(TestModeMessage(testMode)) + } + is GetScoreForPlayerRequest -> if (source.isAdministrator) { + val displayName = request.displayName + val score = getScoreOfPlayer(displayName) + ?: throw IllegalArgumentException("Score for \"$displayName\" could not be found!") + logger.debug("Sending score of player \"{}\"", displayName) + source.send(PlayerScorePacket(score)) + } + else -> throw RescuableClientException("Unhandled Packet of type: " + request.javaClass) + } + } +} \ No newline at end of file