
I want to try concurrent processing of an array for the first time. The compiler gives this warning: "Mutation of captured var ‘scaledImage’ in concurrently-executing code; this is an error in Swift 6".

I then revised the code by copying the source array to a local let variable, so all the data are now local, but the warning persisted. Since the array "scaledImage" is only a ‘sink’ array, I don’t understand why the warning persists or what to do about it.

    // Concurrent processing 1st attempt
    func processArrayConcurrently() async -> [UInt8] {
        var scaledImage = [UInt8]()
        scaledImage.reserveCapacity(self.backingImage!.count)
        // constants for the data
        let dMin = Double(self.backingMinMax[0])
        let dRange = Double(self.backingRange)
        let backingImage = self.backingImage!

        let concurrentQueue = DispatchQueue(label: "com.example.concurrentQueue", attributes: .concurrent)

        DispatchQueue.concurrentPerform(iterations: backingImage.count) { index in
            let processedValue = UInt8(backingImage[index] * 2) // Example processing
            concurrentQueue.async(flags: .barrier) {  // WARNING next line
                scaledImage[index] = processedValue
            }
        }

        // Wait for all tasks to complete
        concurrentQueue.sync(flags: .barrier) {}

        return scaledImage
    }

Edit: Where image occurs in variable name, it refers to an array with data to be converted into an image.

My intended use is to allow the user to ‘grab’ a point on the image and ‘mouse’ it around to change the scaling and improve the image. The images are all infrared views of wildfire; consequently, the range of values from unburned terrain to ‘hot’ pixels is much more than 255 counts, so some processing is required. This would let the user scale the lower pixel values differently from the higher values and see the results in real time. I’ve done this in the remote past with C/C++.

2 Answers


  1. The warning here is because Swift Concurrency doesn’t understand all the semantics of GCD. In this case, it doesn’t understand what .barrier means.

    Even though scaledImage is a local variable, it is captured in a @Sendable closure (the closure passed to concurrentQueue.async(flags: .barrier)), and at that point it becomes shared state. As far as Swift Concurrency is concerned, the closure can be passed between threads and run at any time, so any mutation of the captured variable is a potential data race.

    The bottom line is that you should not mix Swift Concurrency with GCD. If you want to use GCD, remove async and add a completion handler closure parameter.

    There are also some mistakes in your code to fix. reserveCapacity only allocates storage; it does not add any elements to the array, so every scaledImage[index] access will be out of bounds. It also seems like you want to do all the processing on concurrentQueue, but DispatchQueue.concurrentPerform runs on the current dispatch queue, which is definitely not concurrentQueue.
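
    To illustrate the reserveCapacity point, here is a minimal sketch. The names reserved and filled are only for illustration and do not appear in the question:

```swift
// reserveCapacity only allocates storage; the array still has no elements.
var reserved = [UInt8]()
reserved.reserveCapacity(4)
print(reserved.count)        // 0, so reserved[0] = 1 would trap at runtime

// Creating the elements up front makes every index in 0..<4 valid.
var filled = [UInt8](repeating: 0, count: 4)
filled[2] = 7
print(filled)                // [0, 0, 7, 0]
```

    This is why the examples that follow create the destination with init(repeating:count:) rather than reserveCapacity.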

    You can do something similar to Rob’s answer here, where you write to a thread-safe array.

    func processArrayConcurrently(completion: @escaping @Sendable ([UInt8]) -> Void) {
        let backingImage = self.backingImage! // force-unwrapped, as in the question's code
    
        let concurrentQueue = DispatchQueue(label: "com.example.concurrentQueue", attributes: .concurrent)
    
        concurrentQueue.async {
            let scaledImage = SynchronizedArray<UInt8>(count: backingImage.count, defaultValue: 0)
            // concurrentPerform should be called in concurrentQueue.async!
            // otherwise this won't be run on concurrentQueue
            DispatchQueue.concurrentPerform(iterations: backingImage.count) { index in
                // you might want to check Task.isCancelled here if you plan on using this from Swift Concurrency
                // if Task.isCancelled { return }
                let processedValue = UInt8(backingImage[index] * 2) // Example processing
                scaledImage[index] = processedValue
            }
            completion(scaledImage.wrappedValue)
        }
    }
    
    final class SynchronizedArray<Value>: @unchecked Sendable {
        private var values: [Value]
        private let lock = NSLock()
        
        init(count: Int, defaultValue: Value) {
            values = .init(repeating: defaultValue, count: count)
        }
    
        subscript(index: Int) -> Value {
            get { lock.withLock { values[index] } }
            set { lock.withLock { values[index] = newValue } }
        }
    
        var wrappedValue: [Value] {
            get { lock.withLock { values } }
            set { lock.withLock { values = newValue } }
        }
    }
    

    You can then write an async wrapper for this like so:

    func processArrayConcurrently() async -> [UInt8] {
        await withCheckedContinuation { continuation in
            processArrayConcurrently {
                continuation.resume(returning: $0)
            }
        }
    }
    

    If the task you are doing is not CPU-intensive, you can also do this purely with Swift Concurrency using a TaskGroup.

    func processArrayConcurrently() async -> [UInt8] {
        let dMin = Double(self.backingMinMax[0])
        let dRange = Double(self.backingRange)
        let backingImage = self.backingImage! // force-unwrapped, as in the question's code
    
        return await withTaskGroup(of: (index: Int, image: UInt8).self) { group in
            var scaledImage = [UInt8](repeating: 0, count: backingImage.count)
            for (i, image) in backingImage.enumerated() {
                group.addTask {
                    (i, UInt8(image * 2)) // do some processing
                }
            }
            for await (i, image) in group {
                scaledImage[i] = image
            }
            return scaledImage
        }
    }
    

    This creates a task group whose child tasks return results of type (Int, UInt8). The Int records the index in scaledImage at which the UInt8 (the actual result) should be stored.

    As long as processArrayConcurrently is not isolated to any actor, the processing will run in a background thread.
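
    One child task per element adds a lot of scheduling overhead for a large array. As a rough sketch of a chunked variant (the function name scaleInChunks, the chunkSize parameter and its default are hypothetical, and the wrapping multiply is only a stand-in for the real processing), each child task can handle a contiguous slice instead:

```swift
// Hypothetical chunked variant: each child task processes one slice
// of the source array rather than a single element.
func scaleInChunks(_ source: [UInt8], chunkSize: Int = 100_000) async -> [UInt8] {
    await withTaskGroup(of: (start: Int, values: [UInt8]).self) { group in
        var result = [UInt8](repeating: 0, count: source.count)

        for start in stride(from: 0, to: source.count, by: chunkSize) {
            let end = min(start + chunkSize, source.count)
            group.addTask {
                // Placeholder processing: wrapping multiply by 2.
                (start, source[start..<end].map { $0 &* 2 })
            }
        }

        // Results arrive in completion order, so each chunk carries
        // its own start offset.
        for await (start, values) in group {
            result.replaceSubrange(start..<(start + values.count), with: values)
        }
        return result
    }
}
```

    The chunk size is something to tune by measurement; the value above is a guess, not a recommendation.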

  2. I believe that Sweeper answered your question: To let the compiler know that you have manually synchronized your access to your buffer, you would either use nonisolated(unsafe) var, or wrap it in a type that is designated as @unchecked Sendable.

    A few observations, each subsequent alternative offering a further performance benefit:

    1. There is no point in using a concurrent queue for synchronization if all of your calls are going to use barriers. If you wanted to use GCD for synchronization, what you’ve got here is equivalent to a serial queue.

    2. Personally, I would use a lock. NSLock is simple and faster than GCD queues; OSAllocatedUnfairLock is even faster and eliminates the nonisolated(unsafe) var and/or @unchecked Sendable workarounds:

      import os.lock
      
      nonisolated func processParallel(backingImage: [UInt8]) async -> [UInt8] {
          await withCheckedContinuation { continuation in
              let count = backingImage.count
              let lock = OSAllocatedUnfairLock(initialState: [UInt8](repeating: 0, count: count))
      
              DispatchQueue.concurrentPerform(iterations: count) { index in
                  let processedValue = UInt8(UInt16(backingImage[index]) * 2 % 256) // Example processing
                  lock.withLock { buffer in
                      buffer[index] = processedValue
                  }
              }
              lock.withLock { buffer in
                  continuation.resume(returning: buffer)
              }
          }
      }
      
    3. Alternatively, you can use an UnsafeMutableBufferPointer<UInt8>, and because your algorithm guarantees that no individual byte in that buffer is mutated in parallel, no synchronization is needed at all:

      nonisolated func processParallel(backingImage: [UInt8]) async -> [UInt8] {
          await withCheckedContinuation { continuation in
              let count = backingImage.count
              nonisolated(unsafe) let buffer = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: count)
              defer { buffer.deallocate() }
      
              DispatchQueue.concurrentPerform(iterations: count) { index in
                  let processedValue = UInt8(UInt16(backingImage[index]) * 2 % 256) // Example processing
                  buffer[index] = processedValue
              }
              continuation.resume(returning: [UInt8](buffer))
          }
      }
      

      Note, this technique of dropping the synchronization cannot be done with Array<UInt8> (aka, [UInt8]) or Data types, but you can get away with it when using Unsafe[Mutable][Raw][Buffer]Pointer types.

    4. You should “stride”. E.g., let’s imagine that your buffer has 10 million bytes: you really do not want to do 10 million context switches. Instead, have each iteration process 100k or 1m points. This will have a significant performance impact:

      nonisolated func processStriding(backingImage: [UInt8]) async -> [UInt8] {
          await withCheckedContinuation { continuation in
              let count = backingImage.count
              nonisolated(unsafe) let buffer = UnsafeMutableBufferPointer<UInt8>.allocate(capacity: count)
              defer { buffer.deallocate() }
      
              let stride = 1_000_000
              var (iterations, remainder) = count.quotientAndRemainder(dividingBy: stride)
              if remainder != 0 {
                  iterations += 1
              }
      
              DispatchQueue.concurrentPerform(iterations: iterations) { i in
                  let start = i * stride
                  let end = min(start + stride, count)
                  for index in start ..< end {
                      let processedValue = UInt8(UInt16(backingImage[index]) * 2 % 256) // Example processing
                      buffer[index] = processedValue
                  }
              }
              continuation.resume(returning: [UInt8](buffer))
          }
      }
      

      You will want to play around with different striding factors and experimentally verify which offers the best performance.

    5. A minor point: your question does not say where this [UInt8] “image” came from. If, for example, you extracted it from a CGImage data provider, you might not want to use [UInt8] at all, but rather access the CGBitmapContextGetData data directly (or whatever) and avoid these high-level data types altogether. Manipulating native image buffers results in more complex code, but it will frequently be far more performant than round-tripping through [UInt8].

    6. As a final observation, before you lose too much sleep optimizing parallel calculations on the CPU, note that there are many dedicated frameworks for image processing (e.g., Accelerate, Core Image, Vision, etc.). These offer a dizzying array of specialized, highly-optimized algorithms, and many are GPU-based.

      So, before you go too far in writing your own parallelized CPU-based algorithms, I would suggest seeing if there is an existing framework that can be leveraged to better solve your specific business problem. And I would not dwell on the simplified “multiply by 2” sort of problem, but focus on the actual business problem you are trying to solve, as the implementations might be radically different from each other. E.g., one sort of approach might be relevant if you really want to multiply every byte by some scalar, but if you are planning on something more substantive, all the work in the “vector/matrix times scalar” details becomes irrelevant.

      This obviously is far beyond the scope of this question, but just a word of advice.
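
    As one hedged illustration of that last point, here is a sketch of linear window scaling with Accelerate’s vDSP. It assumes the source pixels are UInt16 counts (the question does not say what the element type is), and the function name scaleToBytes and its parameters are hypothetical:

```swift
import Accelerate

// Hypothetical helper: maps sensor counts onto 0...255 using a
// caller-supplied window (minimum and range), clamping values outside it.
// Assumes UInt16 input, which the question does not confirm.
func scaleToBytes(_ counts: [UInt16], minimum: Float, range: Float) -> [UInt8] {
    precondition(range > 0, "range must be positive")

    // Convert to Float so the arithmetic cannot overflow.
    let floats = vDSP.integerToFloatingPoint(counts, floatingPointType: Float.self)

    // Compute (value - minimum) * (255 / range) for every element.
    let shifted = vDSP.add(-minimum, floats)
    let scaled = vDSP.multiply(255 / range, shifted)

    // Clamp to the displayable range, then convert to UInt8.
    let clipped = vDSP.clip(scaled, to: 0...255)
    return vDSP.floatingPointToInteger(clipped,
                                       integerType: UInt8.self,
                                       rounding: .towardNearest)
}
```

    Each vDSP call here is vectorized, so for a simple linear scale like this it may well remove the need for manual parallelism. Whether it is fast enough for real-time dragging is something to measure on the actual data.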
