In my Swift 5 project I have this extension to pass an async function to Publisher.map:

import Combine

public extension Publisher {
    func asyncMap<T>(
        _ asyncFunc: @escaping (Output) async -> T
    ) -> Publishers.FlatMap<Future<T, Never>, Self> {
        flatMap { value in
            Future { promise in
                Task {
                    let result = await asyncFunc(value)
                    promise(.success(result))
                }
            }
        }
    }
}

However, I cannot compile this in Xcode 16.1 beta using Swift 6, getting:

"Task-isolated value of type ‘() async -> ()’ passed as a strongly transferred parameter; later accesses could race".

Is it possible to migrate this extension to Swift 6 somehow? I have already tried adding Sendable and @Sendable everywhere.

2 Answers


  1. If asyncMap is required to run on @MainActor, and T is Sendable, then it’s straightforward and will just work:

    public extension Publisher {
        @MainActor func asyncMap<T: Sendable>(
            _ asyncFunc: @escaping (Output) async -> T
        ) -> Publishers.FlatMap<Future<T, Never>, Self> {
            return flatMap { value in
                Future { promise in
                    Task {
                        let result = await asyncFunc(value)
                        promise(.success(result))
                    }
                }
            }
        }
    }
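
    As a usage sketch for the @MainActor variant above: a main-actor-isolated caller might look like the following. DoublingModel, results and start() are hypothetical names used only for illustration, and the extension is repeated so the snippet stands alone. That it compiles under Swift 6 rests on the claim above about the @MainActor version.

```swift
import Combine

// Repeated from the answer so the example is self-contained.
public extension Publisher {
    @MainActor func asyncMap<T: Sendable>(
        _ asyncFunc: @escaping (Output) async -> T
    ) -> Publishers.FlatMap<Future<T, Never>, Self> {
        flatMap { value in
            Future { promise in
                Task {
                    let result = await asyncFunc(value)
                    promise(.success(result))
                }
            }
        }
    }
}

// Hypothetical caller, isolated to the main actor like asyncMap itself.
@MainActor
final class DoublingModel {
    private(set) var results: [Int] = []
    private var cancellable: AnyCancellable?

    func start() {
        cancellable = [1, 2, 3].publisher
            .asyncMap { value in value * 2 }   // stands in for real async work
            .sink { [weak self] doubled in
                self?.results.append(doubled)
            }
    }
}
```

    Because both the caller and asyncMap are on the main actor, nothing has to cross an isolation boundary, which is why no extra Sendable annotations are needed on the closure.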
    

    If it is not… I don’t believe this is possible without using the universal escape hatch (I’ll show it in a moment). The problem is that Future does not mark its promise parameter as @Sendable or sending or anything else. It doesn’t promise not to hold onto the closure and run it at some arbitrary point itself, and the closure might have side effects that could cause a race. Future doesn’t actually do that, but the API doesn’t promise it. And I don’t think there’s any way to really fix that without Apple updating Combine (which they seem to have mostly abandoned at this point).

    There is always the universal escape hatch, @unchecked Sendable. Coupled with marking just about everything else Sendable or @Sendable, it makes this work:

    struct UncheckedSendable<T>: @unchecked Sendable {
        let unwrap: T
        init(_ value: T) { unwrap = value }
    }
    
    public extension Publisher where Output: Sendable {
        func asyncMap<T: Sendable>(
            _ asyncFunc: @escaping @Sendable (Output) async -> T
        ) -> Publishers.FlatMap<Future<T, Never>, Self> {
            flatMap { value in
                Future { promise in
                    let promise = UncheckedSendable(promise)
                    Task {
                        let result = await asyncFunc(value)
                        promise.unwrap(.success(result))
                    }
                }
            }
        }
    }
    

    I’m sorry.

    Given what we know of Future, I believe this is actually safe. Lots of things Swift 6 calls unsafe are in fact unsafe, and we just kind of ignore it because "that won’t happen" (narrator: sometimes it happens). But I believe we do know that this is safe. Apple just hasn’t updated Combine to mark it.
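
    To see what the wrapper does in isolation, here is a minimal sketch without Combine. Counter and incrementInTask are hypothetical names invented for this illustration. The wrapper adds no synchronization; it only silences the compiler, so the safety argument has to come from how the value is actually used.

```swift
// Same wrapper as in the answer: it tells the compiler "trust me" and
// performs no synchronization of its own.
struct UncheckedSendable<T>: @unchecked Sendable {
    let unwrap: T
    init(_ value: T) { unwrap = value }
}

// Hypothetical non-Sendable type, used only for illustration.
final class Counter {
    var count = 0
}

// Hands a non-Sendable Counter to a Task through the wrapper.
// This is only safe because nothing else touches the counter while the
// task runs; the compiler no longer verifies that for us.
func incrementInTask() async -> Int {
    let boxed = UncheckedSendable(Counter())
    let task = Task {
        boxed.unwrap.count += 1
        return boxed.unwrap.count
    }
    return await task.value
}
```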


    Or… we could also reimplement Future:

    @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public final class SendingFuture<Output, Failure: Error>: Publisher, Sendable {
        public typealias Promise = @Sendable (Result<Output, Failure>) -> Void
    
        private let attemptToFulfill: @Sendable (@escaping Promise) -> Void
    
        public init(_ attemptToFulfill: @Sendable @escaping (@escaping Promise) -> Void) {
            self.attemptToFulfill = attemptToFulfill
        }
    
        public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
            let subscription = SendingSubscription(subscriber: subscriber, attemptToFulfill: attemptToFulfill)
            subscriber.receive(subscription: subscription)
        }
    
        private final class SendingSubscription<S: Subscriber>: Subscription
        where S.Input == Output, S.Failure == Failure, S: Sendable {
            private var subscriber: S?
    
            init(subscriber: S, attemptToFulfill: @escaping (@escaping Promise) -> Void) {
                self.subscriber = subscriber
                attemptToFulfill { result in
                    switch result {
                    case .success(let output):
                        _ = subscriber.receive(output)
                        subscriber.receive(completion: .finished)
                    case .failure(let failure):
                        subscriber.receive(completion: .failure(failure))
                    }
                }
            }
    
            func request(_ demand: Subscribers.Demand) {}
    
            func cancel() {
                subscriber = nil
            }
        }
    }
    

    And then your original code will work, changing Future to SendingFuture.

  2. I think Rob Napier’s answer is very good (+1).

    But I might advise caution with unstructured concurrency (i.e., the use of Task{…}). If you create unstructured concurrency, you bear responsibility for handling cancelation. If you wrap a Task {…} in a standard Future there is no (easy) way to cancel tasks that are underway when the Future is canceled.
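
    The point about unstructured tasks can be seen without Combine. In this minimal sketch (innerSurvivesOuterCancel is a hypothetical name), canceling an outer task does not cancel a Task {…} created inside it, because the inner task is not a child of the outer one:

```swift
// Returns whether the inner task observed cancellation.
func innerSurvivesOuterCancel() async -> Bool {
    let outer = Task<Task<Bool, Never>, Never> {
        // Unstructured: this inner task is NOT a child of `outer`,
        // so it does not inherit `outer`'s cancellation.
        Task<Bool, Never> {
            try? await Task.sleep(nanoseconds: 100_000_000)
            return Task.isCancelled
        }
    }
    outer.cancel()                 // cancels only `outer`
    let inner = await outer.value  // the handle to the inner task
    return await inner.value       // false: nobody canceled `inner`
}
```

    The same thing happens when a Task {…} is started inside a standard Future: canceling the subscription has no handle through which to reach the task.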

    If you are going to wrap async-await code in a Future, I would suggest creating a custom AsyncAwaitFuture that also cancels the associated task when the future is canceled. Once you have that, you do not need an asyncMap method at all; you should be able to use all of the standard Publisher methods.

    So, perhaps:

    /// AsyncAwaitFuture
    
    @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public final class AsyncAwaitFuture<Output, Failure: Error>: Publisher, Sendable {
        public typealias Promise = @Sendable (Result<Output, Failure>) -> Void
    
        private let work: @Sendable (@escaping Promise) async -> Void
    
        public init(_ work: @Sendable @escaping (@escaping Promise) async -> Void) {
            self.work = work
        }
    
        public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input, S: Sendable {
            let subscription = AsyncAwaitSubscription(subscriber: subscriber, work: work)
            subscriber.receive(subscription: subscription)
        }
    }
    
    /// AsyncAwaitFuture.AsyncAwaitSubscription
    
    private extension AsyncAwaitFuture {
        final class AsyncAwaitSubscription<S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure, S: Sendable {
            private var subscriber: S?
            private let task: Task<Void, Error>
    
            init(subscriber: S, work: @Sendable @escaping (@escaping Promise) async -> Void) {
                self.subscriber = subscriber
                task = Task {
                    await work { result in
                        switch result {
                        case .success(let output):
                            _ = subscriber.receive(output)
                            subscriber.receive(completion: .finished)
                        case .failure(let failure):
                            subscriber.receive(completion: .failure(failure))
                        }
                    }
                }
            }
    
            func request(_ demand: Subscribers.Demand) { }
    
            func cancel() {
                subscriber = nil
                task.cancel()
            }
        }
    }
    

    Then you can do things like:

    import os.log
    
    let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "ViewController")
    let poi = OSSignposter(subsystem: Bundle.main.bundleIdentifier!, category: .pointsOfInterest)
    

    And:

    var cancellable: AnyCancellable?
    
    func performSingleAsyncFuture(index: Int) -> AsyncAwaitFuture<Int, Error> {
        AsyncAwaitFuture { promise in
            let state = poi.beginInterval(#function, id: poi.makeSignpostID(), "\(index)")
            defer { poi.endInterval(#function, state) }
            
            do {
                try await Task.sleep(for: .seconds(5))
                promise(.success(42))
            } catch {
                poi.emitEvent(#function, "error: \(error)")
                promise(.failure(error))
            }
        }
    }
    
    func startWork() {
        logger.debug(#function)
        
        cancellable = performSingleAsyncFuture(index: 0)
            .sink { completion in
                switch completion {
                case .failure(let error):
                    logger.error("error: \(error)")
                case .finished:
                    logger.debug("finished")
                }
            } receiveValue: { value in
                logger.debug("received: \(value)")
            }
    }
    

    Or, if you wanted to do a series of these (say ten of them, but not more than three at a time), it might look like:

    func performSingleAsyncFuture(index: Int) -> AsyncAwaitFuture<Int, Error> {…}
    
    func performMultipleAsyncFutures() -> AnyPublisher<Int, Error> {
        Publishers.Sequence(sequence: (0..<10).map { self.performSingleAsyncFuture(index: $0) })
            .flatMap(maxPublishers: .max(3)) { $0 }
            .eraseToAnyPublisher()
    }
    
    func startWork() {
        logger.debug(#function)
        
        cancellable = performMultipleAsyncFutures()
            .sink { completion in
                switch completion {
                case .failure(let error):
                    logger.error("error: \(error)")
                case .finished:
                    logger.debug("finished")
                }
            } receiveValue: { value in
                logger.debug("received: \(value)")
            }
    }
    

    When I profiled this in Instruments, I let it run once to completion. The second time, I started it but dismissed the view in question before the tasks finished, and the work was canceled automatically at the ⓢ signpost when the cancellable fell out of scope:

    [Figure: Instruments Time Profiler with Points of Interest, showing a successful run of 10 items as well as cancelation support]

    (Forgive the Instruments snapshot, as I shortened the labels in the intervals so you could more easily see the full string.)


    In short, be careful with unstructured concurrency (Task {…}) and make sure you cancel the Task when it is no longer necessary.
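
    As a minimal sketch of that last point, assuming only that an AnyCancellable runs its closure when it is released, a task's lifetime can be tied to a Combine token like this (cancelWhenTokenReleased is a hypothetical name, and the five-second sleep stands in for real work):

```swift
import Combine

// Starts a long-running task, ties it to an AnyCancellable, then releases
// the token and reports how the task ended.
func cancelWhenTokenReleased() async -> String {
    let task = Task<String, Never> {
        do {
            // Task.sleep throws CancellationError once the task is canceled.
            try await Task.sleep(nanoseconds: 5_000_000_000)
            return "finished"
        } catch {
            return "cancelled"
        }
    }

    var tokens = Set<AnyCancellable>()
    // The token's closure cancels the task when the token is canceled
    // explicitly or deallocated.
    AnyCancellable { task.cancel() }.store(in: &tokens)

    tokens.removeAll()   // releasing the token cancels the task
    return await task.value
}
```

    This is the same mechanism that AsyncAwaitSubscription.cancel() uses above, reduced to its essentials.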
