From 76dd8ba192b2ed8293a868429634635aafe10c53 Mon Sep 17 00:00:00 2001 From: Gary Tierney Date: Tue, 19 Sep 2017 01:52:10 +0100 Subject: [PATCH] Refactor asynchronous actions to avoid thread usage Adds custom coroutines specifically for use within Actions. This implements all features of the previous system, however, does not rely on a separate executor to wake up continuations. Note: since Actions are still ran by an executor and a small chance of overlap exists the ActionCoroutine implementation uses atomic counters and references. --- .../consumables/src/consumables.plugin.kts | 3 +- game/plugin/dummy/src/dummy.plugin.kts | 7 +- .../skills/mining/src/mining.plugin.kts | 11 +- .../org/apollo/game/action/ActionCoroutine.kt | 139 ++++++++++++++++++ .../org/apollo/game/action/AsyncAction.kt | 18 +-- .../apollo/game/action/AsyncActionRunner.kt | 69 --------- .../apollo/game/action/AsyncActionTrait.kt | 26 +++- .../game/action/AsyncDistancedAction.kt | 30 +--- .../apollo/game/action/ActionCoroutineTest.kt | 39 +++++ 9 files changed, 220 insertions(+), 122 deletions(-) create mode 100644 game/src/main/kotlin/org/apollo/game/action/ActionCoroutine.kt delete mode 100644 game/src/main/kotlin/org/apollo/game/action/AsyncActionRunner.kt create mode 100644 game/src/test/kotlin/org/apollo/game/action/ActionCoroutineTest.kt diff --git a/game/plugin/consumables/src/consumables.plugin.kts b/game/plugin/consumables/src/consumables.plugin.kts index 4624be01..c633e900 100644 --- a/game/plugin/consumables/src/consumables.plugin.kts +++ b/game/plugin/consumables/src/consumables.plugin.kts @@ -1,3 +1,4 @@ +import org.apollo.game.action.ActionBlock import org.apollo.game.action.AsyncAction import org.apollo.game.message.impl.ItemOptionMessage import org.apollo.game.model.Animation @@ -27,7 +28,7 @@ class ConsumeAction(val consumable: Consumable, player: Player, val slot: Int) : } } - suspend override fun executeActionAsync() { + override fun action(): ActionBlock = { consumable.consume(mob, slot) mob.playAnimation(Animation(CONSUME_ANIMATION_ID)) wait(consumable.delay) diff --git a/game/plugin/dummy/src/dummy.plugin.kts b/game/plugin/dummy/src/dummy.plugin.kts index 1730ff80..f02507b1 100644 --- a/game/plugin/dummy/src/dummy.plugin.kts +++ b/game/plugin/dummy/src/dummy.plugin.kts @@ -1,6 +1,7 @@ import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.selects.select +import org.apollo.game.action.ActionBlock import org.apollo.game.action.AsyncDistancedAction import org.apollo.game.action.DistancedAction import org.apollo.game.message.impl.ObjectActionMessage @@ -53,9 +54,9 @@ class DummyAction(val player: Player, position: Position) : AsyncDistancedAction } - override suspend fun executeActionAsync() { + override fun action(): ActionBlock = { mob.sendMessage("You hit the dummy.") - mob.turnTo(this.position) + mob.turnTo(position) mob.playAnimation(PUNCH_ANIMATION) wait() @@ -66,8 +67,6 @@ class DummyAction(val player: Player, position: Position) : AsyncDistancedAction } else { skills.addExperience(Skill.ATTACK, EXP_PER_HIT) } - - stop() } } diff --git a/game/plugin/skills/mining/src/mining.plugin.kts b/game/plugin/skills/mining/src/mining.plugin.kts index f44020b9..22c9652c 100644 --- a/game/plugin/skills/mining/src/mining.plugin.kts +++ b/game/plugin/skills/mining/src/mining.plugin.kts @@ -1,3 +1,4 @@ +import org.apollo.game.action.ActionBlock import org.apollo.game.action.AsyncDistancedAction import org.apollo.game.action.DistancedAction import org.apollo.game.message.impl.ObjectActionMessage @@ -68,18 +69,15 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget) } } - override fun start() { + override fun action() : ActionBlock = { mob.turnTo(position) val level = mob.skills.mining.currentLevel - if (level < target.ore.level) { mob.sendMessage("You do not have the required level to mine this rock.") stop() } - } - override suspend fun executeActionAsync() { mob.sendMessage("You swing your pick at the rock.") mob.playAnimation(tool.animation) @@ -88,14 +86,12 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget) val obj = target.getObject(mob.world) if (!obj.isPresent) { stop() - return } if (target.isSuccessful(mob)) { if (mob.inventory.freeSlots() == 0) { mob.inventory.forceCapacityExceeded() stop() - return } if (mob.inventory.add(target.ore.id)) { @@ -105,7 +101,6 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget) mob.skills.addExperience(Skill.MINING, target.ore.exp) mob.world.expireObject(obj.get(), target.ore.objects[target.objectId]!!, target.ore.respawn) stop() - return } } } @@ -142,7 +137,7 @@ class ProspectingAction(val m: Player, val p: Position, val ore: Ore) : AsyncDis } } - suspend override fun executeActionAsync() { + override fun action() : ActionBlock = { mob.sendMessage("You examine the rock for ores...") mob.turnTo(position) diff --git a/game/src/main/kotlin/org/apollo/game/action/ActionCoroutine.kt b/game/src/main/kotlin/org/apollo/game/action/ActionCoroutine.kt new file mode 100644 index 00000000..1087142b --- /dev/null +++ b/game/src/main/kotlin/org/apollo/game/action/ActionCoroutine.kt @@ -0,0 +1,139 @@ +package org.apollo.game.action + +import kotlinx.coroutines.experimental.suspendCancellableCoroutine +import java.util.concurrent.CancellationException +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.experimental.Continuation +import kotlin.coroutines.experimental.CoroutineContext +import kotlin.coroutines.experimental.EmptyCoroutineContext +import kotlin.coroutines.experimental.RestrictsSuspension +import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED +import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked +import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn + +typealias ActionPredicate = () -> Boolean +typealias ActionBlock = suspend ActionCoroutine.() -> Unit + +interface ActionCoroutineCondition { + /** + * Called once every tick to check if `Continuation` associated with this condition should be resumed. + */ + fun resume(): Boolean +} + +/** + * A continuation condition that waits on a given number of pulses. + */ +class AwaitPulses(pulses: Int) : ActionCoroutineCondition { + val remainingPulses: AtomicInteger = AtomicInteger(pulses) + + override fun resume(): Boolean { + return remainingPulses.decrementAndGet() <= 0 + } +} + +/** + * A continuation condition that waits until a predicate is fufilled. + */ +class AwaitPredicate(val predicate: ActionPredicate) : ActionCoroutineCondition { + override fun resume(): Boolean { + return predicate.invoke() + } +} + +/** + * A suspend point in an `ActionCoroutine` that has its `continuation` resumed whenever the `condition` evaluates + * to `true`. + */ +data class ActionCoroutineStep(val condition: ActionCoroutineCondition, internal val continuation: Continuation) + +@RestrictsSuspension +class ActionCoroutine : Continuation { + companion object { + /** + * Create a new `ActionCoroutine` and immediately execute the given `block`, returning a continuation that + * can be resumed. + */ + fun start(block: ActionBlock) : ActionCoroutine { + val coroutine = ActionCoroutine() + val continuation = block.createCoroutineUnchecked(coroutine, coroutine) + + coroutine.resumeContinuation(continuation) + + return coroutine + } + } + + override val context: CoroutineContext = EmptyCoroutineContext + override fun resume(value: Unit) {} + override fun resumeWithException(exception: Throwable) = throw exception + + private fun resumeContinuation(continuation: Continuation, allowCancellation: Boolean = true) { + try { + continuation.resume(Unit) + } catch (ex: CancellationException) { + if (!allowCancellation) { + throw ex + } + } + } + + /** + * The next `step` in this `ActionCoroutine` saved as a resume point. + */ + private var next = AtomicReference() + + /** + * Check if this continuation has no more steps to execute. + */ + fun stopped(): Boolean { + return next.get() == null + } + + /** + * Update this continuation and check if the condition for the next step to be resumed is satisfied. + */ + fun pulse() { + val nextStep = next.getAndSet(null) + if (nextStep == null) { + return + } + + val condition = nextStep.condition + val continuation = nextStep.continuation + + if (condition.resume()) { + resumeContinuation(continuation) + } else { + next.compareAndSet(null, nextStep) + } + } + + private suspend fun awaitCondition(condition: ActionCoroutineCondition) { + return suspendCoroutineOrReturn { cont -> + next.compareAndSet(null, ActionCoroutineStep(condition, cont)) + COROUTINE_SUSPENDED + } + } + + /** + * Stop execution of this continuation. + */ + suspend fun stop() { + return suspendCancellableCoroutine(true) { cont -> + next.set(null) + cont.cancel() + } + } + + /** + * Wait `pulses` game updates before resuming this continuation. + */ + suspend fun wait(pulses: Int = 1) = awaitCondition(AwaitPulses(pulses)) + + /** + * Wait until the `predicate` returns `true` before resuming this continuation. + */ + suspend fun wait(predicate: ActionPredicate) = awaitCondition(AwaitPredicate(predicate)) +} \ No newline at end of file diff --git a/game/src/main/kotlin/org/apollo/game/action/AsyncAction.kt b/game/src/main/kotlin/org/apollo/game/action/AsyncAction.kt index abf85dc8..e8df401d 100644 --- a/game/src/main/kotlin/org/apollo/game/action/AsyncAction.kt +++ b/game/src/main/kotlin/org/apollo/game/action/AsyncAction.kt @@ -3,24 +3,14 @@ package org.apollo.game.action import org.apollo.game.model.entity.Mob abstract class AsyncAction : Action, AsyncActionTrait { - override val runner: AsyncActionRunner - constructor(delay: Int, immediate: Boolean, mob: T) : super(delay, immediate, mob) { - this.runner = AsyncActionRunner({ this }, { executeActionAsync() }) - } + override var continuation: ActionCoroutine? = null - abstract suspend fun executeActionAsync() + constructor(delay: Int, immediate: Boolean, mob: T) : super(delay, immediate, mob) override fun execute() { - if (!runner.started()) { - runner.start() + if (update()) { + stop() } - - runner.pulse() - } - - override fun stop() { - super.stop() - runner.stop() } } \ No newline at end of file diff --git a/game/src/main/kotlin/org/apollo/game/action/AsyncActionRunner.kt b/game/src/main/kotlin/org/apollo/game/action/AsyncActionRunner.kt deleted file mode 100644 index 5288abc2..00000000 --- a/game/src/main/kotlin/org/apollo/game/action/AsyncActionRunner.kt +++ /dev/null @@ -1,69 +0,0 @@ -package org.apollo.game.action - -import kotlinx.coroutines.experimental.CommonPool -import kotlinx.coroutines.experimental.Job -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.launch -import kotlinx.coroutines.experimental.runBlocking - -class AsyncActionRunner(val actionSupplier: () -> Action<*>, val callback: suspend () -> Unit) { - var job: Job? = null - var pulseChannel = Channel(1) - var unsentPulses = 0 - - fun pulse() { - if (pulseChannel.offer(unsentPulses + 1)) { - unsentPulses = 0 - } else { - unsentPulses++ - } - } - - fun start() { - if (job != null) { - return - } - - val action = actionSupplier.invoke() - - job = launch(CommonPool) { - run { - while (action.isRunning) { - pulseChannel.receive() - callback() - } - } - } - } - - fun started(): Boolean { - return job != null - } - - fun stop() { - job?.cancel() - pulseChannel.close() - } - - suspend fun wait(pulses: Int = 1, pulseCallback: (() -> Unit)? = null) { - var remainingPulses = pulses - - while (remainingPulses > 0) { - val numPulses = pulseChannel.receive() - remainingPulses -= numPulses - - pulseCallback?.invoke() - } - } - - suspend fun await(condition: () -> Boolean, timeout: Int = 15) { - var remainingPulsesBeforeTimeout = timeout - - while (!condition.invoke()) { - remainingPulsesBeforeTimeout -= pulseChannel.receive() - if (remainingPulsesBeforeTimeout <= 0) { - break - } - } - } -} \ No newline at end of file diff --git a/game/src/main/kotlin/org/apollo/game/action/AsyncActionTrait.kt b/game/src/main/kotlin/org/apollo/game/action/AsyncActionTrait.kt index 7ad1a973..55a02aef 100644 --- a/game/src/main/kotlin/org/apollo/game/action/AsyncActionTrait.kt +++ b/game/src/main/kotlin/org/apollo/game/action/AsyncActionTrait.kt @@ -1,9 +1,29 @@ package org.apollo.game.action interface AsyncActionTrait { - val runner: AsyncActionRunner + /** + * The continuation that this `Action` is executing. May be `null` if this action hasn't started yet. + */ + abstract var continuation: ActionCoroutine? - suspend fun wait(pulses: Int = 1, pulseCallback: (() -> Unit)? = null) { - runner.wait(pulses, pulseCallback) + /** + * Update this action, initializing the continuation if not already initialized. + * + * @return `true` if this `Action` has completed execution. + */ + fun update(): Boolean { + val continuation = this.continuation + if (continuation == null) { + this.continuation = ActionCoroutine.start(action()) + return false + } + + continuation.pulse() + return continuation.stopped() } + + /** + * Create a new `ActionBlock` to execute. + */ + fun action() : ActionBlock } \ No newline at end of file diff --git a/game/src/main/kotlin/org/apollo/game/action/AsyncDistancedAction.kt b/game/src/main/kotlin/org/apollo/game/action/AsyncDistancedAction.kt index 343b2fd3..761714d0 100644 --- a/game/src/main/kotlin/org/apollo/game/action/AsyncDistancedAction.kt +++ b/game/src/main/kotlin/org/apollo/game/action/AsyncDistancedAction.kt @@ -3,36 +3,20 @@ package org.apollo.game.action import org.apollo.game.model.Position import org.apollo.game.model.entity.Mob +/** + * A `DistancedAction` that uses `ActionCoroutine`s to run asynchronously. + */ abstract class AsyncDistancedAction : DistancedAction, AsyncActionTrait { - override val runner: AsyncActionRunner + override var continuation: ActionCoroutine? = null constructor(delay: Int, immediate: Boolean, mob: T, position: Position, distance: Int) : - super(delay, immediate, mob, position, distance) { - - this.runner = AsyncActionRunner({ this }, { executeActionAsync() }) - } - - abstract suspend fun executeActionAsync() - - open protected fun start() { - - } - - override fun stop() { - super.stop() - runner.stop() - } + super(delay, immediate, mob, position, distance) override fun executeAction() { - start() - - if (!runner.started()) { - runner.start() + if (update()) { + stop() } - - runner.pulse() } - } diff --git a/game/src/test/kotlin/org/apollo/game/action/ActionCoroutineTest.kt b/game/src/test/kotlin/org/apollo/game/action/ActionCoroutineTest.kt new file mode 100644 index 00000000..4b6df03d --- /dev/null +++ b/game/src/test/kotlin/org/apollo/game/action/ActionCoroutineTest.kt @@ -0,0 +1,39 @@ +package org.apollo.game.action + +import org.apollo.game.action.ActionCoroutine +import org.junit.Assert.* +import org.junit.Test + +class ActionCoroutineTest { + @Test + fun `Coroutine execution resumes after a pulse() call`() { + val coroutine = ActionCoroutine.start { + wait(1) + } + + coroutine.pulse() + assertTrue(coroutine.stopped()) + } + + @Test + fun `Coroutine suspends on wait() calls`() { + val coroutine = ActionCoroutine.start { + wait(1) + } + + // Check that the continuation is still waiting on a pulse. + assertFalse(coroutine.stopped()) + } + + @Test + fun `Coroutine cancels on stop() calls`() { + val coroutine = ActionCoroutine.start { + stop() + wait(1) + } + + assertTrue(coroutine.stopped()) + coroutine.pulse() + assertTrue(coroutine.stopped()) + } +} \ No newline at end of file