limitedParallelism() doesn't limit concurrency

In kotlinx.coroutines 1.6.0, limitedParallelism() was introduced to provide a view of the underlying dispatcher, guaranteeing that no more than the specified number of coroutines are executed at the same time.

So, if we set the limit to 1 like below, we can make sure the launched coroutines are executed one by one, right?

private val coroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

// Job 1
coroutineScope.launch {
  println("Before delaying for 2000ms")
  delay(2000)
  println("After 2000ms")
}

// Job 2
coroutineScope.launch {
  println("Before delaying for 1000ms")
  delay(1000)
  println("After 1000ms")
}

Unfortunately, the output might be something like this:

Before delaying for 2000ms
Before delaying for 1000ms
After 1000ms
After 2000ms

This is because limitedParallelism() limits parallelism, not concurrency.

In this case, when Job 1 suspends at delay(2000), it releases the thread it was running on, so the dispatcher starts executing Job 2, which ends up finishing before Job 1.

Similarly, dispatchers created with, for example, ExecutorService.asCoroutineDispatcher() don’t limit concurrency either, even when the executor has only a single thread.
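To make the executor case concrete, here is a minimal sketch that runs two coroutines on a single-threaded executor and records the order of events. The function name interleavingOnSingleThread and the shortened delays are assumptions made for illustration; they are not from the original example.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs two coroutines on one thread and records what happened.
fun interleavingOnSingleThread(): List<String> {
  // Only ever mutated on the single executor thread, and read after both jobs complete.
  val events = mutableListOf<String>()
  // `use` closes the dispatcher, which shuts down the executor behind it.
  Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { dispatcher ->
    runBlocking {
      val job1 = launch(dispatcher) {
        events += "job1 start"
        delay(200) // suspends and frees the thread for job2
        events += "job1 end"
      }
      val job2 = launch(dispatcher) {
        events += "job2 start"
        delay(100)
        events += "job2 end"
      }
      joinAll(job1, job2)
    }
  }
  return events
}

fun main() {
  println(interleavingOnSingleThread())
}
```

Even though only one thread is available, both coroutines are in flight at once, so job2 should be recorded as finishing before job1.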

Use a mutex

Fortunately, we can use Mutex or Semaphore to limit concurrency. For example, with the following code, we can guarantee the blocks locked by withLock() are executed one by one:

private val mutex = Mutex()

// Job 1
coroutineScope.launch {
  mutex.withLock {
    println("Before delaying for 2000ms")
    delay(2000)
    println("After 2000ms")
  }
}

// Job 2
coroutineScope.launch {
  mutex.withLock {
    println("Before delaying for 1000ms")
    delay(1000)
    println("After 1000ms")
  }
}
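The same idea works with Semaphore when more than one block may run at a time. The sketch below is an illustration under assumptions (the helper name peakConcurrency, the job count, and the delay are all made up): it measures how many coroutines were inside the guarded block simultaneously.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: launches `jobs` coroutines and returns the highest number
// that were inside the guarded block at the same time.
fun peakConcurrency(jobs: Int, permits: Int): Int {
  val semaphore = Semaphore(permits)
  val inFlight = AtomicInteger(0)
  val peak = AtomicInteger(0)
  // runBlocking returns only after all launched children have completed.
  runBlocking(Dispatchers.Default) {
    repeat(jobs) {
      launch {
        semaphore.withPermit {
          val now = inFlight.incrementAndGet()
          peak.updateAndGet { maxOf(it, now) }
          delay(50) // suspending here does not release the permit
          inFlight.decrementAndGet()
        }
      }
    }
  }
  return peak.get()
}

fun main() {
  println(peakConcurrency(jobs = 10, permits = 2))
}
```

Unlike limitedParallelism(), the permit is held across the suspension point, so the peak never exceeds the number of permits. A Semaphore with one permit behaves like the Mutex above.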

However, Mutex and Semaphore only guarantee that the blocks are executed one at a time, not that they run in the order in which the jobs were launched. This is because there is a race between launching the coroutines and the moment each one queues up for the mutex.

Channel to the rescue

A Channel is conceptually similar to a BlockingQueue, but with support for suspend functions, making it a perfect candidate to ensure the execution order:

fun interface Task {
  suspend fun run()
}

class SequentialExecutor {
  private val coroutineScope = ...
  private val channelForSequentialExecution = Channel<Task>(capacity = ...)

  init {
    coroutineScope.launch {
      for (task in channelForSequentialExecution) {
        task.run()
      }
    }
  }

  fun enqueue(task: Task): Boolean {
    return channelForSequentialExecution.trySend(task).isSuccess
  }
}
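As a rough, runnable variant of the class above, the sketch below fills in the elided parts purely by assumption: Dispatchers.Default for the scope and Channel.UNLIMITED for the capacity. The names OrderedExecutor, closeAndJoin and runInOrder are hypothetical and exist only to demonstrate the ordering.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun interface Task {
  suspend fun run()
}

// Hypothetical variant of SequentialExecutor; scope and capacity are assumptions.
class OrderedExecutor {
  private val scope = CoroutineScope(Dispatchers.Default)
  private val tasks = Channel<Task>(capacity = Channel.UNLIMITED)

  // Single consumer: tasks run one at a time, in the order they were sent.
  // Note: an exception thrown by a task would stop this worker.
  private val worker: Job = scope.launch {
    for (task in tasks) {
      task.run()
    }
  }

  fun enqueue(task: Task): Boolean = tasks.trySend(task).isSuccess

  // Stops accepting new tasks and waits until the queued ones have run.
  suspend fun closeAndJoin() {
    tasks.close()
    worker.join()
  }
}

fun runInOrder(): List<Int> = runBlocking {
  val results = mutableListOf<Int>()
  val executor = OrderedExecutor()
  // Earlier tasks suspend for longer, yet they still finish first.
  for (i in 1..3) {
    executor.enqueue(Task {
      delay(40L - i * 10)
      results += i
    })
  }
  executor.closeAndJoin()
  results
}

fun main() {
  println(runInOrder()) // prints [1, 2, 3]
}
```

Because a single coroutine drains the channel, the next task does not start until the previous one has returned, even when the previous one suspends. With a bounded capacity, trySend() would instead return a failed result once the buffer is full, so callers would need to check the Boolean returned by enqueue().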

Conclusion

Remember, limitedParallelism() limits parallelism, but not concurrency.

