[swift-evolution] Contextualizing async coroutines

Brent Royal-Gordon brent at architechies.com
Sat Sep 2 21:50:52 CDT 2017


> On Sep 2, 2017, at 2:56 AM, Pierre Habouzit <phabouzit at apple.com> wrote:
> 
>> `onResume` hooks seem like a really good way to, essentially, allow arbitrary concurrency primitives to be passed into `async` functions. My main question is, if we have it, why do we need coroutine contexts? It seems to me that anything the coroutine might do with the context could either be passed into its parameters, or encapsulated in its `onResume` hook.
> 
> No it's not the same. Arbitrary code is this: arbitrary code and data.
> 
> Please read the few mails I sent recently about this, but to recap here quickly:
> 
> It is needed for the runtime (in a broad sense, from language to the operating system) to be able to introspect these:
> - priority attributes
> - dependencies
> - execution contexts (thread/queue/runloop/...)
> - ownership
> 
> Without this info, the scheduling of these coroutines will essentially be random, subject to priority inversions and other similar issues.

I will freely admit that I don't understand all of these details, so in lieu of rebutting this, I will simply state what I'm saying more explicitly and ask you to explain why I'm wrong in smaller words. :^)

Basically, what I'm saying is: Why do the context details need to be available *within the async function*, rather than being available only to the resume hook?

For a GCD example, suppose the normal, C-based `dispatch_async` function is exposed to Swift as `__dispatch_async`, and `beginAsync` has a signature like this:

	// I don't think `rethrows` works this way, but pretend it did.
	//
	/// Starts running an asynchronous function which is started and restarted by `resumeHook`.
	/// 
	/// - Parameter body: The async function to run.
	/// - Parameter resumeHook: A function called once for each time `body` starts or resumes running. 
	/// 			It is passed a `continuation` function representing the next synchronous chunk of 
	///			`body`, which it should run (or schedule to run). If the `continuation` throws or returns 
	///			a non-`nil` value, the function has terminated, and the result should be handled 
	///			appropriately. If the `continuation` returns `nil`, then it has not finished executing.
	func beginAsync<Return>(do body: () async throws -> Return, startingWith resumeHook: @escaping (_ continuation: @escaping () rethrows -> Return?) -> Void) { … }

You can then write async-function-handling versions of `async` like:

	extension DispatchQueue {
		// This version runs a nullary, non-throwing, Void async function, and can be called from non-async code.
		func async(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async -> Void) {
			beginAsync(do: body, startingWith: { continuation in
				let workItem = DispatchWorkItem(qos: qos, flags: flags) {
					_ = continuation()
				}
				__dispatch_async(self, workItem)
			})
		}
		
		// This version runs any (nullary) async function, and can be called from async code.
		func async<Return>(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async throws -> Return) async rethrows -> Return {
			return try await suspendAsync { successHandler, failureHandler in
				beginAsync(do: body, startingWith: { continuation in
					let workItem = DispatchWorkItem(qos: qos, flags: flags)  {
						do {
							if let returnValue = try continuation() {
								successHandler(returnValue)
							}
						}
						catch {
							failureHandler(returnValue)
						}
					}
					__dispatch_async(self, workItem)
				})
			}
		}
	}

Information like the QoS is encapsulated by the closure, so that each time it enqueues another chunk of work, it attaches that information to it. Is that good enough? Or do you need more?

	* * *

I *think* you might be saying that, when GCD wants to run a given async block, it wants to be able to look ahead to where the `successHandler` will want to run so it can schedule the first block on a thread that will be able to immediately run the `successHandler`. But if so, that still only requires `suspendAsync` to extract the context and pass it to its parameter—it doesn't require arbitrary code running when the function is *not* suspended to access the context.

You could perhaps imagine the standard library providing these declarations:

	protocol AsyncContext {
		func resumeAsync(_ resumePoint: @escaping () -> Void)
	}
	struct AsyncContinuation<Returning, Throwing: Error> {
		// These are notionally packaged up here, although it might actually be implemented differently.
		private let successHandler: (Returning) -> Void
		private let failureHandler: (Throwing) -> Void
		
		func resumeAsync(in context: AsyncContext, returning value: Returning) {
			context.resumeAsync { successHandler(value) }
		}
		func resumeAsync(in context: AsyncContext, throwing error: Throwing) {
			context.resumeAsync { failureHandler(error) }
		}
	}
	func beginAsync(in context: AsyncContext, do body: () async -> Void) { … }
	func suspendAsync<Returning, Throwing>(_ handler: (AsyncContext, AsyncContinuation<Returning, Throwing>) -> Void) async throws<Throwing> -> Returning { … }

Then GCD could do something like:

	extension DispatchQueue {
		struct Context: AsyncContext {
			let queue: DispatchQueue
			let qos: DispatchQoS
			let flags: DispatchFlags
			let nextContext: Context?

			func resumeAsync(_ resumePoint: @escaping () -> Void) {
				let workItem = DispatchWorkItem(qos: qos, flags: flags, block: resumePoint)
				__dispatch_async_with_next_queue(queue, workItem, nextContext?.queue)
			}
		}
		
		// This version runs a nullary, non-throwing, Void async function, and can be called from non-async code.
		func async(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async -> Void) {
			beginAsync(in: Context(queue: self, qos: qos, flags: flags, nextContext: nil), do: body)
		}
		
		// This version runs any (nullary) async function, and can be called from async code.
		func async<Return>(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async throws -> Return) async rethrows -> Return {
			return try await suspendAsync { context, continuation in
				let newContext = Context(queue: self, qos: qos, flags: flags, nextContext: context as? DispatchQueue.Context)
				
				beginAsync(in: newContext) {
					do {
						continuation.resumeAsync(in: context, returning: try await body())
					}
					catch {
						continuation.resumeAsync(in: context, throwing: error)
					}
				}
			}
		}
	}

This allows GCD to look arbitrarily deep into the future, but the context can only be inspected at suspension points; it's otherwise encapsulated. The context is also now in control of execution, rather than being some passive data that may or may not be present and may or may not have any particular meaning.

	* * *

Actually, looking at this, it seems to me that `beginAsync(in:do:)` internally just creates a continuation for the beginning of an async function and resumes it. With a small language feature addition, we can have this in the standard library:

	protocol AsyncContext {
		// …as before…
	}
	struct AsyncContinuation<Returning, Throwing: Error> {
		// …as before…
	}
	extension AsyncContinuation where Throwing == Never {
		init(starting function: (#splat(Returning)) async -> Void) { … }
	}
	func suspendAsync<Returning, Throwing>(_ handler: (AsyncContext, AsyncContinuation<Returning, Throwing>) -> Void) async throws<Throwing> -> Returning { … }	

And this in GCD:

	extension DispatchQueue {
		struct Context: AsyncContext {
			let queue: DispatchQueue
			let qos: DispatchQoS
			let flags: DispatchFlags
			let nextContext: Context?

			func resumeAsync(_ resumePoint: @escaping () -> Void) {
				let workItem = DispatchWorkItem(qos: qos, flags: flags, block: resumePoint)
				__dispatch_async_with_next_queue(queue, workItem, nextContext?.queue)
			}
		}
		
		// This version runs a nullary, non-throwing, Void async function, and can be called from non-async code.
		func async(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async -> Void) {
			let context = Context(queue: self, qos: qos, flags: flags, nextContext: nil)
			let starter = AsyncContinuation(starting: body)
			starter.resumeAsync(in: context, returning: ())
		}
		
		// This version runs any (nullary) async function, and can be called from async code.
		func async<Return>(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async throws -> Return) async rethrows -> Return {
			return try await suspendAsync { context, continuation in
				let newContext = Context(queue: self, qos: qos, flags: flags, nextContext: context as? DispatchQueue.Context)
				let starter = AsyncContinuation(starting: {
					do {
						continuation.resumeAsync(in: context, returning: try await body())
					}
					catch {
						continuation.resumeAsync(in: context, throwing: error)
					}
				})
				starter.resumeAsync(in: newContext, returning: ())
			}
		}
	}

We could even encapsulate the second version's chaining logic in `AsyncContinuation`:

	extension AsyncContinuation where Throwing == Never {
		init<StarterReturning, StarterThrowing>(starting starter: (#splat(Returning)) async throws<StarterThrowing> -> StarterReturning, returningTo continuation: AsyncContinuation<StarterReturning, StarterThrowing>, in returningContext: AsyncContext) {
			self.init(starting: {
				do {
					continuation.resumeAsync(in: returningContext, returning: try await starter())
				}
				catch {
					continuation.resumeAsync(in: returningContext, throwing: error)
				}
			})
		}
	}
	
	extension DispatchQueue {
		// …as before…
		
		// This version runs any (nullary) async function, and can be called from async code.
		func async<Return>(qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: () async throws -> Return) async rethrows -> Return {
			return try await suspendAsync { context, continuation in
				let newContext = Context(queue: self, qos: qos, flags: flags, nextContext: context as? DispatchQueue.Context)
				let starter = AsyncContinuation(starting: body, returningTo: continuation, in: context)
				starter.resumeAsync(in: newContext, returning: ())
			}
		}
	}

Make `suspendAsync` a class method on `AsyncContinuation` and we've pretty much walled off all these low-level guts in a single type!

(P.S. Should the `AsyncContext` be a public property of the continuation? Maybe—that would make it harder to accidentally run continuations in the wrong context.)

-- 
Brent Royal-Gordon
Architechies



More information about the swift-evolution mailing list