In kotlinx.coroutines 1.6.0, limitedParallelism() was introduced to provide a view of the underlying dispatcher that guarantees no more than the specified number of coroutines are executed at the same time.
So, if we set the limit to 1 like below, we can make sure the launched coroutines are executed one by one, right?
private val coroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

// Job 1
coroutineScope.launch {
    println("Before delaying for 2000ms")
    delay(2000)
    println("After 2000ms")
}

// Job 2
coroutineScope.launch {
    println("Before delaying for 1000ms")
    delay(1000)
    println("After 1000ms")
}
Unfortunately, the output might be something like this:
Before delaying for 2000ms
Before delaying for 1000ms
After 1000ms
After 2000ms
This is because limitedParallelism() limits parallelism, not concurrency: it caps how many coroutines are running on a thread at the same instant, not how many have been started and are still unfinished.
In this case, when Job 1 is suspended after calling delay(2000), it gives up its slot, so the dispatcher starts to execute Job 2, and eventually Job 2 finishes before Job 1.
Similarly, the dispatchers created by e.g. ExecutorService.asCoroutineDispatcher() don’t limit concurrency either.
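To make the interleaving easy to observe, here is a minimal sketch that records events in a list instead of printing them. The function name recordInterleaving, the event labels, and the shortened delays (200 ms and 100 ms) are illustrative assumptions, not part of the original example.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs two jobs on a dispatcher limited to one
// running coroutine at a time and returns the order of recorded events.
@OptIn(ExperimentalCoroutinesApi::class)
fun recordInterleaving(): List<String> = runBlocking {
    val events = CopyOnWriteArrayList<String>()
    val dispatcher = Dispatchers.Default.limitedParallelism(1)

    val job1 = launch(dispatcher) {
        events.add("job 1 start")
        delay(200) // suspends and frees the single slot
        events.add("job 1 end")
    }
    val job2 = launch(dispatcher) {
        events.add("job 2 start")
        delay(100) // shorter delay, so this job resumes first
        events.add("job 2 end")
    }

    joinAll(job1, job2)
    events
}

fun main() {
    println(recordInterleaving())
}
```

Job 2 starts while Job 1 is still suspended, which is exactly the concurrency that limitedParallelism(1) does not prevent.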
Use a mutex
Fortunately, we can use Mutex or Semaphore to limit concurrency. For example, with the following code, we can guarantee that the blocks guarded by withLock() are executed one by one:
private val mutex = Mutex()

// Job 1
coroutineScope.launch {
    mutex.withLock {
        println("Before delaying for 2000ms")
        delay(2000)
        println("After 2000ms")
    }
}

// Job 2
coroutineScope.launch {
    mutex.withLock {
        println("Before delaying for 1000ms")
        delay(1000)
        println("After 1000ms")
    }
}
However, a mutex or semaphore only guarantees that the blocks are executed one at a time (or at most N at a time for a semaphore), not that they run in the order in which the jobs were launched. This is because there is a race between launching the coroutines and the moment each of them reaches the mutex and queues for it.
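The Mutex example above covers the one-at-a-time case. For a limit greater than one, a Semaphore can be used in the same way. The following is a rough sketch that measures the highest number of blocks running concurrently; the function name maxConcurrentWithSemaphore, its parameters, and the 50 ms delay are assumptions made for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: launches `jobs` coroutines and returns the highest
// number of them that were inside the guarded block at the same time.
fun maxConcurrentWithSemaphore(permits: Int, jobs: Int): Int = runBlocking {
    val semaphore = Semaphore(permits)
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)

    List(jobs) {
        launch(Dispatchers.Default) {
            semaphore.withPermit {
                val now = active.incrementAndGet()
                maxActive.updateAndGet { current -> maxOf(current, now) }
                delay(50) // the permit stays held while suspended
                active.decrementAndGet()
            }
        }
    }.joinAll()

    maxActive.get()
}

fun main() {
    println(maxConcurrentWithSemaphore(permits = 2, jobs = 6))
}
```

Unlike a dispatcher slot, the permit is not released when the coroutine suspends in delay(), which is why this limits concurrency rather than parallelism. The order in which the jobs acquire permits is still not tied to launch order.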
Channel to the rescue
A Channel is conceptually similar to a BlockingQueue, but with support for suspend functions, making it a perfect candidate to ensure the execution order:
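fun interface Task {
    suspend fun run()
}

class SequentialExecutor {
    private val coroutineScope = ...
    private val channelForSequentialExecution = Channel<Task>(capacity = ...)

    init {
        // A single consumer takes tasks in the order they were sent
        // and runs each one to completion before taking the next.
        coroutineScope.launch {
            for (task in channelForSequentialExecution) {
                task.run()
            }
        }
    }

    // Returns false if the task could not be queued,
    // e.g. because the channel is full or closed.
    fun enqueue(task: Task): Boolean {
        return channelForSequentialExecution.trySend(task).isSuccess
    }
}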
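As a quick check that a single consumer reading from a channel preserves the enqueue order, here is a self-contained sketch. It simplifies the class above: plain suspend lambdas stand in for Task, the capacity is assumed to be Channel.UNLIMITED, and the function name runInEnqueueOrder and the task labels are made up for the example.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: enqueues a slow task followed by a fast task and
// returns the order in which they completed.
fun runInEnqueueOrder(): List<String> = runBlocking {
    val results = CopyOnWriteArrayList<String>()
    // Assumption: unlimited capacity, so trySend succeeds while the channel is open.
    val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    // Single consumer: each task runs to completion before the next is received.
    val consumer = launch(Dispatchers.Default) {
        for (task in tasks) {
            task()
        }
    }

    tasks.trySend { delay(200); results.add("slow task") }
    tasks.trySend { delay(100); results.add("fast task") }

    tasks.close()   // lets the for loop end once the queued tasks are drained
    consumer.join()
    results
}

fun main() {
    println(runInEnqueueOrder()) // [slow task, fast task]
}
```

Even though the second task has the shorter delay, it cannot start until the first one has finished, so the completion order matches the enqueue order. With a bounded capacity, trySend can fail when the buffer is full, which is why enqueue() in the class above returns a Boolean.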
Conclusion
Remember, limitedParallelism() limits parallelism, but not concurrency.