
Let's say I have this code:

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}

This would print

walk start
quack start
fly start
walk end
quack end
fly end

I understand and expected this. But what if I want those three Tasks to run sequentially? Let's say each Task is created when the user presses a button. I want the tasks to queue up in the background and run one by one. Is there a Task equivalent of queuing DispatchWorkItems on a DispatchQueue?


Edit:

I came up with a solution, but I am not sure whether it is a good way to implement this. Since this implementation can create many layers of chained Tasks, I wonder whether there is a risk of stack overflow or memory leaks.

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

let taskQueue = TaskQueue()

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}

3 Answers


  1. Chosen as BEST ANSWER

    Update:

    For anyone who finds this post useful in the future: I have created a Swift package with a better implementation, which also supports queuing up AsyncThrowingStream.

    https://github.com/rickymohk/SwiftTaskQueue


    Here is my updated implementation, which I think is safer than the one I posted in the question. TaskQueueActor does all of the work; I wrap it in an outer class just to make it cleaner to call from a non-async context.

    class TaskQueue{
        
        private actor TaskQueueActor{
            private var blocks : [() async -> Void] = []
            private var currentTask : Task<Void,Never>? = nil
            
            func addBlock(block:@escaping () async -> Void){
                blocks.append(block)
                next()
            }
            
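            func next() {
                // Do nothing if a block is already running or nothing is queued.
                guard currentTask == nil, !blocks.isEmpty else { return }
                let block = blocks.removeFirst()
                // The Task inherits the actor's isolation, so the state is mutated safely.
                currentTask = Task {
                    await block()
                    currentTask = nil
                    next()
                }
            }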
        }
        private let taskQueueActor = TaskQueueActor()
        
        func dispatch(block:@escaping () async ->Void){
            Task{
                await taskQueueActor.addBlock(block: block)
            }
        }
    }
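
    One thing to be aware of with the wrapper above: dispatch starts a new Task for every call, and as far as I know Swift does not promise that separately created Tasks reach the actor in the order they were created. A minimal sketch of an alternative, not the package's implementation, is to push the blocks through an AsyncStream, whose yield is synchronous, and drain it from a single consumer Task. SerialTaskQueue is a hypothetical name, and AsyncStream.makeStream assumes Swift 5.9 or later.

    ```swift
    // Sketch only: a serial queue backed by AsyncStream.
    // `SerialTaskQueue` is a hypothetical name, not part of any library.
    final class SerialTaskQueue {
        typealias Block = @Sendable () async -> Void

        private let continuation: AsyncStream<Block>.Continuation
        private let worker: Task<Void, Never>

        init() {
            // makeStream() is available from Swift 5.9 onwards.
            let (stream, continuation) = AsyncStream<Block>.makeStream()
            self.continuation = continuation
            // A single consumer drains the stream, so blocks run one at a time,
            // in the order they were yielded.
            self.worker = Task {
                for await block in stream {
                    await block()
                }
            }
        }

        deinit {
            // Ends the consumer loop once the already buffered blocks have run.
            continuation.finish()
        }

        // Synchronous: the block is buffered immediately, preserving call order.
        func dispatch(_ block: @escaping Block) {
            continuation.yield(block)
        }
    }
    ```

    Calling it looks the same as before, e.g. queue.dispatch { await duck.walk() }, but no intermediate Task is needed to enqueue the block.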
    

  2. I found this one on GitHub: https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift

    via

    https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18

    import Foundation
    
    public actor TaskQueue {
        private let concurrency: Int
        private var running: Int = 0
        private var queue = [CheckedContinuation<Void, Error>]()
    
        public init(concurrency: Int) {
            self.concurrency = concurrency
        }
    
        deinit {
            for continuation in queue {
                continuation.resume(throwing: CancellationError())
            }
        }
    
        public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
            try Task.checkCancellation()
    
            try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
                queue.append(continuation)
                tryRunEnqueued()
            }
    
            defer {
                running -= 1
                tryRunEnqueued()
            }
            try Task.checkCancellation()
            return try await operation()
        }
    
        private func tryRunEnqueued() {
            guard !queue.isEmpty else { return }
            guard running < concurrency else { return }
    
            running += 1
            let continuation = queue.removeFirst()
            continuation.resume()
        }
    }
    

    It seems to work:

    // TaskQueue is an actor, not an ObservableObject, so hold it in @State rather than @StateObject.
    @State private var taskQueue = TaskQueue(concurrency: 1)
    
                .task {
                    // With concurrency 1, each operation starts only after the previous one has finished.
                    try? await taskQueue.enqueue {
                        try? await Task.sleep(for: .seconds(1))
                        print("Done 1")
                    }
                    try? await taskQueue.enqueue {
                        try? await Task.sleep(for: .seconds(1))
                        print("Done 2")
                    }
                    try? await taskQueue.enqueue {
                        try? await Task.sleep(for: .seconds(1))
                        print("Done 3")
                    }
                }
    
  3. I once was a proponent of the unstructured task approach, where each would await the prior one. In retrospect, this feels a bit brittle to me. Increasingly (with credit to Rob Napier for nudging me in this direction), I now use asynchronous sequences, specifically AsyncChannel from Apple’s swift-async-algorithms. I think it is a more robust behavior and is more consistent with the asynchronous sequences of modern Swift concurrency.

    Before we come to your example, consider this serial downloader, where one process (the user clicking buttons) sends URL objects to another process that monitors the channel for URLs in a for-await-in loop:

    struct DownloadView: View {
        @StateObject var viewModel = DownloadViewModel()
    
        var body: some View {
            VStack {
                Button("1") { Task { await viewModel.appendDownload(1) } }
                Button("2") { Task { await viewModel.appendDownload(2) } }
                Button("3") { Task { await viewModel.appendDownload(3) } }
            }
            .task {
                await viewModel.monitorDownloadRequests()
            }
        }
    }
    
    @MainActor
    class DownloadViewModel: ObservableObject {
        private let session: URLSession = …
        private let baseUrl: URL = …
        private let folder: URL = …
        private let channel = AsyncChannel<URL>()   // note, we're sending URLs on this channel
    
        func monitorDownloadRequests() async {
            for await url in channel {
                await download(url)
            }
        }
    
        func appendDownload(_ index: Int) async {
            let url = baseUrl.appending(component: "\(index).jpg")
            await channel.send(url)
        }
    
        func download(_ url: URL) async {
            do {
                let (location, _) = try await session.download(from: url)
                let fileUrl = folder.appending(component: url.lastPathComponent)
                try? FileManager.default.removeItem(at: fileUrl)
                try FileManager.default.moveItem(at: location, to: fileUrl)
            } catch {
                print(error)
            }
        }
    }
    

    We start monitorDownloadRequests and then append download requests to the channel.

    This performs the requests serially (because monitorDownloadRequests has a for-await-in loop). E.g., in Instruments’ “Points of Interest” tool, I have added some Ⓢ signposts where I clicked these buttons, and shown intervals where the requests happen, and you can see that these three requests happen sequentially.

    [Image: Instruments “Points of Interest” timeline showing the three requests running one after another]

    But the wonderful thing about channels is that they offer serial behaviors without introducing the problems of unstructured concurrency. They also handle cancelation automatically (if you want that behavior): if you cancel the task running the for-await-in loop (which the .task {…} view modifier does for us automatically in SwiftUI when the view is dismissed), the loop simply ends. By contrast, if you have a bunch of unstructured concurrency, with one Task awaiting the prior one, handling cancelation gets messy quickly.
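
    To make the cancelation point concrete, here is a minimal sketch under some assumptions: it substitutes the standard library's AsyncStream for AsyncChannel so that it runs without the swift-async-algorithms package, and runUntilCancelled is a hypothetical name. The stream is deliberately never finished while the consumer is running, so the only way the loop can end is by the consuming task being canceled.

    ```swift
    // Sketch only: shows that canceling the task that runs a `for await` loop ends the loop.
    // AsyncStream stands in for AsyncChannel; `runUntilCancelled` is a hypothetical name.
    func runUntilCancelled() async -> Bool {
        // makeStream() is available from Swift 5.9 onwards.
        let (stream, continuation) = AsyncStream<Int>.makeStream()

        let consumer = Task { () -> Bool in
            // Nothing finishes the stream while this runs, so without cancelation
            // this loop would wait for elements forever.
            for await _ in stream { }
            // Reached only because the task was canceled.
            return Task.isCancelled
        }

        // Comparable to what `.task {…}` does when the view is dismissed.
        consumer.cancel()
        let endedByCancelation = await consumer.value

        // Tidy up the producer side once the consumer is done.
        continuation.finish()
        return endedByCancelation
    }
    ```

    No bookkeeping of earlier tasks is needed here: there is one consumer, and canceling it stops the whole pipeline.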


    Now, in your case, you are asking about a more general queue, where you can await tasks. Well, you can have an AsyncChannel of closures:

    typealias AsyncClosure = () async -> Void
    
    let channel = AsyncChannel<AsyncClosure>()
    

    E.g.:

    typealias AsyncClosure = () async -> Void
    
    struct ExperimentView: View {
        @StateObject var viewModel = ExperimentViewModel()
    
        var body: some View {
            VStack {
                Button("Red")   { Task { await viewModel.addRed() } }
                Button("Green") { Task { await viewModel.addGreen() } }
                Button("Blue")  { Task { await viewModel.addBlue() } }
            }
            .task {
                await viewModel.monitorChannel()
            }
        }
    }
    
    @MainActor
    class ExperimentViewModel: ObservableObject {
        let channel = AsyncChannel<AsyncClosure>()
    
        func monitorChannel() async {
            for await task in channel {
                await task()
            }
        }
    
        func addRed() async {
            await channel.send { await self.red() }
        }
    
        func addGreen() async {
            await channel.send { await self.green() }
        }
    
        func addBlue() async {
            await channel.send { await self.blue() }
        }
    
        func red() async { … }
    
        func green() async { … }
    
        func blue() async { … }
    }
    

    That yields:

    [Image: Instruments “Points of Interest” timeline of the red, green, and blue intervals]

    Here again, I am using Instruments to visualize what is going on. I clicked the “red”, “green”, and “blue” buttons quickly, in succession, twice. I then watched the six corresponding intervals for these three-second tasks. I then repeated that six-click process a second time, but this time I dismissed the view in question before they finished, midway through the green task of the second series of button taps, illustrating the seamless cancelation capabilities of AsyncChannel (and asynchronous sequences in general).

    Now, I hope you forgive me, as I omitted the code to create all of these “Points of Interest” signposts and intervals, as it adds a lot of cruft that really is not relevant to the question at hand (but see this if you are interested). But hopefully these visualizations help illustrate what is going on.

    The take-home message is that AsyncChannel (and its sibling AsyncThrowingChannel) is a great way to remain within structured concurrency while still getting the serial behavior (or constrained behavior, as shown at the end of this answer) that we used to get with queues, but with asynchronous tasks.
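
    As a rough illustration of what constrained behavior could look like, here is a minimal sketch under some assumptions: it uses AsyncStream rather than AsyncChannel so that it needs no package, and runConstrained and maxConcurrent are hypothetical names. It is one possible shape for the idea, not necessarily the approach referred to above. A task group pulls closures from the sequence and waits for a running one to finish whenever the limit is reached.

    ```swift
    // Sketch only: runs closures from an async sequence with at most `maxConcurrent` in flight.
    // `runConstrained` and `maxConcurrent` are hypothetical names.
    typealias AsyncClosure = @Sendable () async -> Void

    func runConstrained(_ closures: AsyncStream<AsyncClosure>, maxConcurrent: Int) async {
        await withTaskGroup(of: Void.self) { group in
            var running = 0
            for await closure in closures {
                // At the limit: wait for one child task to finish before adding another.
                if running >= maxConcurrent {
                    _ = await group.next()
                    running -= 1
                }
                group.addTask { await closure() }
                running += 1
            }
            // Leaving the closure implicitly awaits any child tasks still running.
        }
    }
    ```

    With maxConcurrent set to 1 this degenerates to the serial behavior above; because the work lives in a task group, canceling the calling task also cancels the children.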

    I must confess that this latter AsyncClosure example, while it hopefully answers your question, feels a little forced to my eye. I have been using AsyncChannel for a few months now, and I personally always have a more concrete object being handled by the channel (e.g., URLs, GPS locations, image identifiers, etc.). This example with closures feels like it is trying just a little too hard to reproduce old-fashioned dispatch/operation queue behaviors.
