[swift-users] dispatch concurrent map: is this right?

Karl razielim at gmail.com
Sun Oct 30 20:31:57 CDT 2016


> On 30 Oct 2016, at 19:23, Dave Abrahams via swift-users <swift-users at swift.org> wrote:
> 
> 
> on Sun Oct 30 2016, Karl <swift-users-AT-swift.org> wrote:
> 
>>> On 30 Oct 2016, at 09:15, Karl <raziel.im+swift-users at gmail.com> wrote:
>>> 
>>> I had the need for a concurrent map recently. I had a part of a
>>> program which needed to read chunks of data and concurrently process
>>> them and assemble the results in an array. This isn’t necessarily as
>>> obvious as it sounds, because of arrays being value types. I came up
>>> with the following snippet which I’d like to check for correctness;
>>> it could also be helpful to others.
>>> 
>>> Perhaps this is something Dispatch should provide out-of-the-box?
>>> 
>>> - Karl
>> 
>> Ah one second, I was refactoring this and forgot to test it. Here’s the actual code:
> 
> A map presumably requires an input 

DispatchQueue.concurrentMap maps a Range<Int> -> T, but since the range is always 0..<n, we only ask for the value of n. It could also be written quite naturally as an extension on Range, with everything else built on top of that.

> 
>> extension DispatchQueue {
>> 
>>  static func concurrentMap<T>(iterations: Int, execute block: (Int) -> T) -> [T] {
>> 
>>    let __result = UnsafeMutableRawBufferPointer.allocate(count: iterations * MemoryLayout<T>.stride)
>>    defer { __result.deallocate() }
>>    let _result  = __result.baseAddress?.assumingMemoryBound(to: T.self)
> 
> You never bound the memory to T, so this will be undefined behavior.  
> 
>>    let result   = UnsafeMutableBufferPointer<T>(start: _result, count: iterations)
>>    concurrentPerform(iterations: iterations) { idx in
>>      result[idx] = block(idx)
> 
> You also never initialized the Ts in that memory region, so assigning
> into them will also be undefined behavior.
> 
>>    }
>>    return Array(result)
>>  }
>> }
>> 
>> extension Array {
>>  func concurrentMap<T>(execute block: (Element)->T) -> [T] {
>>    return DispatchQueue.concurrentMap(iterations: count) { block(self[$0]) }
>>  }
>> }
>> 
>> Unfortunately I don’t think there’s a way to get an array to take over a +1
>> UnsafeMutableBufferPointer without copying.
> 
> The only correct way to do this without creating intermediate storage is
> to have a way to initialize your result elements, e.g.:
> 
>  import Dispatch
> 
>  protocol DefaultInitializable {
>    init()
>  }
> 
>  extension RandomAccessCollection {
>    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T]
>    where T : DefaultInitializable {
>      var result = Array(
>        repeating: T(), count: numericCast(self.count))
> 
>      DispatchQueue.concurrentPerform(iterations: result.count) {
>        offset in 
>        result[offset] = transform(
>          self[index(startIndex, offsetBy: numericCast(offset))])
>      }
>      return result
>    }
>  }
> 
>  extension Int : DefaultInitializable {  }
> 
>  print((3..<20).concurrentMap { $0 * 2 })
> 

I had a go at doing that before, using Optional<T> and unwrapping at the end, but it occurred to me that it would be very inefficient for things like Optional<Int>, and that it introduces more allocations.
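
A minimal sketch of what such an Optional-based version might look like, to make the cost concrete. It assumes current Swift 5 syntax rather than the Swift 3 used elsewhere in this thread, the name concurrentMapViaOptional is made up for illustration, and it is not the original attempt:

```swift
import Dispatch

extension RandomAccessCollection {
  // Hypothetical name, for illustration only.
  func concurrentMapViaOptional<T>(_ transform: (Element) -> T) -> [T] {
    let n = count
    // Every slot starts as an initialized nil, so plain assignment is safe.
    // On 64-bit platforms Int? has a stride of 16 bytes versus 8 for Int.
    var slots = [T?](repeating: nil, count: n)

    slots.withUnsafeMutableBufferPointer { buffer in
      // Copy the pointer so the inner closure only reads a local constant.
      let out = buffer
      DispatchQueue.concurrentPerform(iterations: n) { offset in
        // Each iteration writes a distinct element, so writes never overlap.
        out[offset] = transform(self[index(startIndex, offsetBy: offset)])
      }
    }

    // Second pass and second allocation: unwrap into a fresh [T].
    return slots.map { $0! }
  }
}

print((3..<20).concurrentMapViaOptional { $0 * 2 })
```

The extra array and the unwrapping pass are the overhead referred to above.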


> If you don't want the DefaultInitializable requirement (or some other
> way to prepare initialized elements), you'll need to manage memory
> yourself:
> 
>  extension RandomAccessCollection {
>    func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {
>      let n = numericCast(self.count) as Int
>      let p = UnsafeMutablePointer<T>.allocate(capacity: n)
>      defer { p.deallocate(capacity: n) }
> 
>      DispatchQueue.concurrentPerform(iterations: n) {
>        offset in
>        (p + offset).initialize(
>          to: transform(
>            self[index(startIndex, offsetBy: numericCast(offset))]))
>      }
> 
>      return Array(UnsafeMutableBufferPointer(start: p, count: n))
>    }
>  }
> 
> This posting highlights a couple of weaknesses in the standard library
> for which I'd appreciate bug reports:
> 
> 1. No way to arbitrarily initialize an Array's storage.
> 2. UnsafeMutableBufferPointer doesn't have an allocating init
> 
> Thanks!
> 
> -- 
> -Dave
> 


Filed:

1. https://bugs.swift.org/browse/SR-3087
2. https://bugs.swift.org/browse/SR-3088
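
For readers on a newer toolchain, a hedged sketch of how the two gaps can be worked around there. It assumes Swift 5.1 or later, where Array(unsafeUninitializedCapacity:initializingWith:) and UnsafeMutableBufferPointer.allocate(capacity:) are available; neither exists in the Swift 3 this thread was written against. The name concurrentMapInPlace is hypothetical, and this makes no claim about how the reports above were resolved:

```swift
import Dispatch

extension RandomAccessCollection {
  // Hypothetical name. Initializes the Array's own storage, so no copy is made.
  func concurrentMapInPlace<T>(_ transform: (Element) -> T) -> [T] {
    let n = count
    return [T](unsafeUninitializedCapacity: n) { buffer, initializedCount in
      if n > 0, let base = buffer.baseAddress {
        DispatchQueue.concurrentPerform(iterations: n) { offset in
          // Each iteration initializes a distinct, previously uninitialized slot.
          (base + offset).initialize(
            to: transform(self[index(startIndex, offsetBy: offset)]))
        }
      }
      // Tell the Array how many elements are now initialized.
      initializedCount = n
    }
  }
}

print((3..<20).concurrentMapInPlace { $0 * 2 })

// Item 2: an allocating entry point on the buffer pointer type itself.
let scratch = UnsafeMutableBufferPointer<Int>.allocate(capacity: 4)
scratch.initialize(repeating: 0)
scratch[2] = 7
print(Array(scratch))   // [0, 0, 7, 0]
scratch.deallocate()
```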

What is your opinion on the corelibs extending the standard library types? Foundation does it to provide APIs from NSString, but it’s kind of a special case. Would it be reasonable for Dispatch (which is not _such_ a special case) to also extend types like Range and Collection?

I quite like the API as an extension on Range. I think it would be a nice addition to Dispatch (once we start allowing additive proposals):

extension Range where Bound : Strideable, Bound.Stride : SignedInteger {

  func concurrentMap<T>(_ transform: (Bound) -> T) -> [T] {
    let n      = numericCast(count) as Int
    let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)
    // Deinitialize the elements before freeing the memory, otherwise
    // non-trivial T values (class references, Strings, ...) would leak.
    defer {
      buffer.deinitialize(count: n)
      buffer.deallocate(capacity: n)
    }

    DispatchQueue.concurrentPerform(iterations: n) {
      (buffer + $0).initialize(to: transform(lowerBound + numericCast($0)))
    }

    // Unfortunately, the buffer is copied when making it an Array<T>.
    return Array(UnsafeMutableBufferPointer<T>(start: buffer, count: n))
  }
}

extension Collection {
  func concurrentMap<T>(_ transform: (Iterator.Element)->T) -> [T] {

    // ‘as Range’ is needed because CountableRange is itself a Collection; without it,
    // this call would resolve to this same method and recurse forever.
    return ((0..<numericCast(count)) as Range).concurrentMap {
      transform(self[index(startIndex, offsetBy: numericCast($0))])
    }
  }
}
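
A quick way to sanity-check this kind of function is to compare it against the sequential map on a non-trivial element type. The sketch below is a Swift 5 port (deallocate() takes no argument there, and Range<Int> is already a Collection, so a single extension suffices); treat it as an assumption-laden illustration, not a drop-in replacement for the Swift 3 code above:

```swift
import Dispatch

extension Collection {
  func concurrentMap<T>(_ transform: (Element) -> T) -> [T] {
    let n = count
    guard n > 0 else { return [] }
    let buffer = UnsafeMutablePointer<T>.allocate(capacity: n)
    // Runs after the return value has been built: release elements, then memory.
    defer {
      buffer.deinitialize(count: n)
      buffer.deallocate()
    }

    DispatchQueue.concurrentPerform(iterations: n) { offset in
      // Each iteration initializes exactly one distinct slot.
      (buffer + offset).initialize(
        to: transform(self[index(startIndex, offsetBy: offset)]))
    }

    // The elements are copied into the Array's own storage here.
    return Array(UnsafeBufferPointer(start: buffer, count: n))
  }
}

// Usage: results must come back in input order, whatever the scheduling.
let words = ["alpha", "beta", "gamma"]
let expected = words.map { $0.uppercased() }
let actual = words.concurrentMap { $0.uppercased() }
print(actual == expected)   // true
```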

Thanks

- Karl
