How Kotlin Coroutines Work Internally(Continuation Implementation On JVM) - Part-2.4
Welcome to the fifth articles of my series “How Kotlin Coroutines Work Internally.” If you haven’t had the opportunity to explore the…
Welcome to the fifth articles of my series “How Kotlin Coroutines Work Internally.” If you haven’t had the opportunity to explore the previous articles, you can find the first article here, the second article here, the third article here and the fourth article here.
In this article, I will explore the implementation of continuation classes on the JVM, specifically focusing on BaseContinuationImpl, ContinuationImpl, RestrictedContinuationImpl, CompletedContinuation, RestrictedSuspendLambda, SuspendFunction, and SuspendLambda. These continuation implementations are crucial for managing suspension and resumption in Kotlin's coroutines on the JVM. By delving into these classes, we will gain a deeper understanding of how the JVM handles coroutine execution and state management. If you’re interested in examining the actual source code, you can find it here.
BaseContinuationImpl
This class serves a critical role in managing the resumption and initiation of suspended coroutines. All suspended lambdas inherit indirectly from this class. It defines a single concrete method, the resumeWith function, while the remaining methods are abstract, requiring implementation by subclasses. Let’s delve into its implementation:
internal abstract class BaseContinuationImpl(
val completion: Continuation<Any?>?
) : Continuation<Any?> {
override fun resumeWith(result: Result<Any?>) {
var current = this //1
var param = result //2
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> = try {
val outcome = invokeSuspend(param)
if (outcome == COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
open fun create(completion: Continuation<Any?>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
open fun create(value: Any?, completion: Continuation<Any?>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
}- Constructor: The constructor takes a
completionparameter of typeContinuation<Any?>?, representing the continuation object of the caller—essentially, the suspend function that invoked it. Once the called suspend function completes, thecompletion.resumeWithmethod is invoked with the result, signaling the completion of the called suspend function. In this manner, the execution progresses through the chain of continuations, eventually reaching the topmost continuation. resumeWithMethod: This method is responsible for resuming the execution of the current suspended function. If the current suspend function completes, it further notifies the parent continuation, allowing the continuation chain to proceed accordingly.invokeSuspendFunction: This method is where the core logic of the suspension is transferred and executed. It contains the actual code that gets invoked when the coroutine is suspended and resumed.- create Function: This method comes in two variants, both responsible for the creation of instances of this class. Each variant plays a key role in the initialization process, setting up the necessary state and behavior for the coroutine’s execution.
releaseInterceptedFunction: As you may recall, in theContinuationInterceptor, there's a method calledreleaseInterceptedContinuationthat must be invoked when aContinuationInterceptoris present in the continuation context. This function is responsible for ensuring that the interceptor is properly released, maintaining the integrity of the continuation chain and managing resources efficiently.
Working of resumedWith Function
At line 1, the object itself is stored in the variable current, and at line 2, the result is stored in the variable param. It then enters a while loop, where the invokeSuspend function is called within a try-catch block. If the result is COROUTINE_SUSPENDED (a special enum value that indicates the coroutine is suspended), the function simply returns.
Otherwise, the result is stored in the outcome variable. Following this, the releaseIntercepted function is invoked (as previously explained). Afterward, it checks whether the completion is an instance of BaseContinuationImpl. If it is, the variables current and param are updated with completion and outcome, respectively, allowing the loop to continue.
If not, the resumeWith method is called. The reasoning behind this is that this class is internal, meaning the topmost continuation will never be an object of this class. This ensures the correct resumption of the topmost continuation in the chain.
ContinuationImpl
This class is utilized for creating custom continuations within suspend functions. It inherits from BaseContinuationImpl and implements the logic for releaseIntercepted. Let’s examine its implementation details.
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
override val context: CoroutineContext
get() = _context!!
private var intercepted: Continuation<Any?>? = null
fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation
}
}- Constructor: This constructor is similar to that of
BaseContinuationImpl, with the addition of an extra parameter,context, of typeCoroutineContext?. There is also a secondary constructor that takes a single parameter,completion, of typeContinuation<Any?>?. This secondary constructor calls the primary constructor, passingcompletionalong with the context of the completion. interceptedFunction: This function invokes theinterceptContinuationmethod of theContinuationInterceptorcontext, provided it exists within the context of theCoroutineContext. The result of this call is then assigned to theinterceptedvariable, effectively storing the intercepted continuation for further use.releaseInterceptedFunction: This function first checks whether a value was previously assigned to theinterceptedvariable. If so, it invokes thereleaseInterceptedContinuationfunction of the correspondingContinuationInterceptor. This ensures that any resources associated with the intercepted continuation are properly released, maintaining the integrity of the continuation chain. After callingreleaseInterceptedContinuation, it stores the completed continuation in theinterceptedvariable, marking the completion of the intercepted continuation. Here is the implementation ofCompletedContinuation, which is self-explanatory:
object CompletedContinuation : Continuation<Any?> {
override val context: CoroutineContext
get() = error("This continuation is already complete")
override fun resumeWith(result: Result<Any?>) {
error("This continuation is already complete")
}
override fun toString(): String = "This continuation is already complete"
}So far, I have explained the resumption of continuations and the interception of continuations.
SuspendFunction ,RestrictedContinuationImpl, RestrictedSuspendLambda and SuspendLambda
SuspendFunction, RestrictedSuspendLambda, RestrictedContinuationImpland SuspendLambda are interfaces or classes that add metadata to suspend functions. Here are the implementation details, which are self-explanatory:
internal abstract class RestrictedContinuationImpl(
completion: Continuation<Any?>?
) : BaseContinuationImpl(completion) {
init {
completion?.let {
require(it.context === EmptyCoroutineContext) {
"Coroutines with restricted suspension must have EmptyCoroutineContext"
}
}
}
public override val context: CoroutineContext
get() = EmptyCoroutineContext
}
// To distinguish suspend function types from ordinary function types all suspend function types shall implement this interface
internal interface SuspendFunction
@SinceKotlin("1.3")
// Restricted suspension lambdas inherit from this class
internal abstract class RestrictedSuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : RestrictedContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}
@SinceKotlin("1.3")
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}That concludes this discussion. In the next article, we will delve into compiler support, which will provide a clearer understanding of their workings. These classes are crucial for grasping the concepts, so I chose to discuss them first.
If you have any questions, feel free to ask by replying! If you enjoyed the article, please give it a clap! Don’t forget to follow me on LinkedIn and Twitter for more updates!