Skip to content

Commit

Permalink
fix(shulker-proxy-agent): use a dedicated thread pool for redis pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremylvln committed Nov 2, 2023
1 parent 74d4067 commit afb575d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import io.shulkermc.proxyagent.adapters.kubernetes.ImplKubernetesGatewayAdapter
import io.shulkermc.proxyagent.adapters.kubernetes.KubernetesGatewayAdapter
import io.shulkermc.proxyagent.adapters.mojang.HttpMojangGatewayAdapter
import io.shulkermc.proxyagent.adapters.mojang.MojangGatewayAdapter
import io.shulkermc.proxyagent.adapters.pubsub.PubSubAdapter
import io.shulkermc.proxyagent.adapters.pubsub.RedisPubSubAdapter
import io.shulkermc.proxyagent.api.ShulkerProxyAPI
import io.shulkermc.proxyagent.api.ShulkerProxyAPIImpl
import io.shulkermc.proxyagent.handlers.TeleportPlayerOnServerHandler
import io.shulkermc.proxyagent.services.PlayerMovementService
import io.shulkermc.proxyagent.services.ProxyLifecycleService
import io.shulkermc.proxyagent.services.ServerDirectoryService
Expand All @@ -33,7 +33,7 @@ class ShulkerProxyAgentCommon(val proxyInterface: ProxyInterface, val logger: Lo
lateinit var fileSystem: FileSystemAdapter
lateinit var mojangGateway: MojangGatewayAdapter
lateinit var cache: CacheAdapter
lateinit var pubSub: PubSubAdapter
lateinit var pubSub: RedisPubSubAdapter

// Services
lateinit var serverDirectoryService: ServerDirectoryService
Expand Down Expand Up @@ -69,7 +69,7 @@ class ShulkerProxyAgentCommon(val proxyInterface: ProxyInterface, val logger: Lo
this.playerMovementService = PlayerMovementService(this)
this.proxyLifecycleService = ProxyLifecycleService(this)

// this.pubSub.onTeleportPlayerOnServer(TeleportPlayerOnServerHandler(this)::handle)
this.pubSub.onTeleportPlayerOnServer(TeleportPlayerOnServerHandler(this)::handle)

this.healthcheckTask = HealthcheckTask(this).schedule()
this.lostProxyPurgeTask = LostProxyPurgeTask(this).schedule()
Expand Down Expand Up @@ -104,6 +104,7 @@ class ShulkerProxyAgentCommon(val proxyInterface: ProxyInterface, val logger: Lo
fun shutdown() {
try {
this.cache.unregisterProxy(Configuration.PROXY_NAME)
this.pubSub.close()
this.agonesGateway.askShutdown()
} catch (ex: Exception) {
this.logger.severe("Failed to ask Agones sidecar to shutdown properly, stopping process manually")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,34 @@ package io.shulkermc.proxyagent.adapters.pubsub

import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPubSub
import java.util.concurrent.Executors

class RedisPubSubAdapter(private val jedisPool: JedisPool) : PubSubAdapter, AutoCloseable {
private val executor = Executors.newCachedThreadPool()

override fun close() {
this.executor.shutdownNow()
}

class RedisPubSubAdapter(private val jedisPool: JedisPool) : PubSubAdapter {
override fun teleportPlayerOnServer(playerId: String, serverName: String) {
this.jedisPool.resource.use { jedis ->
jedis.publish("shulker:teleport", "$playerId:$serverName")
}
}

override fun onTeleportPlayerOnServer(callback: (playerId: String, serverName: String) -> Unit) {
this.jedisPool.resource.use { jedis ->
jedis.subscribe(
object : JedisPubSub() {
override fun onMessage(channel: String, message: String) {
val (playerId, serverName) = message.split(":")
callback(playerId, serverName)
}
},
"shulker:teleport"
)
this.executor.submit {
this.jedisPool.resource.use { jedis ->
jedis.subscribe(
object : JedisPubSub() {
override fun onMessage(channel: String, message: String) {
val (playerId, serverName) = message.split(":")
callback(playerId, serverName)
}
},
"shulker:teleport"
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ object ListCommandHandler {
serverNames.mapIndexed { index, serverName ->
val boxCharacter = if (index == serverNames.size - 1) "" else ""
val playerNames = agent.cache.getPlayerNamesFromIds(agent.cache.listPlayersInServer(serverName)).values
val playerNamesJoined = playerNames
.sortedBy { it.lowercase() }
.joinToString(", ") { it }

source.sendMessage(
Component.text("$boxCharacter ")
.color(NamedTextColor.DARK_GRAY)
.append(Component.text(serverName).color(NamedTextColor.YELLOW))
.append(Component.text(" (${playerNames.size})").color(NamedTextColor.YELLOW))
.append(Component.text(": ").color(NamedTextColor.WHITE))
.append(Component.text(playerNames).color(NamedTextColor.WHITE))
.append(Component.text(playerNamesJoined).color(NamedTextColor.WHITE))
)
}
}
Expand Down

0 comments on commit afb575d

Please sign in to comment.