Kotlin Coroutines: The Basics

Starting with the 1.3 release, Kotlin provides a nice and flexible way to do asynchronous or non-blocking programming: Coroutines.

How We Do Async

Before diving into coroutines, let’s first briefly go through some of the popular approaches we use to do async programming.

Threads

Consider a common use case: the app needs to fetch an access token, use it to fetch a list of feed items, and then render them in the UI.

On Android, fetching the access token and feed items should be done on a worker thread, and the rendering should be done on the UI thread. So we’ll have something like this:

fun fetchAccessToken(): AccessToken { ... }
fun fetchFeedItems(accessToken: AccessToken): List<FeedItem> { ... }
fun render(feedItems: List<FeedItem>) { ... }

thread {
  val accessToken = fetchAccessToken()
  val feedItems = fetchFeedItems(accessToken)

  // Android specific code to dispatch the job to UI thread
  Handler(Looper.getMainLooper()).post {
    render(feedItems)
  }
}

The code looks quite simple. However, it has several drawbacks.

  • Threads are expensive, and you can have only a limited number of them.
  • Threads are difficult to use. For example, what if you need to fetch something else in parallel before rendering?
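
To make the second drawback concrete, here is a minimal sketch of fetching two things in parallel with raw threads. The loadFeed(), loadProfile() and fetchBoth() functions are hypothetical stand-ins invented for this illustration, and Thread.sleep() only simulates a blocking network call.

```kotlin
import kotlin.concurrent.thread

// Hypothetical stand-ins for blocking network calls.
fun loadFeed(): List<String> {
  Thread.sleep(50)
  return listOf("item1", "item2")
}

fun loadProfile(): String {
  Thread.sleep(50)
  return "alice"
}

// Runs both calls in parallel and blocks the calling thread until both finish.
fun fetchBoth(): Pair<List<String>, String> {
  // Results have to travel between threads through shared mutable state.
  var feedItems: List<String>? = null
  var profile: String? = null

  val t1 = thread { feedItems = loadFeed() }
  val t2 = thread { profile = loadProfile() }

  // join() blocks the caller and makes the writes above visible here.
  t1.join()
  t2.join()

  return Pair(feedItems!!, profile!!)
}

fun main() {
  val (feedItems, profile) = fetchBoth()
  println("$profile: $feedItems")
}
```

Even this small sketch needs shared variables and explicit joins, and it still has no error handling or cancellation: if either thread throws, the `!!` fails with a NullPointerException.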

Callbacks

With callbacks, you pass one function as a parameter to another function. The passed-in function will be called when the other one finishes.

fun fetchAccessTokenAsync(cb: (AccessToken) -> Unit) { ... }
fun fetchFeedItemsAsync(accessToken: AccessToken, cb: (List<FeedItem>) -> Unit) { ... }
fun render(feedItems: List<FeedItem>) { ... }

fetchAccessTokenAsync { accessToken ->
  fetchFeedItemsAsync(accessToken) { feedItems ->
    render(feedItems)
  }
}

The benefit is that you no longer need to maintain the threads. However, it’s still not perfect.

  • Nested callbacks quickly become hard to follow, a problem known as “callback hell”. It’s very easy to end up with a long chain of callbacks, which makes the code difficult to read and makes error handling awkward.
  • Also, it’s not self-evident on which thread the callback is invoked. Is it on the main / UI thread? Or is it on some random worker thread? You need to read the docs or even the code to figure it out.
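
As a rough sketch of how the nesting grows once error handling is added, consider the version below. The onSuccess / onError signatures are assumptions made for this illustration, and the hypothetical functions call back synchronously only to keep the sketch runnable.

```kotlin
// Hypothetical callback-style APIs. A real implementation would call back
// later, typically from another thread.
fun fetchAccessTokenAsync(onSuccess: (String) -> Unit, onError: (Throwable) -> Unit) {
  onSuccess("token-123") // never fails in this sketch
}

fun fetchFeedItemsAsync(
  accessToken: String,
  onSuccess: (List<String>) -> Unit,
  onError: (Throwable) -> Unit
) {
  if (accessToken.isEmpty()) {
    onError(IllegalArgumentException("empty token"))
  } else {
    onSuccess(listOf("item1", "item2"))
  }
}

// Every step adds one level of nesting and one more error branch.
fun loadFeed(onDone: (String) -> Unit) {
  fetchAccessTokenAsync(
    onSuccess = { accessToken ->
      fetchFeedItemsAsync(
        accessToken,
        onSuccess = { feedItems -> onDone("rendered ${feedItems.size} items") },
        onError = { e -> onDone("failed to fetch feed items: ${e.message}") }
      )
    },
    onError = { e -> onDone("failed to fetch access token: ${e.message}") }
  )
}

fun main() {
  loadFeed { result -> println(result) }
}
```

With only two steps the code is already two levels deep, and the error branches appear in the reverse order of the steps they belong to.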

Futures / Reactive Extensions

RxJava is a very popular library among Android developers to do async programming:

fun requestAccessToken(): Single<AccessToken> { ... }
fun fetchFeedItems(accessToken: AccessToken): Single<List<FeedItem>> { ... }
fun render(feedItems: List<FeedItem>) { ... }

requestAccessToken()
  .flatMap { accessToken ->
    fetchFeedItems(accessToken)
  }
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(object : DisposableSingleObserver<List<FeedItem>>() {
    override fun onSuccess(feedItems: List<FeedItem>) {
      render(feedItems)
    }

    override fun onError(t: Throwable) { ... }
  })

Compared to callbacks, instead of passing a function to be invoked as a parameter, it returns an object that can be operated on. It makes the code easier to understand, and gets rid of the “callback hell”. Also, the error handling is much nicer.

However, similar to callbacks, it changes the way we write programs: instead of coding in a “sequential” fashion, with loops and other well-known structures, developers need to structure everything as “streams”, and use operators like map() and flatMap(). The learning curve is relatively steep.

Coroutines

Kotlin’s approach to async programming is to use coroutines. The idea is to “suspend” the execution at some point, and “resume” later, making it possible to write non-blocking code in the traditional “sequential” fashion.

suspend fun fetchAccessToken(): AccessToken { ... }
suspend fun fetchFeedItems(accessToken: AccessToken): List<FeedItem> { ... }
fun render(feedItems: List<FeedItem>) { ... }

GlobalScope.launch {
  val accessToken = fetchAccessToken()
  val feedItems = fetchFeedItems(accessToken)
  render(feedItems)
}

Here, the suspend keyword marks the function as a “suspending function”. It means that the function can suspend its execution at some point without blocking the thread, and resume later.

Note that “suspension” is a new capability that is only supported inside a coroutine, which is launched by the launch() function here.
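
To see what suspension means in practice, here is a small sketch in which two coroutines share one thread. The fetchGreeting() and interleave() names are hypothetical, runBlocking() is used only to get a coroutine running on a plain JVM, and delay() stands in for real non-blocking work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspending function: delay() suspends the coroutine
// without blocking the thread it runs on.
suspend fun fetchGreeting(): String {
  delay(100)
  return "hello"
}

// Both coroutines below run on the single thread that calls runBlocking().
fun interleave(): List<String> = runBlocking {
  val events = mutableListOf<String>()

  val job = launch {
    events += "coroutine: start"
    val greeting = fetchGreeting() // the coroutine suspends here
    events += "coroutine: got $greeting"
  }

  // This coroutine gets the thread while the first one is suspended.
  launch { events += "other work" }

  job.join()
  events
}

fun main() {
  interleave().forEach { println(it) }
}
```

The "other work" entry is recorded between the two entries of the first coroutine, which shows that the thread was free while fetchGreeting() was suspended.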

Let’s first look at how to write suspend functions, then discuss how to launch a coroutine and use them.

How to Write Suspend Functions

Wrap synchronous calls

With the following code, the “synchronous” code is run on an IO thread, and therefore won’t block your main thread:

suspend fun fetchAccessToken(): AccessToken =
  withContext(Dispatchers.IO) {
    // Your old synchronous code to fetch the access token
  }

Kotlin provides several built-in dispatchers:

  • Dispatchers.Main: Dispatches the code to the main thread for UI operations. Note that this needs runtime support from the underlying platform, e.g. the kotlinx-coroutines-android package for Android.
  • Dispatchers.Default: Dispatches the code to a worker thread for CPU-heavy jobs. By default, the maximum number of threads used by this dispatcher equals the number of CPU cores, with a minimum of two.
  • Dispatchers.IO: Dispatches the code to a worker thread for I/O operations. By default, this dispatcher uses at most 64 threads or the number of CPU cores, whichever is larger.

To make it more efficient, the IO dispatcher shares threads with the Default dispatcher, which avoids unnecessary context switches between threads where possible.
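
A quick way to observe the dispatchers is to print which thread runs a block under each of them. The sketch below is only an illustration: threadNames() is a hypothetical helper, Dispatchers.Main is left out because it needs platform support that a plain JVM program does not have, and the exact thread names depend on the runtime.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the name of the thread that runs a block under each dispatcher.
fun threadNames(): Map<String, String> = runBlocking {
  val callerThread = Thread.currentThread().name
  val defaultThread = withContext(Dispatchers.Default) { Thread.currentThread().name }
  val ioThread = withContext(Dispatchers.IO) { Thread.currentThread().name }
  mapOf("caller" to callerThread, "Default" to defaultThread, "IO" to ioThread)
}

fun main() {
  threadNames().forEach { (dispatcher, threadName) ->
    println("$dispatcher -> $threadName")
  }
}
```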

Wrap asynchronous calls

Basically, you call suspendCancellableCoroutine() to suspend the execution of the current coroutine, and then call resume() or resumeWithException() on the continuation when the asynchronous call finishes.

Below is an example using Retrofit.

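// Imports needed by this snippet.
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response

// It's an extension function on Retrofit's Call class.
// The function name "await" is a convention.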
suspend fun <T> Call<T>.await(): T = suspendCancellableCoroutine { cont ->
  // Register a callback when the user cancels the task.
  cont.invokeOnCancellation {
    cancel() // Tells Retrofit to cancel the job.
  }

  enqueue(object : Callback<T> {
    override fun onResponse(call: Call<T>, response: Response<T>) {
      if (response.isSuccessful) {
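        // Everything is fine, resume the execution.
        // body() is nullable in Retrofit, so guard against an empty body.
        val body = response.body()
        if (body != null) {
          cont.resume(body)
        } else {
          cont.resumeWithException(NullPointerException("Response body is null"))
        }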
      } else {
        // Server returns a non-2XX code, resume with an exception.
        // Effectively, it throws the exception to whoever is calling this await() function.
        cont.resumeWithException(HttpException(response))
      }
    }

    override fun onFailure(call: Call<T>, t: Throwable) {
      // Error occurs, e.g. network error, resume execution with the given exception.
      cont.resumeWithException(t)
    }
  })
}

// Assume your Retrofit interface is:
interface TranslationService {
  @GET("translationList")
  fun fetchTranslationList(): Call<TranslationList>
}

// You can use it like below:
GlobalScope.launch {
  val translationList = translationService.fetchTranslationList().await()
}
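
For readers who want to try the same pattern without Retrofit, here is a self-contained sketch. The Listener interface and Request class are hypothetical and exist only for this illustration. The made-up API has no way to cancel a request, so the invokeOnCancellation() hook is omitted.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical callback-based API, standing in for a real library.
interface Listener<T> {
  fun onResult(value: T)
  fun onFailure(error: Throwable)
}

class Request(private val text: String) {
  // Calls the listener from a background thread after a short pause.
  fun enqueue(listener: Listener<Int>) {
    thread {
      Thread.sleep(50)
      val number = text.toIntOrNull()
      if (number != null) {
        listener.onResult(number)
      } else {
        listener.onFailure(NumberFormatException("not a number: $text"))
      }
    }
  }
}

// Bridges the callback API into a suspending function.
suspend fun Request.await(): Int = suspendCancellableCoroutine { cont ->
  enqueue(object : Listener<Int> {
    override fun onResult(value: Int) = cont.resume(value)
    override fun onFailure(error: Throwable) = cont.resumeWithException(error)
  })
}

fun main() {
  runBlocking {
    println(Request("42").await())
    try {
      Request("oops").await()
    } catch (e: NumberFormatException) {
      // The failure surfaces as a normal exception at the call site.
      println("failed: ${e.message}")
    }
  }
}
```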

One nice thing is that Kotlin provides integrations with several popular async libraries. Check the integration modules of the kotlinx.coroutines project to see if your favorite library is included.

Launch a Coroutine

Now that we have some suspend functions, let’s dive into how we can launch coroutines to use them.

suspend fun fetchAccessToken(): AccessToken =
  withContext(Dispatchers.IO) { ... }

suspend fun fetchFeedItems(accessToken: AccessToken): List<FeedItem> =
  withContext(Dispatchers.IO) { ... }

fun render(feedItems: List<FeedItem>) { ... }

GlobalScope.launch(Dispatchers.Main) {
  val accessToken = fetchAccessToken()
  val feedItems = fetchFeedItems(accessToken)
  render(feedItems)
}

Here, we launched a coroutine using launch(), which returns immediately. The code inside launch() is scheduled to be executed later on the main thread.

First, it calls the fetchAccessToken() function. The code inside this function is executed on an IO thread. While it’s being executed, Kotlin suspends the coroutine, so that other code on the main thread can be executed.

Later, when the access token is fetched, the coroutine resumes on the main thread, and calls the fetchFeedItems() function. Similarly, the code inside is executed on an IO thread, while the coroutine is suspended.

Finally, when the feed items are fetched, the coroutine resumes on the main thread again and calls the render() function to render the feed items.
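
The thread hopping described above can be traced with a small sketch. Dispatchers.Main is not available on a plain JVM, so this version assumes the thread that calls runBlocking() plays the role of the main thread. traceThreads() and the values it produces are hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Records which kind of thread executes each step of the flow.
fun traceThreads(): List<String> = runBlocking {
  val caller = Thread.currentThread()
  val steps = mutableListOf<String>()

  // Notes whether a step runs on the thread that started the coroutine.
  fun record(step: String) {
    val where = if (Thread.currentThread() === caller) "caller" else "worker"
    steps += "$step on $where thread"
  }

  record("start")
  val accessToken = withContext(Dispatchers.IO) {
    record("fetchAccessToken")
    "token-123"
  }
  record("resume")
  val feedItems = withContext(Dispatchers.IO) {
    record("fetchFeedItems")
    listOf("$accessToken-item1", "$accessToken-item2")
  }
  record("render ${feedItems.size} items")
  steps
}

fun main() {
  traceThreads().forEach { println(it) }
}
```

The steps outside withContext() all run on the calling thread, while the two fetch steps run on worker threads of the IO dispatcher.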

There are several advantages to using coroutines.

  • As mentioned earlier, the code is written in a “sequential” fashion, which is much easier to understand.

    • It means that you can use constructs like for loops and try-catch blocks as before:
      GlobalScope.launch {
        try {
          val accessToken = fetchAccessToken()
          for (feedItem in fetchFeedItems(accessToken)) {
            ...
          }
        } catch (e: Exception) {
          // Exceptions thrown by fetchAccessToken() or fetchFeedItems() end up here.
        }
      }
  • It simplifies the thread handling significantly.

    • The suspend functions can decide on which thread pool they’re going to run, because they know the best.
    • It’s very obvious on which thread the coroutine is running. For example, it’s quite clear that the render() function above runs on the main thread. If you really want, you can let the render() function decide where it should run:
      suspend fun render(feedItems: List<FeedItem>) {
        withContext(Dispatchers.Main) { ... }
      }
      
      GlobalScope.launch {
        val accessToken = fetchAccessToken()
        val feedItems = fetchFeedItems(accessToken)
        render(feedItems)
      }

Coroutine builders

As mentioned earlier, coroutines can only be launched by a coroutine builder in the context of some scope. Let’s focus on the coroutine builders here, and I’ll discuss context and scope in more detail in a future post.

launch()

The launch() builder launches a coroutine in a “fire-and-forget” fashion. It immediately returns a Job object as a reference to the launched coroutine.

val job = GlobalScope.launch { ... }

You can call job.cancel() to cancel a running coroutine, and call job.join() to suspend the current coroutine until the job completes.
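
Here is a minimal sketch of cancel() and join() working together. The cancelEarly() function is hypothetical, the delays are arbitrary, and runBlocking() is used as the scope only to keep the example self-contained.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Starts a long-running job, cancels it early, and returns
// how many ticks it completed before it was cancelled.
fun cancelEarly(): Int = runBlocking {
  var ticks = 0
  val job = launch {
    repeat(1_000) {
      delay(10) // cancellation takes effect at suspension points like this
      ticks++
    }
  }

  delay(100)   // let the job run for a while
  job.cancel() // request cancellation
  job.join()   // suspend until the job has actually finished
  ticks
}

fun main() {
  println("ticks before cancellation: ${cancelEarly()}")
}
```

The job stops long before it reaches 1,000 ticks, because delay() throws a CancellationException once the job has been cancelled.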

Note that any uncaught exception inside launch() will crash the app by default. I’ll discuss exception handling in more detail in a future post.

runBlocking()

The runBlocking() builder launches a coroutine, and blocks the current thread until the launched coroutine completes. It’s quite useful in the main() function and tests.
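
As a small sketch of that use, the main() function below calls a suspending function from ordinary blocking code. The computeAnswer() function is a hypothetical example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending function to call from blocking code.
suspend fun computeAnswer(): Int {
  delay(50)
  return 42
}

fun main() {
  // Blocks the main thread until the coroutine completes,
  // then returns the value of the last expression in the block.
  val answer = runBlocking { computeAnswer() }
  println(answer)
}
```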

async()

The async() builder is the Kotlin way to achieve concurrency. It immediately returns a Deferred object as a reference to the launched coroutine.

suspend fun readFromDisk(key: String): String = withContext(Dispatchers.IO) { ... }

val deferred1 = GlobalScope.async { readFromDisk(key1) }
val deferred2 = GlobalScope.async { readFromDisk(key2) }

GlobalScope.launch {
  val value1 = deferred1.await()
  val value2 = deferred2.await()
}

Note that any uncaught exception inside async() is thrown when await() is called.
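
The sketch below illustrates this behavior. The awaitFailure() function and the error message are hypothetical. The @OptIn annotation assumes a recent kotlinx.coroutines version, in which GlobalScope is marked as a delicate API.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// The opt-in is an assumption about the library version in use.
@OptIn(DelicateCoroutinesApi::class)
fun awaitFailure(): String = runBlocking {
  // The exception does not surface here, where the coroutine is started...
  val deferred = GlobalScope.async<String> {
    delay(10)
    throw IllegalStateException("disk read failed")
  }

  // ...but here, where the result is awaited.
  try {
    deferred.await()
  } catch (e: IllegalStateException) {
    "caught: ${e.message}"
  }
}

fun main() {
  println(awaitFailure())
}
```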

So, how do we handle the case when, for example, an exception is thrown from deferred1.await()? How are we going to cancel deferred2? That will be the topic of the next post: structured concurrency.

