» » The magic of Dispatchers and how to make your own Main

The magic of Dispatchers and how to make your own Main

 

I think now there are no people left who are unfamiliar with coroutines in Kotlin. Magic tool, right? Even more magical about them, I find it possible to move the calculation to another thread:

fun main() = runBlocking {
	println("Hello from ${Thread.currentThread().name}")
  withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to main

Just one line and we get hello from another thread. How does this mechanism work? In fact, it’s painfully simple, if you look at CoroutineDispatcherit, you can see two important methods there:

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

The first is responsible for the need to call dispatch, but the second is a little more interesting, it is responsible for executing the passed in block Runnablein another thread. It is worth noting that it dispatchmust guarantee execution block, otherwise you can catch a deadlock and the coroutine will never continue execution, just as the method should not be deferredblock , if necessary, continue execution on the current thread, it is better to return false from isDispatchNeeded.

Standard?

We have 4 standard dispaters that can be accessed from the class Dispatchers: Default, Unconfined, Main, IO.

Unconfined: The simplest of them is Unconfined, which will not change the flow of execution and the code from the beginning of the article will no longer be so interesting:

runBlocking(Dispatchers.Unconfined) {
	println("Hello from ${Thread.currentThread().name}")
	withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to DefaultDispatcher-worker-2)

This is achieved at the expense of isDispatchNeeded which always returns false and does not allow a call dispatch(To be fair, it should be noted that dispatch it can still be called from, yield(),but that's a completely different story).

Default, IO: These two are implemented based on ExecutorCoroutineDispatcher the complexities of implementing their own Executor'а, Default inherits SchedulerCoroutineDispatcher, which executes tasks by sending them to CoroutineScheduler, it is custom Executor, with a thread pool equal to the number of processor threads (minimum 2) for CPU-bound tasks, which can be expanded up to maxPoolSize equal to the system setting kotlinx.coroutines.scheduler.max.pool.size , or 2097150 maximum, for blocking tasks (IO). IO Dispatcher works through Default limiting itself to threads equal to the kotlinx.coroutines.io.parallelism system parameter or the number of processor threads (minimum 64)).CoroutineSchedulershould understand the blocking task or not, and this is implemented by dispatchWithContext the y method SchedulerCoroutineDispatcher, where the task type is explicitly indicated: BlockingContext from IO and NonBlocking for any tasks from Default.

Main : The thing that started it all. Coroutines themselves ( coroutine-core ) do not provide an implementation MainCoroutineDispatcher, but only a mechanism for loading it. Loading is handled by a class MainDispatcherLoaderthat uses ServiceLoaderand FastServiceLoader( Android only ) that explicitly tries to initialize kotlinx.coroutines.android.AndroidDispatcherFactory. If MainDispatcherLoader it does not find implementations MainDispatcherFactory or createDispatcher throws an exception , a standard one will be created MissingMainCoroutineDispatcherthat throws exceptions for all .

Consider the implementation in Android :

In AndroidMainCoroutineDispatcher , it is implemented on the basis of Handler , AndroidDispatcherFactory handles initialization :

override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
	val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
	return HandlerContext(mainLooper.asHandler(async = true))
}

@VisibleForTesting
internal fun Looper.asHandler(async: Boolean): Handler {
	// Async support was added in API 16.
	if (!async || Build.VERSION.SDK_INT < 16) {
		return Handler(this)
	}
	if (Build.VERSION.SDK_INT &gt;= 28) {
    // TODO compile against API 28 so this can be invoked without reflection.
    val factoryMethod = Handler::class.java.getDeclaredMethod("createAsync", Looper::class.java)
    return factoryMethod.invoke(null, this) as Handler
	}

	val constructor: Constructor&lt;Handler&gt;
	try {
    constructor = Handler::class.java.getDeclaredConstructor(Looper::class.java,
        Handler.Callback::class.java, Boolean::class.javaPrimitiveType)
	} catch (ignored: NoSuchMethodException) {
    // Hidden constructor absent. Fall back to non-async constructor.
    return Handler(this)
	}
	return constructor.newInstance(this, null, true)
}

HandlerContextIt implements itself MainCoroutineDispatcherwithDelay and pushes execution to the main thread using Handler::post:

override fun dispatch(context: CoroutineContext, block: Runnable) {
	if (!handler.post(block)) {
		cancelOnRejection(context, block)
	}
}

Delay it is also needed to override the mechanism of the function delay(), which by default works on a dedicated thread, on Android it will work through handler.postDelayed . Also here you can look at the implementation of isDispatchNeeded , which MainCoroutineDispatcher.immediate will not be called dispatch for, provided that you are already on the main thread.

Own implementation of MainCoroutineDispatcher: I was wondering how to drag coroutines into an existing Java project with an already implemented Event-Loop. Fortunately, I have a game server for experiments written entirely in Java, running in several threads with an Event-Loop on the main one. It's worth starting with the implementation MainCoroutineDispatcher:

internal class ServerDispatcher(
	private val invokeImmediately: Boolean
) : MainCoroutineDispatcher() {
	@Volatile
	private var _immediate = if (invokeImmediately) this else null

	override val immediate = _immediate ?: ServerDispatcher(true).also { _immediate = it }

	override fun isDispatchNeeded(context: CoroutineContext): Boolean =
    !invokeImmediately || !Server.getInstance().isPrimaryThread

	override fun dispatch(context: CoroutineContext, block: Runnable) {
    Server.getInstance().scheduler.scheduleTask(block)
	}

	override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    throw UnsupportedOperationException("limitedParallelism is not supported for ${this::class.qualifiedName}")
	}
}

It isDispatchNeededis no different from that in Android, the difference is in dispatch, which Runnable queues tasks that are parsed and executed in a loop on the main thread. Let's see how it works:

val scope = CoroutineScope(ServerDispatcher(invokeImmediately = false) + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

We made sure that everything works, now it's time to make the download. We create a factory:

class ServerDispatcherFactory : MainDispatcherFactory {
	override val loadPriority: Int = Int.MAX_VALUE
	override fun createDispatcher(
    allFactories: List<MainDispatcherFactory>,
  ): MainCoroutineDispatcher = ServerDispatcher(invokeImmediately = false)
}

We go to the resources and put the file kotlinx.coroutines.internal.MainDispatcherFactoryin META-INF/services with the content:

dev.israpil.coroutines.mygameserver.ServerDispatcherFactory

We check:

val scope = CoroutineScope(Dispatchers.Main.immediate + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

Enjoy coroutines.

Related Articles

Add Your Comment

reload, if the code cannot be seen

All comments will be moderated before being published.