Skip to content

Commit

Permalink
fix(express): global processing queue is not completely processed at …
Browse files Browse the repository at this point in the history
…startup (#15)

update(express): use launcher instead of threadpool
  • Loading branch information
ShindouMihou authored Mar 31, 2023
1 parent c125075 commit b9992cc
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions src/main/java/pw/mihou/nexus/express/core/NexusExpressCore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import org.javacord.api.DiscordApi
import org.javacord.api.entity.server.Server
import pw.mihou.nexus.Nexus
import pw.mihou.nexus.core.exceptions.NexusFailedActionException
import pw.mihou.nexus.core.threadpool.NexusThreadPool
import pw.mihou.nexus.express.NexusExpress
import pw.mihou.nexus.express.event.NexusExpressEvent
import pw.mihou.nexus.express.event.core.NexusExpressEventCore
Expand All @@ -27,14 +26,14 @@ internal class NexusExpressCore: NexusExpress {
private val localQueue: MutableMap<Int, BlockingQueue<NexusExpressEvent>> = ConcurrentHashMap()

fun ready(shard: DiscordApi) {
NexusThreadPool.executorService.submit {
Nexus.launcher.launch {
val local = localQueue(shard.currentShard)
while (!local.isEmpty()) {
try {
val event = local.poll()

if (event != null) {
NexusThreadPool.executorService.submit {
Nexus.launcher.launch {
(event as NexusExpressEventCore).process(shard)
}
}
Expand All @@ -51,7 +50,7 @@ internal class NexusExpressCore: NexusExpress {
if (!predicate.test(shard)) continue
val (_, event) = predicateQueue.poll()

NexusThreadPool.executorService.submit {
Nexus.launcher.launch {
(event as NexusExpressEventCore).process(shard)
}
} catch (exception: Exception) {
Expand All @@ -61,18 +60,20 @@ internal class NexusExpressCore: NexusExpress {
}
}

NexusThreadPool.executorService.submit {
Nexus.launcher.launch {
globalQueueProcessingLock.withLock {
try {
val event = globalQueue.poll()
while(globalQueue.isNotEmpty()) {
try {
val event = globalQueue.poll()

if (event != null) {
NexusThreadPool.executorService.submit {
(event as NexusExpressEventCore).process(shard)
if (event != null) {
Nexus.launcher.launch {
(event as NexusExpressEventCore).process(shard)
}
}
} catch (exception: Exception) {
Nexus.logger.error("An uncaught exception was caught from Nexus Express Way.", exception)
}
} catch (exception: Exception) {
Nexus.logger.error("An uncaught exception was caught from Nexus Express Way.", exception)
}
}
}
Expand All @@ -90,7 +91,7 @@ internal class NexusExpressCore: NexusExpress {

val maximumTimeout = Nexus.configuration.express.maximumTimeout
if (!maximumTimeout.isZero && !maximumTimeout.isNegative) {
NexusThreadPool.schedule({
Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) {
expressEvent.`do` {
if (status() == NexusExpressEventStatus.WAITING) {
val removed = localQueue(shard).remove(this)
Expand All @@ -105,10 +106,10 @@ internal class NexusExpressCore: NexusExpress {
expire()
}
}
}, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS)
}
}
} else {
NexusThreadPool.executorService.submit { expressEvent.process(Nexus.sharding[shard]!!) }
Nexus.launcher.launch { expressEvent.process(Nexus.sharding[shard]!!) }
}

return expressEvent
Expand All @@ -124,25 +125,25 @@ internal class NexusExpressCore: NexusExpress {

val maximumTimeout = Nexus.configuration.express.maximumTimeout
if (!maximumTimeout.isZero && !maximumTimeout.isNegative) {
NexusThreadPool.schedule({
Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) {
expressEvent.`do` {
if (status() == NexusExpressEventStatus.WAITING) {
val removed = predicateQueue.remove(pair)
if (Nexus.configuration.express.showExpiredWarnings) {
Nexus.logger.warn(
"An express request that was specified " +
"for a predicate has expired after ${maximumTimeout.toMillis()} milliseconds " +
"without any matching shard connecting with Nexus. [acknowledged=$removed]"
"An express request that was specified " +
"for a predicate has expired after ${maximumTimeout.toMillis()} milliseconds " +
"without any matching shard connecting with Nexus. [acknowledged=$removed]"
)
}

expire()
}
}
}, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS)
}
}
} else {
NexusThreadPool.executorService.submit { expressEvent.process(shard) }
Nexus.launcher.launch { expressEvent.process(shard) }
}

return expressEvent
Expand All @@ -156,7 +157,7 @@ internal class NexusExpressCore: NexusExpress {

val maximumTimeout = Nexus.configuration.express.maximumTimeout
if (!maximumTimeout.isZero && !maximumTimeout.isNegative) {
NexusThreadPool.schedule({
Nexus.launch.scheduler.launch(maximumTimeout.toMillis()) {
expressEvent.`do` {
if (status() == NexusExpressEventStatus.WAITING) {
val removed = globalQueue.remove(this)
Expand All @@ -171,10 +172,10 @@ internal class NexusExpressCore: NexusExpress {
expire()
}
}
}, maximumTimeout.toMillis(), TimeUnit.MILLISECONDS)
}
}
} else {
NexusThreadPool.executorService.submit { expressEvent.process(Nexus.sharding.collection().first()) }
Nexus.launcher.launch { expressEvent.process(Nexus.sharding.collection().first()) }
}

return expressEvent
Expand Down Expand Up @@ -238,7 +239,7 @@ internal class NexusExpressCore: NexusExpress {

override fun broadcast(event: Consumer<DiscordApi>) {
Nexus.sharding.collection().forEach { shard ->
NexusThreadPool.executorService.submit {
Nexus.launcher.launch {
try {
event.accept(shard)
} catch (exception: Exception) {
Expand Down

0 comments on commit b9992cc

Please sign in to comment.