20 November 2020

Concurrency in Kotlin: how coroutines make it easier

Concurrent software is hard to get right, as we’ve seen in the previous article, “When flows split”. Concurrency is even harder to express. Most concurrent code is being written in a way that makes our heads spin every time we need to read it. Let us see how Kotlin helps us write concurrent code that is clearer and more efficient from the start.

This help comes under the form of first-class support for coroutines.

Of course, now one question begs to be asked: what are coroutines?

We are all familiarised with the concept of a routine, also called a function or a method. Subroutines mean pretty much the same in our everyday work. But coroutines? They are not the same thing, but they are a deceptively simple concept. Coroutines are routines that cooperate with their execution environment and with each other.

This cooperation takes place in the sense that one coroutine can allow the control flow to be transferred to another coroutine without ending the first coroutine. Even potentially switching between execution contexts. Let it sink for a bit. This unusual definition means that one call from a coroutine to another may cause the original control flow to be suspended, only to resume later, maybe on a different thread.

Suspension in coroutines bears some similarities to suspension in threads. A thread becomes frozen in time and space once it is suspended. Then the OS scheduler selects another thread to unfreeze and resume execution, then suspends it, selects another, and so on until our first thread is selected to resume execution. At this point, it continues as if nothing had happened. The same concept applies to coroutines.

The big difference lies in the mechanics for suspending and resuming.

First of all, suspension in coroutines can only happen when another coroutine is being called, whereas threads can be suspended at any time. This is why we said earlier that a coroutine could “allow” the transfer of the control flow: a coroutine can never suspend if it does not call other coroutines.

Unlike traditional thread-based concurrency, there is no one-to-one relationship between coroutines and threads. So, the thread that was executing a coroutine that got suspended can move on to execute another coroutine. Coroutines are one level of abstraction higher than threads.

All this also means that calls to a coroutine will not necessarily restart its execution, as execution may continue from somewhere in its middle. It all depends on the state of the execution flow. That is right. Coroutines are stateful!

Just for completeness, we can define subroutines as a particular case of coroutines in the sense that they do not cooperate and are not stateful. One call from one subroutine to another never causes suspension of the execution flow, always continues in the same thread, and always runs from the beginning to the end of the subroutine. This is the behaviour we are used to seeing in non-concurrent programs.

Writing coroutines

Kotlin provides first-class support for coroutines. The language itself supports them, so there is no need for additional frameworks, plugins, or special libraries. The compiler and the runtime do all the work necessary to preserve and pass along the state.

public class App { 
    private static final Logger LOG = LoggerFactory.getLogger(App.class); 
    public static void main(String[] args) throws ExecutionException, InterruptedException { 
        CompletableFuture.allOf( 
                CompletableFuture.runAsync(() -> LOG.info("I'm a teapot.")), 
                CompletableFuture.runAsync(() -> LOG.info("I'm a coffee pot.")) 
            ).get(); 
        LOG.info("No more pots."); 
    } 
} 

listing #1

fun main() { 
    val log = getLogger("main") 
    runBlocking(Dispatchers.Default) { 
        launch { 
            log.info("I'm a teapot.") 
        } 
        launch { 
            log.info("I'm a coffee pot.") 
        } 
    } 
    log.info("No more pots.") 
}

listing #2

Compare the Java code in snippet 1 with the Kotlin code in snippet 2. Java got better with CompletableFuture after version 8, but Kotlin is a bit more elegant, right? Coroutines were designed to streamline the expression of concurrency in our code.

As we can see when running them, the results of both code snippets are equivalent. Both the Java lambdas and the Kotlin coroutines ran concurrently and in different threads. Let us see how we can work with Kotlin’s coroutines in practice.

We define a coroutine by using the suspend keyword in its declaration:

suspend fun doSomething(): Unit

This keyword allows the compiler to handle such functions differently. They now need to abide by the rules of coroutines. Rule number one: a coroutine may only be called from within a coroutine scope. Or in other words, from another coroutine.

It may seem like an impossible task. After all, a Kotlin program starts at the main() method, which is a regular subroutine, right? It turns out that there are constructs that allow us to create an initial coroutine scope.

runBlocking() comes to the rescue. It will take a lambda argument and run it within a coroutine. It will also not exit until the coroutine has completed. This is an essential detail because it is possible to schedule the execution of coroutines in different threads. As such, it is necessary to ensure that the main application thread does not exit until our coroutines have finished; otherwise, the application would terminate too soon.

Making coroutines concurrent

What about concurrency, then? Earlier, we mentioned concurrency and that coroutines make it easier to write concurrent code. We must keep in mind that just calling coroutines does not make them concurrent. Even though it is true that each call site of a coroutine is a suspension point, the execution flow is still sequential unless stated otherwise. Therefore, we must be explicit and say that we want a given coroutine to run concurrently. We use launch() to do so. Like runBlocking(), it will run a lambda function in a coroutine, but it will not block until its completion. Instead, it launches a child coroutine. The coroutine that called launch() will not complete until all its children do.

All of this becomes clearer if we look back at code snippets 1 and 2. In Java, we had to list explicitly all the futures we had created in order to wait for their results. The Kotlin version with coroutines is different. It automatically manages all calls made to concurrent coroutines within the runBlocking() scope.

Coroutine invocations using launch() return an instance of an object implementing Job. This interface represents the running state of a coroutine in Kotlin, much like Future represents the running state of an asynchronous task in Java. It is possible to join Jobs just like Futures. However, Job does not allow the caller to consume values returned by a coroutine. Fire-and-forget coroutines have their uses, but coroutines only achieve their full potential as functions when they can return values to their callers.

There is another interface that extends from Job, called Deferred. This interface contains the await() method, which we use to retrieve values returned by coroutines. We obtain instances of Deferred when async() is used instead of launch().

Now we can start coroutines concurrently and work with the values they return. We demonstrate this in code snippet 3.

suspend fun one(): Int = 
    withContext(Dispatchers.Default) { 
        // pretend this is computationally intensive 
        42 
    } 
 
suspend fun two(): Int = 
    withContext(Dispatchers.IO) { 
        // pretend this is an I/O operation 
        1 
    } 
 
fun main() = runBlocking { 
 
    val callOne = async { one() } 
    val callTwo = async { two() } 
    
    println(callOne.await() + callTwo.await()) 
 
} 

listing #3

Coroutines still need threads

The mechanism that supports coroutines is not magic. Remember that just like any other code, coroutines need threads to run on: just because we are not creating them ourselves, that does not mean that they do not exist.

Entities known as dispatchers manage the threads allocated to the execution of coroutines. They keep track of which coroutines are running, together with their states, and assign coroutines to threads as necessary. Just like thread pools dedicated to coroutines.

Kotlin supports several built-in dispatchers, each with its purpose. There is a dispatcher meant for I/O tasks, another to ensure that coroutines run on a thread associated with the user interface, a pre-built thread pool, among others. One should consult the Kotlin documentation to obtain more details about them since some are platform-specific.

It is possible to assign coroutines to specific dispatchers, and it is often recommended to do so. We can specify the dispatcher as an argument to the launch(), async(), and withContext() functions. Once a dispatcher is defined, by default or explicitly, it will be used by subsequent launch() and async() calls unless these specify a different dispatcher.

Code snippet 3 demonstrates usage of withContext(). In this example, each coroutine ran in a different dispatcher. The caller code got suspended in each invocation of await() until the respective coroutine finished.

Looking back at code snippet 2, we see that runBlocking() determined the dispatcher to use. In the example, it was necessary to define a dispatcher explicitly in order to ensure that the outcome would be as displayed: each coroutine running in a thread other than the main thread. If left undefined, the dispatcher used may be the Unconfined dispatcher: each coroutine will run in the calling thread until they suspend. Because the coroutines in the example never suspend – the logging operation is blocking – then only the main thread would be used.

Regardless of the function utilised to change the dispatcher, the chosen dispatcher will remain in scope and will be used for new coroutine invocations until it is changed again later.

Dispatcher management can become as complicated as thread pool management. A useful piece of advice is to use withContext() to specify the desired dispatcher. Of course, that would be overkill to do in every coroutine. So, we should use our best judgment and always remember to ask ourselves if the coroutine we are writing can run in any dispatcher or should be constrained to a specific dispatcher.

We may create our dispatchers, for example, when we want to manage the resources used by a set of coroutines. Or when these need to be segregated from other coroutines due to their runtime characteristics. Usually, this is done by creating an ExecutorService. The dispatcher is then obtained through the asCoroutineDispatcher() Kotlin extension method. No further management on our part is required, except that we must remember to shut down the executor. See code snippet number 4 for an example.

fun main() { 
    val executor = createExecutor() 
    try { 
        runBlocking(executor.asCoroutineDispatcher()) { 
             getLogger("sample").info("I'm running in a custom dispatcher.") 
        } 
    } finally { 
        executor.shutdown() 
    } 
} 
 
fun createExecutor(): ExecutorService = 
    Executors.newSingleThreadExecutor { 
        val thread = Thread(it) 
        thread.name = "custom-single-thread" 
        thread 
    } 

listing #4

Error management

Error management is an essential part of programming. It is relatively simple to reason about error flows in sequential code. However, it can be challenging to foresee all possible error flows once we start to use concurrent patterns.

For example, if we launch three concurrent tasks for one computation, and one of them fails, we need to ensure that all of them have completed before failing the entire computation. Some or all of the others may fail too, even at different moments in time. We must also handle all these possible failures. We even want to cancel tasks that have not failed instead of waiting for them to finish, since we already know that the computation as a whole will fail. All this management can add complexity that is unnecessary to the original purpose of the computation.

Structured concurrency is a paradigm whose purpose is to simplify the coordination of concurrent execution flows by enclosing them into execution scopes. These scopes are then handled automatically, terminating only when all enclosed concurrent execution flows reach their completion. In case an error occurs in one of them, then all others are cancelled.

fun main() = runBlocking { 
 
    suspend fun ne(): Int { 
        println("Answering the most important question in the universe...") 
        delay(1_000) 
        println("... done!") 
        return 42 
    } 
    
    suspend fun two(): Int { 
        println("Finding the question...") 
        throw ArithmeticException() 
    } 
    
    try { 
        coroutineScope { 
        
            val first = async { one() } 
            val second = async { two() } 
        
            println(first.await() + second.await()) 
        
        } 
    } catch (e: Exception) { 
            println("Coroutines have thrown ${e::class.java}") 
    } 
 
} 

listing #5

We want to simulate two non-blocking calculations in code snippet 5. Unfortunately, in this simulation, one of them fails with an exception. The runtime library cancels both coroutines after the failure. Even the coroutine that did not fail is not allowed to run to its completion. The exception is propagated to the code that called await(), so it can handle the error appropriately.

Not only does this make it easier to coordinate the execution of separate concurrent flows, but it also eases the cognitive burden of managing cancellations and failure cases explicitly. Without a structured approach, the overhead introduced by error and cancellation handling leads to quite a bit of boilerplate code that distracts us from the real purpose of the computation. Structured concurrency eliminates most of this added complexity.

Is this all?

We do not intend to present an exhaustive description of coroutines and concurrency in this article. Much remains to be said about them. Instead, we hope to have shown that Kotlin has allowed concurrent code to become more readable by offering first-class support for coroutines. Form no longer obscures the intent. Happy concurrent coding!

Additional reading

[1] Alst, Paulien van (2020), Kotlin - Hype of een nieuwe speler op de markt?, Java Magazine, 01 2020, pp 22-24

[2] Subramaniam, Venkat (2019), Programming Kotlin, Pragmatic Bookshelf

[3] Kotlin documentation on coroutines, https://kotlinlang.org/docs/reference/coroutines-overview.html


Vasco Veloso

I'm a polyglot senior software developer, currently interested in software design and architecture.