Skip to content

Commit

Permalink
AsyncDataSequence for Swift 5.9+
Browse files Browse the repository at this point in the history
  • Loading branch information
swhitty committed Jul 14, 2024
1 parent afdd366 commit c1cd41d
Showing 1 changed file with 17 additions and 21 deletions.
38 changes: 17 additions & 21 deletions FlyingSocks/Sources/AsyncDataSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public struct AsyncDataSequence: AsyncSequence, Sendable {
self.loader = DataLoader(
count: count,
chunkSize: chunkSize,
iterator: AsyncBufferedSequenceIterator(bytes.makeAsyncIterator())
iterator: bytes.makeAsyncIterator()
)
}

Expand Down Expand Up @@ -105,10 +105,10 @@ private extension AsyncDataSequence {
actor DataLoader {
nonisolated fileprivate let count: Int
nonisolated fileprivate let chunkSize: Int
private let iterator: any AsyncDataIterator & Sendable
private var iterator: any AsyncBufferedIteratorProtocol<UInt8>
private var state: State

init(count: Int, chunkSize: Int, iterator: some AsyncDataIterator & Sendable) {
init(count: Int, chunkSize: Int, iterator: some AsyncBufferedIteratorProtocol<UInt8> & Sendable) {
self.count = count
self.chunkSize = chunkSize
self.iterator = iterator
Expand All @@ -129,7 +129,7 @@ private extension AsyncDataSequence {
}

do {
guard let element = try await iterator.nextChunk(suggested: nextCount) else {
guard let element = try await getNextBuffer(&iterator, suggested: nextCount) else {
throw Error.unexpectedEOF
}
state = .ready(index: index + element.count)
Expand All @@ -140,6 +140,13 @@ private extension AsyncDataSequence {
}
}

private func getNextBuffer(_ iterator: inout some AsyncBufferedIteratorProtocol<UInt8>, suggested count: Int) async throws -> Data? {
guard let buffer = try await iterator.nextBuffer(suggested: count) else {
return nil
}
return Data(buffer)
}

public func flushIfNeeded() async throws {
switch state {
case .ready(index: var index):
Expand All @@ -166,23 +173,16 @@ private extension AsyncDataSequence {
}
}

final class AsyncBufferedSequenceIterator<I: AsyncBufferedIteratorProtocol>: AsyncDataIterator, @unchecked Sendable where I.Element == UInt8 {
private var iterator: I
struct AsyncFileHandleIterator: AsyncBufferedIteratorProtocol {
typealias Element = UInt8

init(_ iterator: I) {
self.iterator = iterator
}
let handle: FileHandle?

func nextChunk(suggested count: Int) async throws -> Data? {
guard let buffer = try await iterator.nextBuffer(suggested: count) else { return nil }
return Data(buffer)
func next() async throws -> UInt8? {
fatalError()
}
}

struct AsyncFileHandleIterator: AsyncDataIterator {
let handle: FileHandle?

func nextChunk(suggested count: Int) throws -> Data? {
func nextBuffer(suggested count: Int) throws -> Data? {
guard let handle = handle else { throw SocketError.disconnected }
if #available(macOS 10.15.4, iOS 13.4, tvOS 13.4, *) {
return try handle.read(upToCount: count)
Expand All @@ -193,7 +193,3 @@ private extension AsyncDataSequence {
}
}

private protocol AsyncDataIterator {
func nextChunk(suggested count: Int) async throws -> Data?
}

0 comments on commit c1cd41d

Please sign in to comment.