Kotlin Coroutines: Flows

Kotlin Coroutines enables us to asynchronously return a single value, but how about multiple values?

Flows

Kotlin’s approach to asynchronously return multiple values is to use flows:

fun foo() = flow {
  for (i in 0 until 3) {
    println("about to emit")
    emit(i)
    println("emitted")
  }
}

suspend fun main() {
  val f = foo()

  delay(500)
  println("start collecting...")

  f.collect {
    println(it)
    delay(1000)
  }
}

Here, the flow() function (called a “flow builder”) creates a flow that emit three integer values of 0, 1, and 2, which are collected using the collect() function.

The output of the above snippet will be:

start collecting...
about to emit
0
emitted
about to emit
1
emitted
about to emit
2
emitted

We can easily see that flows are cold. It means that the code inside the flow builders are NOT run until they’re collected, and no new values will be emitted, until the already-emitted values are collected or can be buffered. This is why the foo() function doesn’t need to be a suspend function: it returns immediately, not waiting for anything.

Also, the same flow can be started as many times as you want:

suspend fun main() {
  val f = foo()

  println("start collecting...")
  f.collect { println(it) }

  println("collecting again...")
  f.collect { println(it) }
}

The output will be:

start collecting...
0
1
2
collecting again...
0
1
2

Flow builders

There are several ways to build flows:

  • flowOf() can be used to build a flow from a fixed set of values.
  • asFlow() extension function can be used to convert collections or broadcast channels, etc. to flows.
  • The flow() function in the above example can be used to build flows, if you have more complicated logics.

Wrap asynchronous calls

If you have some existing asynchronous calls that returns multiple values, you can easily wrap them using the callbackFlow() function.

The following example shows how to wrap your existing CallbackApi and Callback:

fun flowFrom(api: CallbackApi): Flow<T> = callbackFlow {
  val callback = object: Callback {
    override fun onNextValue(value: T) {
      // emit the value to the flow
      offer(value)
    }

    override fun onCompleted() {
      // close the flow
      close()
    }

    override fun onError(e: Exception) {
      // cancel the flow due to error
      cancel("Error occurred", e)
    }
  }

  api.execute(callback)

  // suspend until close() or cancel() is called
  // make sure all resources are released
  awaitClose {
    api.cancel(callback)
  }
}

Flow operators

There are two types of operators:

Terminal operators

Terminal operators that are used to collect values emitted from the flow. Calling these operators will start the flow, therefore they are suspending function.

For example, the collect() operator in the above example is the most basic one, which collects all the emitted values:

flowOf(0, 1, 2)
  .collect { println("collected: $it") }

There are also other convenient operators, e.g. toList() to create a list containing all values from the flow, or first() to return the first element from the flow and then terminates the flow, etc.

Intermediate operators

Intermediate operators that are used to transform flows. They are cold, like flows are, meaning a call to such an operator won’t start the flow.

For example, the map() operator takes an upstream flow, transforms each of the emitted element, and emits the transformed element to a new downstream flow:

flowOf(0, 1, 2)
  .map { it * 10 } // creates a new flow that will emit 0, 10, and 20

There are also other intermediate operators like filter(), sample(), etc. You can also combine multiple flow using e.g. zip() or combine(), etc.

If you can’t find the operator you want, you can always create a new one using transform(). For example, the following code creates a custom “map” operator:

inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
  transform { value -> emit(transform(value)) }

Flow context

The terminal operators always run in the context of the calling coroutine.

For example, in the following snippet, doSomething() always run in the context, regardless of the implementation detail of foo():

fun foo() = flow {
  ...
}

withContext(context) {
  foo().collect { doSomething(it) }
}

By default, the code inside flow() is run on the same context as provided by the collector. To specify a custom context, use the flowOn() operator.

Note that the flowOn() operator only changes context of upstream flows, and does not affect downstream flows. For example:

withContext(Dispatchers.Main) {
  foo() // run on Dispatchers.Default
    .map { ... } // run on Dispatchers.Default
    .flowOn(Dispatchers.Default())
    .map { ... } // run on Dispatchers.IO
    .flowOn(Dispatchers.IO())
    .collect { ... } // run on Dispatchers.Main
}

Exception handling

You can use the traditional try/catch block to handle exceptions upon the collect() or other terminal operators:

try {
  foo().collect { ... }
} catch (e: Exception) {
  e.printStackTrace()
}

Alternatively, you can use the catch() intermediate operator to catch the exceptions from upstreams. For example:

flow { emitData() }
  .map { map1(it) }
  .catch { ... } // catches exceptions thrown from emitData() and map1()
  .map { map2(it) }
  .collect { doSomething(it) } // throws exceptions from map2() and doSomething()
}

This is pretty much it for the basics of Kotlin flows. Hope you like it, and enjoy hacking!


See also

comments powered by Disqus