Add support for asynchronous Mob actions

Adds asynchronous implementations of the Action and DistancedAction
classes, allowing plugins to make use of suspendable functions in Kotlin
instead of creating mini state machines to keep track of state.

The asynchronicity works by creating coroutine backed Channels for each
asynchronous action and having them listen on "pulse" events from the
action scheduler.  The action can then suspend execution until a pulse is
received or until some expensive operation has completed (i.e.,
pathfinding).

If an asynchronous action is still busy when a pulse arrives then the
number of missed pulses will be accumulated and sent to the action at
the next possible time.

The training dummy plugin has been updated to use asycnrhonous actions
as an example.
This commit is contained in:
Gary Tierney
2017-06-20 06:53:00 +01:00
parent 182de0330f
commit 97e85868ff
10 changed files with 156 additions and 187 deletions
+8
View File
@@ -24,6 +24,12 @@ sourceSets {
}
}
repositories {
maven {
url { 'https://dl.bintray.com/kotlin/kotlinx/' }
}
}
dependencies {
compile project(':cache')
compile project(':net')
@@ -32,6 +38,8 @@ dependencies {
compile group: 'io.github.lukehutch', name: 'fast-classpath-scanner', version: '2.0.21'
compile group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jre8', version: "$kotlinVersion"
compile group: 'org.jetbrains.kotlin', name: 'kotlin-compiler-embeddable', version: "$kotlinVersion"
compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-jdk8', version: '0.16'
compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '0.16'
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.8.0'
}
@@ -1,9 +0,0 @@
package org.apollo.game.plugin;
import org.apollo.game.plugin.kotlin.KotlinPluginCompiler;
public class KotlinPluginCompilerStub {
public static void main(String[] argv) {
KotlinPluginCompiler.main(argv);
}
}
@@ -19,7 +19,6 @@ import java.util.stream.Collectors;
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner;
import io.github.lukehutch.fastclasspathscanner.scanner.ScanResult;
import org.apollo.game.model.World;
import org.apollo.game.plugin.kotlin.KotlinPluginCompiler;
import org.apollo.game.plugin.kotlin.KotlinPluginScript;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.kotlin.cli.common.messages.CompilerMessageLocation;
@@ -0,0 +1,26 @@
package org.apollo.game.action
import org.apollo.game.model.entity.Mob
abstract class AsyncAction<T : Mob> : Action<T>, AsyncActionTrait {
override val runner: AsyncActionRunner
constructor(delay: Int, immediate: Boolean, mob: T) : super(delay, immediate, mob) {
this.runner = AsyncActionRunner({ this }, { executeActionAsync() })
}
abstract suspend fun executeActionAsync()
override fun execute() {
if (!runner.started()) {
runner.start()
}
runner.pulse()
}
override fun stop() {
super.stop()
runner.stop()
}
}
@@ -0,0 +1,56 @@
package org.apollo.game.action
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.selects.select
import org.apollo.game.model.entity.Mob
import java.util.function.Supplier
class AsyncActionRunner(val actionSupplier: () -> Action<*>, val callback: suspend () -> Unit) {
var job: Job? = null
var pulseChannel = Channel<Int>(1)
var unsentPulses = 0
fun pulse() {
if (pulseChannel.offer(unsentPulses + 1)) {
unsentPulses = 0
} else {
unsentPulses++
}
}
fun start() {
if (job != null) {
return
}
val action = actionSupplier.invoke()
job = launch(CommonPool) {
select {
pulseChannel.onReceive {
callback()
action.stop()
}
}
}
}
fun started(): Boolean {
return job != null
}
fun stop() {
job?.cancel()
pulseChannel.close()
}
suspend fun wait(pulses: Int = 1) {
var remainingPulses = pulses
while (remainingPulses > 0) {
val numPulses = pulseChannel.receive()
remainingPulses -= numPulses
}
}
}
@@ -0,0 +1,9 @@
package org.apollo.game.action
interface AsyncActionTrait {
val runner: AsyncActionRunner
suspend fun wait(pulses: Int = 1) {
runner.wait(pulses)
}
}
@@ -0,0 +1,32 @@
package org.apollo.game.action
import org.apollo.game.model.Position
import org.apollo.game.model.entity.Mob
abstract class AsyncDistancedAction<T : Mob> : DistancedAction<T>, AsyncActionTrait {
override val runner: AsyncActionRunner
constructor(delay: Int, immediate: Boolean, mob: T, position: Position, distance: Int) :
super(delay, immediate, mob, position, distance) {
this.runner = AsyncActionRunner({ this }, { executeActionAsync() })
}
abstract suspend fun executeActionAsync()
override fun stop() {
super.stop()
runner.stop()
}
override fun executeAction() {
if (!runner.started()) {
runner.start()
}
runner.pulse()
}
}
@@ -1,159 +0,0 @@
package org.apollo.game.plugin.kotlin
import org.jetbrains.kotlin.cli.common.CLIConfigurationKeys
import org.jetbrains.kotlin.cli.common.messages.*
import org.jetbrains.kotlin.cli.jvm.compiler.*
import org.jetbrains.kotlin.cli.jvm.config.JvmClasspathRoot
import org.jetbrains.kotlin.codegen.CompilationException
import org.jetbrains.kotlin.com.intellij.openapi.util.Disposer
import org.jetbrains.kotlin.config.*
import org.jetbrains.kotlin.script.KotlinScriptDefinitionFromAnnotatedTemplate
import java.io.File
import java.lang.management.ManagementFactory
import java.net.URISyntaxException
import java.net.URLClassLoader
import java.nio.file.*
import java.nio.file.StandardOpenOption.*
import java.util.*
class KotlinMessageCollector : MessageCollector {
override fun clear() {
}
override fun report(severity: CompilerMessageSeverity, message: String, location: CompilerMessageLocation) {
if (severity.isError) {
println("${location.path}:${location.line}-${location.column}: $message")
println(">>> ${location.lineContent}")
}
}
override fun hasErrors(): Boolean {
return false
}
}
data class KotlinCompilerResult(val fqName: String, val outputPath: Path)
class KotlinPluginCompiler(val classpath: List<File>, val messageCollector: MessageCollector) {
companion object {
fun currentClasspath(): List<File> {
val classLoader = Thread.currentThread().contextClassLoader as? URLClassLoader ?:
throw RuntimeException("Unable to resolve classpath for current ClassLoader")
val classpathUrls = classLoader.urLs
val classpath = ArrayList<File>()
for (classpathUrl in classpathUrls) {
try {
classpath.add(File(classpathUrl.toURI()))
} catch (e: URISyntaxException) {
throw RuntimeException("URL returned by ClassLoader is invalid")
}
}
return classpath
}
@JvmStatic
fun main(args: Array<String>) {
if (args.size < 2) throw RuntimeException("Usage: <outputDirectory> script1.kts script2.kts ...")
val outputDir = Paths.get(args[0])
val inputScripts = args.slice(1..args.size - 1).map { Paths.get(it) }
val classpath = mutableListOf<File>()
val runtimeBean = ManagementFactory.getRuntimeMXBean()
if (!runtimeBean.isBootClassPathSupported) {
println("Warning! Boot class path is not supported, must be supplied on the command line")
} else {
val bootClasspath = runtimeBean.bootClassPath
classpath.addAll(bootClasspath.split(File.pathSeparatorChar).map { File(it) })
}
/**
* Our current classpath should contain all compile time dependencies for the plugin as well as Apollo's
* own sources. We can also achieve this via Gradle but doing it at runtime prevents Gradle from thinking
* the build has been modified after evaluation.
*/
classpath.addAll(currentClasspath())
val compiler = KotlinPluginCompiler(classpath, MessageCollector.NONE)
val compiledScriptClasses = mutableListOf<String>()
try {
try {
Files.createDirectory(outputDir)
} catch (e: FileAlreadyExistsException) {
// do nothing...
}
inputScripts.forEach {
compiledScriptClasses.add(compiler.compile(it, outputDir).fqName)
}
} catch (t: Throwable) {
t.printStackTrace()
System.exit(1)
}
}
}
private fun createCompilerConfiguration(inputPath: Path): CompilerConfiguration {
val configuration = CompilerConfiguration()
val scriptDefinition = KotlinScriptDefinitionFromAnnotatedTemplate(KotlinPluginScript::class)
configuration.add(JVMConfigurationKeys.SCRIPT_DEFINITIONS, scriptDefinition)
configuration.put(JVMConfigurationKeys.CONTENT_ROOTS, classpath.map { JvmClasspathRoot(it) })
configuration.put(JVMConfigurationKeys.RETAIN_OUTPUT_IN_MEMORY, true)
configuration.put(CLIConfigurationKeys.MESSAGE_COLLECTOR_KEY, KotlinMessageCollector())
configuration.put(CommonConfigurationKeys.MODULE_NAME, inputPath.toString())
configuration.addKotlinSourceRoot(inputPath.toAbsolutePath().toString())
return configuration
}
@Throws(KotlinPluginCompilerException::class)
fun compile(inputPath: Path, outputPath: Path): KotlinCompilerResult {
val rootDisposable = Disposer.newDisposable()
val configuration = createCompilerConfiguration(inputPath)
val configFiles = EnvironmentConfigFiles.JVM_CONFIG_FILES
val environment = KotlinCoreEnvironment.createForProduction(rootDisposable, configuration, configFiles)
try {
val generationState = KotlinToJVMBytecodeCompiler.analyzeAndGenerate(environment)
if (generationState == null) {
throw KotlinPluginCompilerException("Failed to generate bytecode for kotlin script")
}
val sourceFiles = environment.getSourceFiles()
val script = sourceFiles[0].script ?: throw KotlinPluginCompilerException("Main script file isnt a script")
val scriptFilePath = script.fqName.asString().replace('.', '/') + ".class"
val scriptFileClass = generationState.factory.get(scriptFilePath)
if (scriptFileClass == null) {
throw KotlinPluginCompilerException("Unable to find compiled plugin class file $scriptFilePath")
}
generationState.factory.asList().forEach {
Files.write(outputPath.resolve(it.relativePath), it.asByteArray(), CREATE, WRITE, TRUNCATE_EXISTING)
}
return KotlinCompilerResult(script.fqName.asString(), outputPath.resolve(scriptFileClass.relativePath))
} catch (e: CompilationException) {
throw KotlinPluginCompilerException("Compilation failed", e)
} finally {
Disposer.dispose(rootDisposable)
}
}
}
class KotlinPluginCompilerException(message: String, cause: Throwable? = null) : Exception(message, cause) {
}
@@ -38,6 +38,16 @@ abstract class KotlinPluginTestHelpers {
do {
action.pulse()
/**
* Introducing an artificial delay is necessary to prevent the timeout being exceeded before
* an asynchronous [Action] really starts. When a job is submitted to a new coroutine context
* there may be a delay before it is actually executed.
*
* This delay is typically sub-millisecond and is only incurred with startup. Since game actions
* have larger delays of their own this isn't a problem in practice.
*/
Thread.sleep(50L)
} while (action.isRunning && pulses++ < timeout)
Assert.assertFalse("Exceeded timeout waiting for action completion", pulses > timeout)
+15 -18
View File
@@ -1,9 +1,12 @@
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.selects.select
import org.apollo.game.action.AsyncDistancedAction
import org.apollo.game.action.DistancedAction
import org.apollo.game.message.impl.ObjectActionMessage
import org.apollo.game.model.Animation
import org.apollo.game.model.Position
import org.apollo.game.model.entity.Player
import org.apollo.game.model.entity.Skill
import org.apollo.game.model.entity.*
import org.apollo.net.message.Message
/**
@@ -15,7 +18,7 @@ on { ObjectActionMessage::class }
.where { option == 2 && id in DUMMY_IDS }
.then { DummyAction.start(this, it, position) }
class DummyAction(val player: Player, position: Position) : DistancedAction<Player>(0, true, player, position, DISTANCE) {
class DummyAction(val player: Player, position: Position) : AsyncDistancedAction<Player>(0, true, player, position, DISTANCE) {
companion object {
@@ -49,25 +52,19 @@ class DummyAction(val player: Player, position: Position) : DistancedAction<Play
}
var started = false
override fun executeAction() {
if (started) {
val skills = player.skillSet
override suspend fun executeActionAsync() {
mob.sendMessage("You hit the dummy.")
mob.turnTo(this.position)
mob.playAnimation(PUNCH_ANIMATION)
wait()
if (skills.getSkill(Skill.ATTACK).maximumLevel >= LEVEL_THRESHOLD) {
player.sendMessage("There is nothing more you can learn from hitting a dummy.")
} else {
skills.addExperience(Skill.ATTACK, EXP_PER_HIT)
}
val skills = player.skillSet
stop()
if (skills.getSkill(Skill.ATTACK).maximumLevel >= LEVEL_THRESHOLD) {
player.sendMessage("There is nothing more you can learn from hitting a dummy.")
} else {
mob.sendMessage("You hit the dummy.")
mob.turnTo(this.position)
mob.playAnimation(PUNCH_ANIMATION)
started = true
skills.addExperience(Skill.ATTACK, EXP_PER_HIT)
}
}