How Kotlin Coroutines Work Internally (Continuation Implementation on JVM) - Part 2.4

Welcome to the fifth article of my series “How Kotlin Coroutines Work Internally.” If you haven’t had the opportunity to explore the previous articles, you can find the first article here, the second article here, the third article here and the fourth article here.

In this article, I will explore the implementation of continuation classes on the JVM, specifically focusing on BaseContinuationImpl, ContinuationImpl, RestrictedContinuationImpl, CompletedContinuation, RestrictedSuspendLambda, SuspendFunction, and SuspendLambda. These continuation implementations are crucial for managing suspension and resumption in Kotlin's coroutines on the JVM. By delving into these classes, we will gain a deeper understanding of how the JVM handles coroutine execution and state management. If you’re interested in examining the actual source code, you can find it here.

BaseContinuationImpl

This class plays a central role in starting and resuming suspended coroutines. All suspend lambdas inherit indirectly from this class. It implements the resumeWith function, declares invokeSuspend as abstract so that subclasses must implement it, and provides open create and releaseIntercepted functions that subclasses can override. Let’s delve into its implementation (condensed here):

internal abstract class BaseContinuationImpl( 
    val completion: Continuation<Any?>? 
) : Continuation<Any?> { 
 
 
    override fun resumeWith(result: Result<Any?>) { 
 
        var current = this //1 
        var param = result //2 
        while (true) {  
 
            with(current) { 
                val completion = completion!! 
                val outcome: Result<Any?> = try { 
                    val outcome = invokeSuspend(param) 
                    if (outcome === COROUTINE_SUSPENDED) return 
                    Result.success(outcome) 
                } catch (exception: Throwable) { 
                    Result.failure(exception) 
                } 
                releaseIntercepted() 
                if (completion is BaseContinuationImpl) { 
                    current = completion 
                    param = outcome 
                } else { 
                    completion.resumeWith(outcome) 
                    return 
                } 
 
            } 
 
        } 
 
 
    } 
 
    protected abstract fun invokeSuspend(result: Result<Any?>): Any? 
 
    protected open fun releaseIntercepted() { 
        // does nothing here, overridden in ContinuationImpl 
    } 
 
    open fun create(completion: Continuation<Any?>): Continuation<Unit> { 
        throw UnsupportedOperationException("create(Continuation) has not been overridden") 
    } 
 
    open fun create(value: Any?, completion: Continuation<Any?>): Continuation<Unit> { 
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") 
    } 
 
}
  • Constructor: The constructor takes a completion parameter of type Continuation<Any?>?, which is the continuation of the caller, that is, the suspend function that invoked this one. When the called suspend function finishes, completion.resumeWith is invoked with its result, which resumes the caller. Execution moves up the chain of continuations this way until it reaches the topmost one.
  • resumeWith Method: This method resumes the current suspended function with a value or an exception. If that function then runs to completion instead of suspending again, its result is passed on to the completion continuation so that the rest of the chain can proceed.
  • invokeSuspend Function: This is the only abstract method. Subclasses place the body of the suspend function or lambda here. It is called with the result of the last suspension point and returns either COROUTINE_SUSPENDED, if the body suspended again, or the final result.
  • create Function: This method comes in two variants, one taking only a completion and one taking an additional value. A subclass overrides the variant it needs and returns a new continuation instance bound to the given completion. The base versions simply throw UnsupportedOperationException.
  • releaseIntercepted Function: As you may recall, ContinuationInterceptor has a method called releaseInterceptedContinuation that should be called for a continuation returned by interceptContinuation once it is no longer needed. releaseIntercepted is the hook for that call. It does nothing in this class and is overridden in ContinuationImpl.
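
To see the completion chain from the outside, here is a minimal sketch that uses only public standard-library functions (startCoroutine, suspendCoroutine, resume). The function name suspendThenResume and the log strings are my own, chosen for illustration. The hand-written completion object plays the role of the topmost continuation, and the suspend lambda is the frame whose resumeWith and invokeSuspend do the work.

```kotlin
import kotlin.coroutines.*

// Hypothetical demo function; returns the order in which things happened.
fun suspendThenResume(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Int>? = null

    // Plays the role of `completion`: a plain Continuation at the top of
    // the chain that receives the final result of the lambda.
    val completion = object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            log += "completion received ${result.getOrNull()}"
        }
    }

    val block: suspend () -> Int = {
        log += "started"
        // Stores the continuation and suspends the lambda at this point.
        val x = suspendCoroutine<Int> { cont -> saved = cont }
        log += "resumed with $x"
        x + 1
    }

    block.startCoroutine(completion) // runs until the first suspension
    log += "suspended"
    saved!!.resume(41)               // resumes the lambda, which then completes
    return log
}
```

Calling suspendThenResume() should yield the entries "started", "suspended", "resumed with 41" and "completion received 42", in that order: the lambda suspends once, is resumed from outside, and its return value is handed to the completion.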

Working of the resumeWith Function

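At the line marked //1, the object itself is stored in the variable current, and at the line marked //2, the incoming result is stored in the variable param. The function then enters a while loop, where invokeSuspend is called on current inside a try-catch block. If invokeSuspend returns COROUTINE_SUSPENDED (a special enum value that indicates the coroutine has suspended again), resumeWith simply returns, because there is nothing to report to the caller yet.

Otherwise, the returned value is wrapped in Result.success and stored in the outcome variable. If invokeSuspend throws, the exception is caught and stored as Result.failure instead. Following this, the releaseIntercepted function is invoked (as previously explained). Afterward, the function checks whether completion is an instance of BaseContinuationImpl. If it is, the variables current and param are updated with completion and outcome, respectively, and the loop continues with the caller's continuation. Using a loop here instead of calling completion.resumeWith recursively keeps the call stack shallow when several frames complete one after another.

If completion is not a BaseContinuationImpl, its resumeWith method is called with the outcome and the function returns. Because BaseContinuationImpl is internal, a continuation supplied from outside the standard library, such as the topmost continuation in the chain, is never an instance of it, so this branch is what finally delivers the result to the top of the chain.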
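
The loop is easier to follow in a stripped-down model. The sketch below is not the standard library's code: MiniContinuation, SUSPENDED and runChain are hypothetical names of my own, and interception is left out entirely. It only reproduces the control flow described above, with two hand-written frames chained to a plain top-level continuation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for COROUTINE_SUSPENDED.
val SUSPENDED = Any()

// Simplified model of BaseContinuationImpl (illustrative only).
abstract class MiniContinuation(val completion: Continuation<Any?>) : Continuation<Any?> {
    override val context: CoroutineContext get() = completion.context

    abstract fun invokeSuspend(result: Result<Any?>): Any?

    override fun resumeWith(result: Result<Any?>) {
        var current: MiniContinuation = this
        var param = result
        while (true) {
            val completion = current.completion
            val outcome: Result<Any?> = try {
                val value = current.invokeSuspend(param)
                if (value === SUSPENDED) return // frame suspended again
                Result.success(value)
            } catch (e: Throwable) {
                Result.failure(e)
            }
            if (completion is MiniContinuation) {
                // Caller is one of our frames: loop instead of recursing.
                current = completion
                param = outcome
            } else {
                // Foreign continuation at the top of the chain.
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

// Builds the chain inner -> outer -> top and resumes the innermost frame.
fun runChain(): List<String> {
    val log = mutableListOf<String>()
    val top = object : Continuation<Any?> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Any?>) {
            log += "top got ${result.getOrNull()}"
        }
    }
    val outer = object : MiniContinuation(top) {
        override fun invokeSuspend(result: Result<Any?>): Any? {
            val v = result.getOrThrow() as Int
            log += "outer got $v"
            return v * 2
        }
    }
    val inner = object : MiniContinuation(outer) {
        override fun invokeSuspend(result: Result<Any?>): Any? {
            val v = result.getOrThrow() as Int
            log += "inner got $v"
            return v + 1
        }
    }
    inner.resumeWith(Result.success(1))
    return log
}
```

Here runChain() should record "inner got 1", "outer got 2" and "top got 4". Both frames are processed inside a single resumeWith call on inner, and only the final hand-off to top goes through a regular resumeWith call.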

ContinuationImpl

The continuations that the compiler generates for named suspend functions extend this class. It inherits from BaseContinuationImpl, adds a coroutine context, and implements the interception logic, including releaseIntercepted. Let’s examine its implementation details.

internal abstract class ContinuationImpl( 
    completion: Continuation<Any?>?, 
    private val _context: CoroutineContext? 
) : BaseContinuationImpl(completion) { 
 
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) 
 
    override val context: CoroutineContext 
        get() = _context!! 
 
 
    private var intercepted: Continuation<Any?>? = null 
 
    fun intercepted(): Continuation<Any?> = 
        intercepted 
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) 
                .also { intercepted = it } 
 
    override fun releaseIntercepted() { 
        val intercepted = intercepted 
        if (intercepted != null && intercepted !== this) { 
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted) 
        } 
        this.intercepted = CompletedContinuation 
    } 
 
 
}
  • Constructor: This constructor is similar to that of BaseContinuationImpl, with the addition of an extra parameter, _context, of type CoroutineContext?. There is also a secondary constructor that takes a single parameter, completion, of type Continuation<Any?>?. This secondary constructor calls the primary constructor, passing completion along with the context of the completion.
  • intercepted Function: This function returns the intercepted version of this continuation. On the first call, it looks up the ContinuationInterceptor in the context and, if one exists, calls its interceptContinuation method with this continuation; if there is no interceptor, it uses the continuation itself. The result is cached in the intercepted variable, so later calls return the same object.
  • releaseIntercepted Function: This function first checks whether a value was previously assigned to the intercepted variable and whether that value is a different object from this continuation. If so, it invokes the releaseInterceptedContinuation function of the corresponding ContinuationInterceptor, so that any resources associated with the intercepted continuation can be released. In every case, it then stores CompletedContinuation in the intercepted variable, marking this continuation as finished. Here is the implementation of CompletedContinuation, which fails with an error if anyone tries to use it:
object CompletedContinuation : Continuation<Any?> { 
    override val context: CoroutineContext 
        get() = error("This continuation is already complete") 
 
    override fun resumeWith(result: Result<Any?>) { 
        error("This continuation is already complete") 
    } 
 
    override fun toString(): String = "This continuation is already complete" 
}
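
The interception life cycle can be observed with a small custom interceptor. This is a minimal sketch, assuming nothing beyond the public ContinuationInterceptor interface; LoggingInterceptor and runIntercepted are hypothetical names used only for illustration. The interceptor wraps the continuation, logs each call, and resumes the wrapped continuation immediately on the same thread.

```kotlin
import kotlin.coroutines.*

// Hypothetical interceptor that records when it is asked to intercept,
// dispatch and release a continuation.
class LoggingInterceptor(private val log: MutableList<String>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        log += "intercept"
        return object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "dispatch"
                continuation.resumeWith(result) // resume right away, no thread switch
            }
        }
    }

    override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        log += "release"
    }
}

fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    val completion = object : Continuation<Unit> {
        // The coroutine takes its context from the completion.
        override val context: CoroutineContext = LoggingInterceptor(log)
        override fun resumeWith(result: Result<Unit>) {
            log += "done"
        }
    }
    val block: suspend () -> Unit = { log += "body" }
    block.startCoroutine(completion)
    return log
}
```

With this setup, runIntercepted() should record "intercept", "dispatch", "body", "release" and "done": intercepted() asks the interceptor for a wrapper before the first resume, and releaseIntercepted() hands the wrapper back after the body has finished but before the completion is resumed.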

So far, I have explained the resumption of continuations and the interception of continuations.

SuspendFunction, RestrictedContinuationImpl, RestrictedSuspendLambda and SuspendLambda

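The remaining types build on the classes above. SuspendFunction is a marker interface that distinguishes suspend function types from ordinary function types. RestrictedContinuationImpl is the base class for continuations with restricted suspension; it requires the completion's context to be EmptyCoroutineContext and always reports that context itself. RestrictedSuspendLambda and SuspendLambda are the base classes for restricted and ordinary suspend lambdas, respectively, and additionally record the lambda's arity. Here are the implementation details: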

internal abstract class RestrictedContinuationImpl( 
    completion: Continuation<Any?>? 
) : BaseContinuationImpl(completion) { 
    init { 
        completion?.let { 
            require(it.context === EmptyCoroutineContext) { 
                "Coroutines with restricted suspension must have EmptyCoroutineContext" 
            } 
        } 
    } 
 
    public override val context: CoroutineContext 
        get() = EmptyCoroutineContext 
} 
// To distinguish suspend function types from ordinary function types all suspend function types shall implement this interface 
internal interface SuspendFunction 
 
@SinceKotlin("1.3") 
// Restricted suspension lambdas inherit from this class 
internal abstract class RestrictedSuspendLambda( 
    public override val arity: Int, 
    completion: Continuation<Any?>? 
) : RestrictedContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction { 
    constructor(arity: Int) : this(arity, null) 
 
    public override fun toString(): String = 
        if (completion == null) 
            Reflection.renderLambdaToString(this) // this is lambda 
        else 
            super.toString() // this is continuation 
} 
 
@SinceKotlin("1.3") 
// Suspension lambdas inherit from this class 
internal abstract class SuspendLambda( 
    public override val arity: Int, 
    completion: Continuation<Any?>? 
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction { 
    constructor(arity: Int) : this(arity, null) 
 
    public override fun toString(): String = 
        if (completion == null) 
            Reflection.renderLambdaToString(this) // this is lambda 
        else 
            super.toString() // this is continuation 
}
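
Restricted suspension is easiest to see with a small example. The sketch below defines a hypothetical Emitter scope annotated with @RestrictsSuspension (the same annotation the standard library's sequence builder relies on) and drives a lambda with that receiver by hand. Emitter, emit and runRestricted are names I made up for illustration. Note that the completion uses EmptyCoroutineContext, which is what the require check in RestrictedContinuationImpl expects.

```kotlin
import kotlin.coroutines.*

// Hypothetical restricted scope: lambdas with this receiver may only
// suspend by calling its own suspend members, such as emit.
@RestrictsSuspension
class Emitter {
    val emitted = mutableListOf<Int>()
    var next: Continuation<Unit>? = null

    // Records the value, stores the continuation and suspends.
    suspend fun emit(value: Int): Unit = suspendCoroutine { cont ->
        emitted += value
        next = cont
    }
}

fun runRestricted(): List<Int> {
    val emitter = Emitter()
    val block: suspend Emitter.() -> Unit = {
        emit(1)
        emit(2)
    }
    val completion = object : Continuation<Unit> {
        // Restricted coroutines must run with an empty context.
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow() // surface any exception from the block
        }
    }
    emitter.next = block.createCoroutine(emitter, completion)
    // Drive the coroutine step by step until it no longer suspends.
    while (true) {
        val cont = emitter.next ?: break
        emitter.next = null
        cont.resume(Unit)
    }
    return emitter.emitted
}
```

runRestricted() should return the list [1, 2]. Each resume runs the lambda up to the next emit call, and the loop ends once the lambda completes without storing a new continuation.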

That concludes this discussion. In the next article, we will delve into compiler support, which will show how the compiler generates and uses these classes. Understanding them first makes that material easier to follow, which is why I chose to discuss them here.
