Kotlin Coroutines: Structured Concurrency

In the previous article, we discussed the basics on how to Kotlin Coroutines. Now, let’s continue to the topic of structured concurrency.

Let’s still use the same example as in the previous article:

suspend fun readFromDisk(key: String): String = withContext(Dispatchers.IO) { ... }

val deferred1 = GlobalScope.async() { readFromDisk(key1) }
val deferred2 = GlobalScope.async() { readFromDisk(key2) }

GlobalScope.launch {
  val value1 = deferred1.await()
  val value2 = deferred2.await()
}

How are we handling the case when e.g. exception is thrown from deferred1.await()? More specifically, how can we easily cancel deferred2 here?

To better understand this, we need to first discuss contexts and scopes of coroutines.

Contexts and Scopes

As briefly mentioned in the previous article, coroutines are always launched and executed in a context of some scope.

For example, GlobalScope.launch() in the example above means that the coroutine is launched in a “global scope”, meaning its lifetime is only limited by the lifetime of the whole application.

This is probably fine in many cases, but what if we need to limit the lifetime of a coroutine to a certain object? For example, in Android, we want to cancel all coroutines when an Activity is destroyed to avoid memory and resource leaks.

The Kotlin way is really easy:

class MyActivity: Activity, CoroutineScope by CoroutineScope(Dispatchers.Main) {
  override fun onCreate() {
    super.onCreate()
    launch {
      while (isActive) {
        System.out.println("--> still active...")
        delay(100)
      }
    }
  }

  override fun onDestroy() {
    cancel()
    super.onDestroy()
  }
}

Here, the MyActivity class implements the CoroutineScope interface, using Dispatchers.Main as the default dispatcher. This defines the MyActivity class as a scope for new coroutines.

The coroutine launched in the onCreate() function executes within this MyActivity scope on the main thread, i.e. its job becomes child of the parent scope’s job.

The parent is responsible of tracking all the children launched. Therefore, when cancel() is called in onDestroy(), it automatically cancels all the unfinished child jobs. Also, if you call join() on the parent, it will wait until all child jobs finish.

Note that if we use GlobalScope.launch() instead of launch() here, the launched coroutine will still execute in the global scope, instead of the scope defined by MyActivity.

Contexts

If we read the source code, we’ll notice that the CoroutineScope is just an interface with one property:

public interface CoroutineScope {
  public val coroutineContext: CoroutineContext
}

The CoroutineContext is essentially a map, describing the context of the coroutine, e.g. job of the coroutine, the dispatcher used, etc. Notably, the context should contain a Job that will become parent of new coroutines created within this context.

When launching new coroutines, you can also pass in some additional contexts, taking precedence over those from its parent. The resulting context will be used as context for the new coroutine. For example:

launch(Dispatchers.IO + CoroutineName("yourCoroutineName")) { ... }

Here, it launches a coroutine with an explicitly specified dispatcher and a name (useful for debugging purpose) at the same time, while inheriting other context values from its parent. The + operator is used to define multiple values for a coroutine context.

Note that we should not pass a Job in the context parameter, unless we really want to break the parent-child relation.

Error Handling

Now, let’s get back to the original example, and see how Kotlin handles exceptions with structured concurrency:

suspend fun readFromDisk(key: String): String = withContext(Dispatchers.IO) { ... }

launch {
  val deferred1 = async() { readFromDisk(key1) }
  val deferred2 = async() { readFromDisk(key2) }

  try {
    val value1 = deferred1.await()
    val value2 = deferred2.await()
  } catch (e: Exception) {
    ...
  }
}

When an exception is thrown from deferred1.await(), the try/catch block works as you expect. Also, deferred1 will cancel its parent, which furthers cancels all its children, including deferred2.

However, this piece of code will crash.

That’s because the parent scope is canceled and re-throws the exception, which is not caught anywhere. This behavior is unfortunately counterintuitive and often leads to unwanted results (see more discussions here).

To solve this, we should use the coroutineScope() function to create a new scope to wrap the async operations:

launch {
  try {
    coroutineScope {
      val deferred1 = async() { readFromDisk(key1) }
      val deferred2 = async() { readFromDisk(key2) }

      val value1 = deferred1.await()
      val value2 = deferred2.await()
    }
  } catch (e: Exception) {
    ...
  }
}

With this, the exception thrown from await() will be re-thrown by coroutineScope, which will be caught by the try/catch block.

Cancellation is Cooperative

As mentioned above, with structured concurrency, cancelling child coroutines is very easy. However, coroutine cancellation is cooperative, meaning calling job.cancel() won’t simply kill the coroutines. It gives developers an opportunity to clean-up and release the resources.

To check if a Job is canceled, you can periodically inspect the isActive flag:

val job = launch() {
  while (isActive) {
    ...
  }
}

You can also periodically call a suspend function that checks for cancellation, e.g. yield():

val job = launch() {
  while (true) {
    yield()
    ...
  }
}

Closing resources

You can handle resources clean-up and closing in the usual way, e.g. using the try/finally block or the use() function:

val job = launch() {
  var allocatedResources = ...
  try {
    while (isActive) {
      ...
    }
  } finally {
    allocatedResources?.release()

    // in rare cases when you need to call suspend functions in a cancelled coroutine,
    // you need to wrap them with withContext(NonCancellable) {}
    withContext(NonCancellable) {
      ...
    }
  }
}

This is pretty much all you need to know about structured concurrency, and in the next article we’ll discuss how to communicate between coroutines.


See also

comments powered by Disqus