skip to Main Content

I am calling a function multiple times one after another and am trying to synchronise it so it happens serially. The issue is, the function contains a Task, which executes in parallel. I have serialized the internals of my Task using help from this post. Here’s an example of my code:

class MyClass {
    let taskSerializer = SerialTasks<Void>()

    func viewDidLoad() {
        someFunction(1)
        someFunction(2)
        someFunction(3)
    }

    func someFunction(_ value: Int) {
        Task {
            try await taskSerializer.add {
                try? await self.myAsyncFunction(value: value)
            }
        }
    }

    private func myAsyncFunction(value: Int) async throws {
        print(value)
    }
}

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

I expect to see 1, 2, 3 printed in order to the console. However, this is not always the case as sometimes 3 arrives to the taskSerializer before 2. How can I fix this?

UPDATE:

Replacing the code inside someFunction with this fixes the issue:


queue.async { [weak self] in
    guard let self = self else {
        return
    }
    self.semaphore.wait()
    Task {
        try? await self.myAsyncFunction(value: value)
        self.semaphore.signal()
    }
}

UPDATE 2:

I’ve tried an AsyncStream. However, this still prints elements in the wrong order sometimes, e.g. 1, 3, 2

class MyClass {
    lazy var stream: AsyncStream<Task<Void, Never>> = {
        AsyncStream { continuation in
            self.continuation = continuation
        }
    }()
    var continuation: AsyncStream<Task<Void, Never>>.Continuation?

    func viewDidLoad() {
        Task {
            for await tasks in stream {
                await tasks.value
            }
        }

        someFunction(1)
        someFunction(2)
        someFunction(3)
    }

    func someFunction(_ value: Int) {
        continuation?.yield(
            Task {
                try? await self.myAsyncFunction(value: value)
            }
        )
    }

    private func myAsyncFunction(value: Int) async throws {
        print(value)
    }
}




But apparently this is a bad idea to mix semaphores with Tasks.

2

Answers


  1. Chosen as BEST ANSWER

    Okay I think I solved this using a custom TaskQueue class. This way all tasks are added into an array and executed sequentially, calling a completion block after the task completes which gets the next task from the queue.

    class TaskQueue {
        private var taskQueue: [() async -> Void] = []
        private var queue = DispatchQueue(label: "myqueue")
    
        func addTask(task: @escaping () async -> Void) {
            queue.async { [weak self] in
              guard let self = self else {
                return
              }
              // Add the task to the queue
              self.taskQueue.append(task)
              // If there's only one task in the queue, start executing it
              if self.taskQueue.count == 1 {
                self.executeNextTask()
              }
            }
          }
    
          private func executeNextTask() {
            queue.async { [weak self] in
              guard let self = self else {
                return
              }
              // Check if there are tasks in the queue
              if taskQueue.isEmpty {
                return // No more tasks to execute
              }
    
              // Get the next task from the queue
              let nextTask = taskQueue.removeFirst()
    
              // Execute the task
              Task {
                await nextTask()
                // After the task completes, recursively execute the next task
                self.executeNextTask()
              }
            }
          }
      }
    
    ...
    
    taskQueue.addTask {
       try? await self.myAsyncFunction(value: value)
    }
    

  2. The issue is not SerialTasks. The problem is that the caller, someFunction, itself, is launching a series of unstructured concurrency tasks with Task {…} before it even gets to SerialTasks. Those tasks created by someFunction may not run in a strictly FIFO order and (unless you launched them from an actor-isolated context) they will run in parallel.

    So, SerialTasks will run them sequentially in the order it receives them, but you are not adding them in order. Once you start adding tasks to SerialTasks in the wrong order, it is too late.

    If you are launching a series of independent unstructured concurrency tasks to run in a FIFO manner, that is where you need to adopt the “await prior task” pattern illustrated by SerialTasks.

    @MainActor
    class MyClass2 {
        var previousTask: Task<Void, Error>?
    
        func startFifoSeries() {
            for i in 0 ..< 1_000 {
                someFunctionFifo(i)
            }
            print(#function, "done")
        }
    
        func someFunctionFifo(_ value: Int) {
            previousTask = Task { [previousTask] in
                _ = try await previousTask?.value
                try await self.myAsyncFunction(value: value)
            }
        }
    
        var previousValue: Int?
    
        private func myAsyncFunction(value: Int) async throws {
            if let previousValue, value < previousValue {
                print(value, "is less than previous value", previousValue)
            }
            previousValue = value
            …
        }
    }
    

    The bottom line is that the “await prior task” pattern is one way to ensure that they run in order and not in parallel. There are other patterns, too (e.g., an AsyncSequence such as AsyncChannel). (As an aside, I isolated MyClass2 to the main actor, but that is not necessary. You can obviously just make it an actor, itself, or isolate it to whatever actor you want. Just make sure it is actor-isolated if you are accessing properties.)


    For the sake of future readers, I would be very wary of using unstructured concurrency without adding cancelation handling (or use a pattern that handles cancelation for you). See Make tasks in Swift concurrency run serially.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search