Refactor asynchronous actions to avoid thread usage

Adds custom coroutines specifically for use within Actions.  This
implements all features of the previous system, however, does not rely
on a separate executor to wake up continuations.

Note: since Actions are still ran by an executor and a small chance of
overlap exists the ActionCoroutine implementation uses atomic counters
and references.
This commit is contained in:
Gary Tierney
2017-09-19 01:52:10 +01:00
parent 169d89ffc0
commit 76dd8ba192
9 changed files with 220 additions and 122 deletions
@@ -1,3 +1,4 @@
import org.apollo.game.action.ActionBlock
import org.apollo.game.action.AsyncAction
import org.apollo.game.message.impl.ItemOptionMessage
import org.apollo.game.model.Animation
@@ -27,7 +28,7 @@ class ConsumeAction(val consumable: Consumable, player: Player, val slot: Int) :
}
}
suspend override fun executeActionAsync() {
override fun action(): ActionBlock = {
consumable.consume(mob, slot)
mob.playAnimation(Animation(CONSUME_ANIMATION_ID))
wait(consumable.delay)
+3 -4
View File
@@ -1,6 +1,7 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.selects.select
import org.apollo.game.action.ActionBlock
import org.apollo.game.action.AsyncDistancedAction
import org.apollo.game.action.DistancedAction
import org.apollo.game.message.impl.ObjectActionMessage
@@ -53,9 +54,9 @@ class DummyAction(val player: Player, position: Position) : AsyncDistancedAction
}
override suspend fun executeActionAsync() {
override fun action(): ActionBlock = {
mob.sendMessage("You hit the dummy.")
mob.turnTo(this.position)
mob.turnTo(position)
mob.playAnimation(PUNCH_ANIMATION)
wait()
@@ -66,8 +67,6 @@ class DummyAction(val player: Player, position: Position) : AsyncDistancedAction
} else {
skills.addExperience(Skill.ATTACK, EXP_PER_HIT)
}
stop()
}
}
@@ -1,3 +1,4 @@
import org.apollo.game.action.ActionBlock
import org.apollo.game.action.AsyncDistancedAction
import org.apollo.game.action.DistancedAction
import org.apollo.game.message.impl.ObjectActionMessage
@@ -68,18 +69,15 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget)
}
}
override fun start() {
override fun action() : ActionBlock = {
mob.turnTo(position)
val level = mob.skills.mining.currentLevel
if (level < target.ore.level) {
mob.sendMessage("You do not have the required level to mine this rock.")
stop()
}
}
override suspend fun executeActionAsync() {
mob.sendMessage("You swing your pick at the rock.")
mob.playAnimation(tool.animation)
@@ -88,14 +86,12 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget)
val obj = target.getObject(mob.world)
if (!obj.isPresent) {
stop()
return
}
if (target.isSuccessful(mob)) {
if (mob.inventory.freeSlots() == 0) {
mob.inventory.forceCapacityExceeded()
stop()
return
}
if (mob.inventory.add(target.ore.id)) {
@@ -105,7 +101,6 @@ class MiningAction(player: Player, val tool: Pickaxe, val target: MiningTarget)
mob.skills.addExperience(Skill.MINING, target.ore.exp)
mob.world.expireObject(obj.get(), target.ore.objects[target.objectId]!!, target.ore.respawn)
stop()
return
}
}
}
@@ -142,7 +137,7 @@ class ProspectingAction(val m: Player, val p: Position, val ore: Ore) : AsyncDis
}
}
suspend override fun executeActionAsync() {
override fun action() : ActionBlock = {
mob.sendMessage("You examine the rock for ores...")
mob.turnTo(position)
@@ -0,0 +1,139 @@
package org.apollo.game.action
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.RestrictsSuspension
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.createCoroutineUnchecked
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
typealias ActionPredicate = () -> Boolean
typealias ActionBlock = suspend ActionCoroutine.() -> Unit
interface ActionCoroutineCondition {
/**
* Called once every tick to check if `Continuation` associated with this condition should be resumed.
*/
fun resume(): Boolean
}
/**
* A continuation condition that waits on a given number of pulses.
*/
class AwaitPulses(pulses: Int) : ActionCoroutineCondition {
val remainingPulses: AtomicInteger = AtomicInteger(pulses)
override fun resume(): Boolean {
return remainingPulses.decrementAndGet() <= 0
}
}
/**
* A continuation condition that waits until a predicate is fufilled.
*/
class AwaitPredicate(val predicate: ActionPredicate) : ActionCoroutineCondition {
override fun resume(): Boolean {
return predicate.invoke()
}
}
/**
* A suspend point in an `ActionCoroutine` that has its `continuation` resumed whenever the `condition` evaluates
* to `true`.
*/
data class ActionCoroutineStep(val condition: ActionCoroutineCondition, internal val continuation: Continuation<Unit>)
@RestrictsSuspension
class ActionCoroutine : Continuation<Unit> {
companion object {
/**
* Create a new `ActionCoroutine` and immediately execute the given `block`, returning a continuation that
* can be resumed.
*/
fun start(block: ActionBlock) : ActionCoroutine {
val coroutine = ActionCoroutine()
val continuation = block.createCoroutineUnchecked(coroutine, coroutine)
coroutine.resumeContinuation(continuation)
return coroutine
}
}
override val context: CoroutineContext = EmptyCoroutineContext
override fun resume(value: Unit) {}
override fun resumeWithException(exception: Throwable) = throw exception
private fun resumeContinuation(continuation: Continuation<Unit>, allowCancellation: Boolean = true) {
try {
continuation.resume(Unit)
} catch (ex: CancellationException) {
if (!allowCancellation) {
throw ex
}
}
}
/**
* The next `step` in this `ActionCoroutine` saved as a resume point.
*/
private var next = AtomicReference<ActionCoroutineStep>()
/**
* Check if this continuation has no more steps to execute.
*/
fun stopped(): Boolean {
return next.get() == null
}
/**
* Update this continuation and check if the condition for the next step to be resumed is satisfied.
*/
fun pulse() {
val nextStep = next.getAndSet(null)
if (nextStep == null) {
return
}
val condition = nextStep.condition
val continuation = nextStep.continuation
if (condition.resume()) {
resumeContinuation(continuation)
} else {
next.compareAndSet(null, nextStep)
}
}
private suspend fun awaitCondition(condition: ActionCoroutineCondition) {
return suspendCoroutineOrReturn { cont ->
next.compareAndSet(null, ActionCoroutineStep(condition, cont))
COROUTINE_SUSPENDED
}
}
/**
* Stop execution of this continuation.
*/
suspend fun stop() {
return suspendCancellableCoroutine(true) { cont ->
next.set(null)
cont.cancel()
}
}
/**
* Wait `pulses` game updates before resuming this continuation.
*/
suspend fun wait(pulses: Int = 1) = awaitCondition(AwaitPulses(pulses))
/**
* Wait until the `predicate` returns `true` before resuming this continuation.
*/
suspend fun wait(predicate: ActionPredicate) = awaitCondition(AwaitPredicate(predicate))
}
@@ -3,24 +3,14 @@ package org.apollo.game.action
import org.apollo.game.model.entity.Mob
abstract class AsyncAction<T : Mob> : Action<T>, AsyncActionTrait {
override val runner: AsyncActionRunner
constructor(delay: Int, immediate: Boolean, mob: T) : super(delay, immediate, mob) {
this.runner = AsyncActionRunner({ this }, { executeActionAsync() })
}
override var continuation: ActionCoroutine? = null
abstract suspend fun executeActionAsync()
constructor(delay: Int, immediate: Boolean, mob: T) : super(delay, immediate, mob)
override fun execute() {
if (!runner.started()) {
runner.start()
if (update()) {
stop()
}
runner.pulse()
}
override fun stop() {
super.stop()
runner.stop()
}
}
@@ -1,69 +0,0 @@
package org.apollo.game.action
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
class AsyncActionRunner(val actionSupplier: () -> Action<*>, val callback: suspend () -> Unit) {
var job: Job? = null
var pulseChannel = Channel<Int>(1)
var unsentPulses = 0
fun pulse() {
if (pulseChannel.offer(unsentPulses + 1)) {
unsentPulses = 0
} else {
unsentPulses++
}
}
fun start() {
if (job != null) {
return
}
val action = actionSupplier.invoke()
job = launch(CommonPool) {
run {
while (action.isRunning) {
pulseChannel.receive()
callback()
}
}
}
}
fun started(): Boolean {
return job != null
}
fun stop() {
job?.cancel()
pulseChannel.close()
}
suspend fun wait(pulses: Int = 1, pulseCallback: (() -> Unit)? = null) {
var remainingPulses = pulses
while (remainingPulses > 0) {
val numPulses = pulseChannel.receive()
remainingPulses -= numPulses
pulseCallback?.invoke()
}
}
suspend fun await(condition: () -> Boolean, timeout: Int = 15) {
var remainingPulsesBeforeTimeout = timeout
while (!condition.invoke()) {
remainingPulsesBeforeTimeout -= pulseChannel.receive()
if (remainingPulsesBeforeTimeout <= 0) {
break
}
}
}
}
@@ -1,9 +1,29 @@
package org.apollo.game.action
interface AsyncActionTrait {
val runner: AsyncActionRunner
/**
* The continuation that this `Action` is executing. May be `null` if this action hasn't started yet.
*/
abstract var continuation: ActionCoroutine?
suspend fun wait(pulses: Int = 1, pulseCallback: (() -> Unit)? = null) {
runner.wait(pulses, pulseCallback)
/**
* Update this action, initializing the continuation if not already initialized.
*
* @return `true` if this `Action` has completed execution.
*/
fun update(): Boolean {
val continuation = this.continuation
if (continuation == null) {
this.continuation = ActionCoroutine.start(action())
return false
}
continuation.pulse()
return continuation.stopped()
}
/**
* Create a new `ActionBlock` to execute.
*/
fun action() : ActionBlock
}
@@ -3,36 +3,20 @@ package org.apollo.game.action
import org.apollo.game.model.Position
import org.apollo.game.model.entity.Mob
/**
* A `DistancedAction` that uses `ActionCoroutine`s to run asynchronously.
*/
abstract class AsyncDistancedAction<T : Mob> : DistancedAction<T>, AsyncActionTrait {
override val runner: AsyncActionRunner
override var continuation: ActionCoroutine? = null
constructor(delay: Int, immediate: Boolean, mob: T, position: Position, distance: Int) :
super(delay, immediate, mob, position, distance) {
this.runner = AsyncActionRunner({ this }, { executeActionAsync() })
}
abstract suspend fun executeActionAsync()
open protected fun start() {
}
override fun stop() {
super.stop()
runner.stop()
}
super(delay, immediate, mob, position, distance)
override fun executeAction() {
start()
if (!runner.started()) {
runner.start()
if (update()) {
stop()
}
runner.pulse()
}
}
@@ -0,0 +1,39 @@
package org.apollo.game.action
import org.apollo.game.action.ActionCoroutine
import org.junit.Assert.*
import org.junit.Test
class ActionCoroutineTest {
@Test
fun `Coroutine execution resumes after a pulse() call`() {
val coroutine = ActionCoroutine.start {
wait(1)
}
coroutine.pulse()
assertTrue(coroutine.stopped())
}
@Test
fun `Coroutine suspends on wait() calls`() {
val coroutine = ActionCoroutine.start {
wait(1)
}
// Check that the continuation is still waiting on a pulse.
assertFalse(coroutine.stopped())
}
@Test
fun `Coroutine cancels on stop() calls`() {
val coroutine = ActionCoroutine.start {
stop()
wait(1)
}
assertTrue(coroutine.stopped())
coroutine.pulse()
assertTrue(coroutine.stopped())
}
}