Structured Concurrency in action! (using Kotlin coroutines)

Rashanjyot Singh
Published in ProAndroidDev
8 min read · May 22, 2021

Photo by Chester Alvarez on Unsplash

To get started, let’s see what ‘Structured Concurrency’ may look like in a real-world scenario:

Let’s say John decides to build a house. It requires completion of certain tasks such as bricklaying, carpentry, plumbing, electrical installations and roofing. While some of these tasks can overlap and be carried out concurrently, the house is said to be built completely only when all these tasks are completed.

Now, let’s say John runs out of budget in between for some reason and hence, decides to stop building the house further. In this case, any tasks in execution, be it bricklaying, carpentry or plumbing, would have to be halted.

This is the core concept of structured concurrency. It states that all sub-tasks shall complete before the completion of their parent task(s), i.e. no sub-task can outlive its parent task(s).

Definition (in the context of programming):

“Structured concurrency” refers to a way to structure async/concurrent computations so that child operations are guaranteed to complete before their parents, i.e. no child operation is executed outside the scope of a parent operation.

Let’s have a look at the image below to understand the difference between structured and unstructured concurrency:

The benefit of structured concurrency is encapsulation. When a method is invoked to do a task, its caller should not care whether or not the method decomposes the work into several sub-tasks that are executed by hundreds or thousands of threads concurrently. When the method completes, it should guarantee that the requested work has been completed.

Before going ahead, please make sure that you understand:

  1. What are coroutines and how to create coroutines using coroutine builders in Kotlin?
  2. What a Job is and the different states a Job can be in?

(The above points contain a link each that can help you understand them.)

Kotlin coroutines & structured concurrency

In Kotlin, we can create coroutines using builders such as launch and async, which return a Job instance (async returns a Deferred, which is itself a Job). The body of such a coroutine may contain nested coroutine builders that create children Job instances, which run concurrently.

Let’s see a simple code example of structured concurrency using the above-mentioned builders:

In the above example, the code within childJob executes and prints the value of count 3 times with a delay of 100 milliseconds between each, until the parentJob is cancelled after a total delay of 250 milliseconds, which in turn cancels the childJob. This is exactly what structured concurrency is all about.
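The embedded snippet is not reproduced in this text, so here is a minimal sketch that is consistent with the description above rather than the original code. The function name runParentChildDemo and the printed list are assumptions added for illustration; parentJob, childJob and count follow the names used in the description.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs the scenario and returns the values the child printed.
suspend fun runParentChildDemo(): List<Int> = coroutineScope {
    val printed = mutableListOf<Int>()
    val parentJob = launch {
        // Launched inside parentJob's block, so it becomes a child of parentJob.
        val childJob = launch {
            var count = 1
            while (count <= 5) {
                println("count = $count")
                printed += count
                count++
                delay(100) // cancellable suspension point
            }
        }
    }
    delay(250)
    // Cancelling the parent also cancels the child; join waits for both to finish.
    parentJob.cancelAndJoin()
    printed
}

fun main(): Unit = runBlocking {
    val printed = runParentChildDemo()
    println("child printed $printed before the parent was cancelled")
}
```

With these delays the child should typically get through three iterations before the parent is cancelled, but the exact count depends on timing.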

Coroutine Job(s) have a parent-child relationship. All coroutine builders (such as launch and async) take in a CoroutineContext parameter that contains an instance of a Job, to which the coroutine is bound, to enforce the discipline of structured concurrency with the propagation of cancellation.

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    // ...
): Job

Understanding cancellation

So far, everything has been pretty straightforward. We saw a code sample depicting the cancellation of a parent Job, which recursively cancelled its child Job and, in turn, halted the execution of code within the child Job.

⚠️ However, that may not always be the case: the code inside a coroutine may or may not continue to execute after cancel() is invoked on its Job instance.

Before discussing when and how that happens, let’s take note of the following two things:

  • Whether the execution of code in a coroutine should continue or not after cancel() is invoked, is a decision that lies in the hands of a developer, which will, in turn, depend on the use case.
  • Execution of code within a coroutine even after cancel() is invoked does not mean that structured concurrency is violated, since the parent Job still waits for its children Job(s) to reach a final state before reaching its own final state. A Job is in a final state when its isCompleted flag is true, and we can call join() to wait for a Job to complete before executing any further code.
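To make the second point concrete, here is a small sketch (not from the original article) that cancels a coroutine whose body never checks for cancellation and reads the Job flags before and after join(). The names JobFlags, flags and cancelThenJoin are hypothetical helpers introduced for this illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical holder for the three state flags of a Job.
data class JobFlags(val isActive: Boolean, val isCancelled: Boolean, val isCompleted: Boolean)

fun Job.flags() = JobFlags(isActive, isCancelled, isCompleted)

// Returns the flags observed right after cancel() and again after join().
suspend fun cancelThenJoin(): Pair<JobFlags, JobFlags> = coroutineScope {
    val job = launch(Dispatchers.Default) {
        // Non-cooperative work: it never suspends and never checks isActive.
        val end = System.currentTimeMillis() + 300
        while (System.currentTimeMillis() < end) { /* keep the thread busy */ }
    }
    delay(50)
    job.cancel()
    // Cancelled, but usually not completed yet: the loop above is still running.
    val afterCancel = job.flags()
    job.join() // waits until the coroutine body has actually finished
    val afterJoin = job.flags()
    afterCancel to afterJoin
}

fun main(): Unit = runBlocking {
    val (afterCancel, afterJoin) = cancelThenJoin()
    println("after cancel(): $afterCancel")
    println("after join():   $afterJoin")
}
```

After cancel() the Job is no longer active and reports isCancelled, while isCompleted only becomes true once the body has finished, which is what join() waits for.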

Read more about states of a Job here.
⚠️️ Note carefully how cancellation is different from completion.

When cancellation of a coroutine is invoked, the isActive flag of its Job instance changes to false. Code inside the coroutine can read this flag to decide whether it should continue executing. It is the responsibility of the developer to add this conditional check to the code, as per the use case.

P.S. — More often than not, use cases demand that coroutines discontinue execution on cancellation. However, this is not a rule and can vary from use case to use case.

Okay, now let’s get on with it and see this in action. Here is another code sample that does the same thing as the one before, except that this time the code in the coroutine keeps executing even after cancellation is invoked.

The only difference between the two implementations is that the first one uses delay(...) while the second one uses System.currentTimeMillis(). Execution halts in the first case because delay(...) and many other suspending functions provided by kotlinx.coroutines are built on suspendCancellableCoroutine: they are cancellable, so if the Job is cancelled while the coroutine is suspended, they resume by throwing a CancellationException instead of continuing.

💡Always check the internal implementation details or behaviour of an in-built suspending function before using it, to avoid unexpected results.

To make the second example behave like the first one, we can explicitly add the check for isActive as follows:

while (count <= 5 && isActive)

Hence, it is important to check whether a Job is still active, to prevent long-running operations from continuing when they are no longer needed.
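The second snippet is not reproduced in this text either, so the following is only a sketch of the idea under stated assumptions: a busy loop that uses System.currentTimeMillis() instead of delay(...), with the isActive check switchable through a parameter. The function printedBeforeStop, its checkIsActive parameter and the AtomicInteger counter are hypothetical and not part of the original code.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Hypothetical helper: returns how many values were printed before the loop ended.
suspend fun printedBeforeStop(checkIsActive: Boolean): Int = coroutineScope {
    val printed = AtomicInteger(0)
    val job = launch(Dispatchers.Default) {
        var count = 1
        var nextPrintTime = System.currentTimeMillis()
        // Busy loop: it never suspends, so cancellation is only noticed
        // when the loop condition reads isActive.
        while (count <= 5 && (!checkIsActive || isActive)) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("count = $count")
                printed.incrementAndGet()
                count++
                nextPrintTime += 100
            }
        }
    }
    delay(250)
    job.cancelAndJoin() // join returns only once the loop has actually ended
    printed.get()
}

fun main(): Unit = runBlocking {
    println("without check: ${printedBeforeStop(checkIsActive = false)} values printed")
    println("with check: ${printedBeforeStop(checkIsActive = true)} values printed")
}
```

Without the check the loop ignores cancellation and prints all five values; with the check it stops shortly after cancel() is called.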

Parallel Decomposition

We have seen how the parent-child relationship is handled between Job instances. However, an interesting point to note is the support for parallel decomposition when multiple children Job instances are running concurrently within a parent Job. Let’s look at the code below:

suspend fun performTasks(task1: String, task2: String) {
    val job1 = async { doTask(task1) }
    val job2 = async { doTask(task2) }
    mergeTasks(job1.await(), job2.await())
}

This code does not compile and we’ll quickly see why!

Let’s say that the first task, that is, job1 fails and throws the corresponding exception, then ideally job2 should also not keep working in the background because its result is no longer needed. This is what parallel decomposition is all about.

In Kotlin, every coroutine must be created from a CoroutineScope, else compilation would fail just as in the example above. For this reason, every coroutine builder is defined as an extension function of CoroutineScope. For instance, the launch coroutine builder is defined as:

fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job

There are two ways to provide a CoroutineScope to a Job:

  • The Job can be directly written and executed within the lambda block of another Job, to implicitly use it as its scope. For instance:
    val parentJob = launch {
        val childJob = launch { ... }
    }
  • In a suspending function, you can wrap your code into a coroutineScope { … } block that establishes a boundary of your operation, its scope.
    suspend fun performTasks(task1: String, task2: String) {
        coroutineScope {
            val job1 = async { doTask(task1) }
            val job2 = async { doTask(task2) }
            mergeTasks(job1.await(), job2.await())
        }
    }

As we can see from the above two ways, all the coroutines will implicitly become the children of the scope they are called from, and if the scope fails with an exception or is cancelled, all the children are cancelled too.
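As a runnable illustration of the coroutineScope variant, here is a sketch with stand-in bodies for doTask and mergeTasks. Those bodies, the "fail" task name, the delays and the String return types are assumptions made for this example; the original article does not define them.

```kotlin
import kotlinx.coroutines.*

// Hypothetical task: "fail" throws quickly, anything else takes a while.
suspend fun doTask(name: String): String {
    if (name == "fail") {
        delay(50)
        throw IllegalStateException("task failed")
    }
    try {
        delay(500)
    } catch (e: CancellationException) {
        println("$name was cancelled because a sibling failed")
        throw e // always rethrow so cancellation keeps propagating
    }
    return "done: $name"
}

// Hypothetical merge step.
fun mergeTasks(first: String, second: String): String = "$first + $second"

suspend fun performTasks(task1: String, task2: String): String = coroutineScope {
    // Both coroutines are children of this scope and run concurrently.
    val job1 = async { doTask(task1) }
    val job2 = async { doTask(task2) }
    mergeTasks(job1.await(), job2.await())
}

fun main(): Unit = runBlocking {
    println(performTasks("a", "b"))
    try {
        performTasks("fail", "slow")
    } catch (e: IllegalStateException) {
        println("performTasks failed: ${e.message}")
    }
}
```

When job1 fails, the scope fails, job2 is cancelled instead of running to the end, and the exception surfaces to the caller of performTasks.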

CoroutineContext vs CoroutineScope

Did you notice the emphasis on the word — `implicitly`? It is done so because a coroutine may explicitly define a CoroutineContext, instead of implicitly being bound to the CoroutineContext of the CoroutineScope it is being called from.

This sounds confusing, doesn’t it? Let’s simplify this by defining CoroutineContext and CoroutineScope.

CoroutineContext: Every coroutine in Kotlin has a context that is represented by an instance of CoroutineContext interface. A context is a set of elements and is responsible for the coroutine’s lifecycle, cancellation, and parent-child relations.

We have observed earlier in this blog that coroutine Job(s) have a parent-child relationship. All coroutine builders (such as launch and async) take in a CoroutineContext parameter that contains an instance of a Job, to which the coroutine is bound, in order to enforce the discipline of structured concurrency with the propagation of cancellation. For instance:

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    // ...
): Job

CoroutineScope: It is just a wrapper around the context, that is, it is an interface that consists of a sole property — val coroutineContext: CoroutineContext. It has nothing else but context.

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

So, why do they both exist and how do they differ? The difference between a CoroutineContext and CoroutineScope is in their intended purpose.

The intended purpose of the CoroutineScope receiver is to reference a scope in which a new coroutine is launched.

On the other hand, the intended purpose of the CoroutineContext parameter is to provide elements that are responsible for the coroutine’s lifecycle, cancellation, and parent-child relations.

Important to know! By convention, the CoroutineContext present in the CoroutineScope contains a Job that is going to become the parent of a new coroutine, unless another CoroutineContext containing some other Job instance is explicitly defined. We do not usually pass a Job in the context parameter to launch, since that breaks the parent-child relationship and hence breaks structured concurrency, unless we explicitly and consciously want to break it for some reason or use case.
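A small sketch of this convention, under the assumption that we only pass a non-Job element (here a CoroutineName) in the context parameter: the new coroutine picks up the extra element but still becomes a child of the Job held by the scope. The function launchWithExtraElement is a hypothetical name used for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns (is the new coroutine a child of the scope's Job?, name seen inside it).
suspend fun launchWithExtraElement(): Pair<Boolean, String?> = coroutineScope {
    // The scope is just a wrapper around a context, and that context holds a Job.
    val scopeJob = coroutineContext[Job]!!
    var seenName: String? = null
    // No Job is passed here, so the parent Job comes from the scope.
    val child = launch(CoroutineName("worker")) {
        seenName = coroutineContext[CoroutineName]?.name
        delay(50) // keep the child alive while the parent link is inspected
    }
    val isChild = child in scopeJob.children
    child.join()
    isChild to seenName
}

fun main(): Unit = runBlocking {
    val (isChild, name) = launchWithExtraElement()
    println("child of the scope's Job: $isChild, name inside coroutine: $name")
}
```

Adding elements such as a name or a dispatcher through the context parameter leaves the parent-child link intact; only passing a different Job replaces the parent.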

Let’s have a look at a code snippet wherein a coroutine is created from a CoroutineScope but runs in a CoroutineContext other than that provided by the CoroutineScope it was created from and hence, the parent-child relationship is broken. It does so by explicitly defining the context as a parameter to the launch coroutine builder:

⚠️ Notice how the childJob continues to execute even after completion of parentJob. It is because parentJob is the CoroutineScope that executes childJob and parallelJob is the CoroutineContext to which the lifecycle of the childJob is bound.
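The original snippet is not reproduced in this text. A minimal sketch of the situation described, assuming parallelJob is a standalone Job() passed as the context to launch, might look like the following; the function name detachedChildOutlivesParent and the loop body are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Returns true if childJob is still active after parentJob has completed.
suspend fun detachedChildOutlivesParent(): Boolean = coroutineScope {
    val parallelJob = Job() // standalone Job with no parent (assumed for this sketch)
    lateinit var childJob: Job
    val parentJob = launch {
        // Created from parentJob's scope, but bound to parallelJob through the context.
        childJob = launch(parallelJob) {
            repeat(3) { i ->
                println("child tick $i")
                delay(100)
            }
        }
    }
    // parentJob does not wait for childJob, because childJob is not its child.
    parentJob.join()
    val childStillActive = childJob.isActive
    println("parent completed: ${parentJob.isCompleted}, child still active: $childStillActive")
    // Nothing else will cancel or await the detached child, so clean up manually.
    parallelJob.cancelAndJoin()
    childStillActive
}

fun main(): Unit = runBlocking {
    detachedChildOutlivesParent()
}
```

The manual cleanup at the end is the cost of breaking the parent-child link: the surrounding scope no longer cancels or waits for childJob on its own.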

Therefore, ensure that your coroutine is bound to the correct CoroutineContext and CoroutineScope to avoid unexpected behaviour.

💡If you’re curious and wish to dive deeper into CoroutineContext and CoroutineScope, here’s a great blog.

Great! You’ve just aced Structured Concurrency using Kotlin coroutines. Thanks for making it this far and 👏 if this helped you.

Special thanks to Nishant Shah for sharing his learnings and helping me on this topic.

Feel free to reach out to me if you have any questions, suggestions or ideas.

LinkedIn | Github | Email — rashanjyotg@gmail.com
