Skip to content

Commit

Permalink
Add support for decoding event streams (#566)
Browse files Browse the repository at this point in the history
* Add EventStream type

* Split event stream code into two files

* If response is chunked assume it is raw

* Fix test errors

* Fix a couple strict concurrency issues I missed

* Add tests for EventStreams

* Changes from PR review
  • Loading branch information
adam-fowler authored Jul 20, 2023
1 parent 15744ff commit adb4c26
Show file tree
Hide file tree
Showing 9 changed files with 499 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
env:
ENABLE_TIMING_TESTS: "false"
AWS_LOG_LEVEL: "trace"
SOTO_CORE_STRICT_CONCURRENCY: "true"

jobs:
macos:
Expand Down
10 changes: 10 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,13 @@ let package = Package(
]),
]
)

import Foundation

if ProcessInfo.processInfo.environment["SOTO_CORE_STRICT_CONCURRENCY"] == "true" {
for target in package.targets {
if !target.isTest {
target.swiftSettings = [.unsafeFlags(["-Xfrontend", "-strict-concurrency=complete"])]
}
}
}
91 changes: 91 additions & 0 deletions Sources/SotoCore/Concurrency/EventStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import NIOCore
import SotoXML

/// AsyncSequence of Event stream events
public struct AWSEventStream<Event: Sendable>: Sendable {
let base: AnyAsyncSequence<ByteBuffer>

/// Initialise AWSEventStream from an AsyncSequence of ByteBuffers
init<BaseSequence: AsyncSequence & Sendable>(_ base: BaseSequence) where BaseSequence.Element == ByteBuffer {
self.base = .init(base)
}
}

/// If Event is decodable then conform AWSEventStream to AsyncSequence
extension AWSEventStream: AsyncSequence where Event: Decodable {
public typealias Element = Event

public struct AsyncIterator: AsyncIteratorProtocol {
enum State {
case idle
case remainingBuffer(ByteBuffer)
}

var baseIterator: AnyAsyncSequence<ByteBuffer>.AsyncIterator
var state: State = .idle

public mutating func next() async throws -> Event? {
var accumulatedBuffer: ByteBuffer? = nil
var buffer: ByteBuffer?
// get buffer either from what is remaining from last buffer or a new buffer from
// the ByteBuffer sequence
switch self.state {
case .idle:
buffer = try await self.baseIterator.next()
case .remainingBuffer(let remainingBuffer):
buffer = remainingBuffer
}
while var validBuffer = buffer {
// have we already accumulated some buffer, if so append new buffer onto the end
if var validAccumulatedBuffer = accumulatedBuffer {
validAccumulatedBuffer.writeBuffer(&validBuffer)
validBuffer = validAccumulatedBuffer
accumulatedBuffer = validAccumulatedBuffer
} else {
accumulatedBuffer = validBuffer
}

if let event = try readEvent(&validBuffer) {
if validBuffer.readableBytes > 0 {
self.state = .remainingBuffer(validBuffer)
} else {
self.state = .idle
}
return event
}
buffer = try await self.baseIterator.next()
}

return nil
}

/// Read event from buffer
func readEvent(_ buffer: inout ByteBuffer) throws -> Event? {
do {
let event = try EventStreamDecoder().decode(Event.self, from: &buffer)
return event
} catch InternalAWSEventStreamError.needMoreData {
return nil
}
}
}

public func makeAsyncIterator() -> AsyncIterator {
.init(baseIterator: self.base.makeAsyncIterator())
}
}
6 changes: 3 additions & 3 deletions Sources/SotoCore/Doc/AWSShape.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,11 @@ public struct AWSShapeOptions: OptionSet {
self.rawValue = rawValue
}

/// Payload can be streamed
/// Request payload can be streamed
public static let allowStreaming = AWSShapeOptions(rawValue: 1 << 0)
/// Payload can be streamed using Transfer-Encoding: chunked
/// Request payload can be streamed using Transfer-Encoding: chunked
public static let allowChunkedStreaming = AWSShapeOptions(rawValue: 1 << 1)
/// Payload is raw data
/// Response Payload is raw data, or an event stream
public static let rawPayload = AWSShapeOptions(rawValue: 1 << 2)
/// Request can include a checksum header
public static let checksumHeader = AWSShapeOptions(rawValue: 1 << 3)
Expand Down
220 changes: 220 additions & 0 deletions Sources/SotoCore/Encoder/EventStreamDecoder.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
import NIOCore
import SotoXML

/// Event stream decoder. Decodes top level `:event-type` header and then passes the payload
/// to another decoder based off the `:content-type` header
struct EventStreamDecoder {
init() {}

func decode<T: Decodable>(_ type: T.Type, from buffer: inout ByteBuffer) throws -> T {
let decoder = try _EventStreamDecoder(buffer: &buffer)
let value = try T(from: decoder)
return value
}
}

/// Internal implementation of `EventStreamDecoder`
private struct _EventStreamDecoder: Decoder {
var codingPath: [CodingKey] { [] }
var userInfo: [CodingUserInfoKey: Any] { [:] }
let headers: [String: String]
let payload: ByteBuffer

init(buffer: inout ByteBuffer) throws {
let (headers, payload) = try Self.readEvent(&buffer)
self.headers = headers
self.payload = payload
}

func container<Key>(keyedBy type: Key.Type) throws -> KeyedDecodingContainer<Key> where Key: CodingKey {
return KeyedDecodingContainer(KDC<Key>(headers: self.headers, payload: self.payload))
}

struct KDC<Key: CodingKey>: KeyedDecodingContainerProtocol {
var codingPath: [CodingKey] { [] }
var allKeys: [Key] { self.eventTypeKey.map { [$0] } ?? [] }
let eventTypeKey: Key?
let headers: [String: String]
let payload: ByteBuffer

init(headers: [String: String], payload: ByteBuffer) {
self.headers = headers
self.payload = payload
self.eventTypeKey = self.headers[":event-type"].map { .init(stringValue: $0) } ?? nil
}

func contains(_ key: Key) -> Bool {
self.eventTypeKey?.stringValue == key.stringValue
}

func decodeNil(forKey key: Key) throws -> Bool {
return true
}

func decode<T>(_ type: T.Type, forKey key: Key) throws -> T where T: Decodable {
switch self.headers[":content-type"] {
case "application/json":
let jsonDecoder = JSONDecoder()
jsonDecoder.dateDecodingStrategy = .secondsSince1970
jsonDecoder.userInfo[.awsEvent] = EventDecodingContainer(payload: self.payload)
return try jsonDecoder.decode(T.self, from: self.payload)

case "text/xml", "application/xml":
let xmlDocument = try XML.Document(buffer: self.payload)
let xmlElement = xmlDocument.rootElement() ?? .init(name: "__empty_element")

var xmlDecoder = XMLDecoder()
xmlDecoder.userInfo[.awsEvent] = EventDecodingContainer(payload: self.payload)
return try xmlDecoder.decode(T.self, from: xmlElement)

case "application/octet-stream":
// if content-type is a raw buffer then use JSONDecoder() to pass this to `init(from:)`
// via the user info`
let jsonDecoder = JSONDecoder()
jsonDecoder.dateDecodingStrategy = .secondsSince1970
jsonDecoder.userInfo[.awsEvent] = EventDecodingContainer(payload: self.payload)
return try jsonDecoder.decode(T.self, from: .init(staticString: "{}"))

case .none:
// if there is no content-type then create object using JSONDecoder() and some empty json
return try JSONDecoder().decode(T.self, from: .init(staticString: "{}"))

case .some(let header):
throw AWSEventStreamError.unsupportedContentType(header)
}
}

func nestedContainer<NestedKey>(keyedBy type: NestedKey.Type, forKey key: Key) throws -> KeyedDecodingContainer<NestedKey> where NestedKey: CodingKey {
preconditionFailure("Nested containers are not supported")
}

func nestedUnkeyedContainer(forKey key: Key) throws -> UnkeyedDecodingContainer {
preconditionFailure("Nested unkeyed containers are not supported")
}

func superDecoder() throws -> Decoder {
preconditionFailure("Super decoders are not supported")
}

func superDecoder(forKey key: Key) throws -> Decoder {
preconditionFailure("Super decoders are not supported")
}
}

/// Read event from ByteBuffer. See https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more details
/// - Parameter byteBuffer: ByteBuffer containing event
/// - Returns: Headers and event payload
static func readEvent(_ byteBuffer: inout ByteBuffer) throws -> ([String: String], ByteBuffer) {
// read header values from ByteBuffer. Format is uint8 name length, name, 7, uint16 value length, value
func readHeaderValues(_ byteBuffer: ByteBuffer) throws -> [String: String] {
var byteBuffer = byteBuffer
var headers: [String: String] = [:]
while byteBuffer.readableBytes > 0 {
guard let headerLength: UInt8 = byteBuffer.readInteger(),
let header: String = byteBuffer.readString(length: Int(headerLength)),
let byte: UInt8 = byteBuffer.readInteger(), byte == 7,
let valueLength: UInt16 = byteBuffer.readInteger(),
let value: String = byteBuffer.readString(length: Int(valueLength))
else {
throw AWSEventStreamError.corruptHeader
}
headers[header] = value
}
return headers
}

guard byteBuffer.readableBytes > 0 else { throw InternalAWSEventStreamError.needMoreData }

// get prelude buffer and crc. Throw `needMoreData` if we don't have enough data
guard var preludeBuffer = byteBuffer.getSlice(at: byteBuffer.readerIndex, length: 8) else { throw InternalAWSEventStreamError.needMoreData }
guard let preludeCRC: UInt32 = byteBuffer.getInteger(at: byteBuffer.readerIndex + 8) else { throw InternalAWSEventStreamError.needMoreData }
// verify crc
let calculatedPreludeCRC = soto_crc32(0, bytes: ByteBufferView(preludeBuffer))
guard UInt(preludeCRC) == calculatedPreludeCRC else { throw AWSEventStreamError.corruptPayload }
// get lengths
guard let totalLength: Int32 = preludeBuffer.readInteger(),
let headerLength: Int32 = preludeBuffer.readInteger() else { throw InternalAWSEventStreamError.needMoreData }

// get message and message CRC. Throw `needMoreData` if we don't have enough data
guard var messageBuffer = byteBuffer.readSlice(length: Int(totalLength - 4)),
let messageCRC: UInt32 = byteBuffer.readInteger() else { throw InternalAWSEventStreamError.needMoreData }
// verify message CRC
let calculatedCRC = soto_crc32(0, bytes: ByteBufferView(messageBuffer))
guard UInt(messageCRC) == calculatedCRC else { throw AWSEventStreamError.corruptPayload }

// skip past prelude
messageBuffer.moveReaderIndex(forwardBy: 12)

// get headers
guard let headerBuffer: ByteBuffer = messageBuffer.readSlice(length: Int(headerLength)) else {
throw AWSEventStreamError.corruptHeader
}
let headers = try readHeaderValues(headerBuffer)

// if message type is an error then throw error
if headers[":message-type"] == "error" {
throw AWSEventStreamError.errorMessage(headers[":error-code"] ?? "Unknown")
}

let payloadSize = Int(totalLength - headerLength - 16)
let payloadBuffer = messageBuffer.readSlice(length: payloadSize)

return (headers, payloadBuffer ?? .init())
}

func unkeyedContainer() throws -> UnkeyedDecodingContainer {
preconditionFailure("Unkeyed containers are not supported")
}

func singleValueContainer() throws -> SingleValueDecodingContainer {
preconditionFailure("Single value containers are not supported")
}
}

/// Container used for passed event payload to decoders
public struct EventDecodingContainer {
let payload: ByteBuffer

/// Return payload from EventStream payload
/// - Returns: Payload as ByteBuffer
public func decodePayload() -> ByteBuffer {
return self.payload
}
}

extension CodingUserInfoKey {
/// AWS Event user info key
public static var awsEvent: Self { return .init(rawValue: "soto.awsEvent")! }
}

/// Errors thrown while decoding the event stream buffers
enum AWSEventStreamError: Error {
/// The message headers are corrupt
case corruptHeader
/// The message payload is corrupt
case corruptPayload
/// The message was an error
case errorMessage(String)
/// Unsupported content type
case unsupportedContentType(String)
}

/// Internal error used to indicate we need more data to parse this message
internal enum InternalAWSEventStreamError: Error {
case needMoreData
}
4 changes: 4 additions & 0 deletions Sources/SotoCore/Encoder/ResponseContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public struct ResponseDecodingContainer {
return self.response.body
}

public func decodeEventStream<Event>() -> AWSEventStream<Event> {
return .init(self.response.body)
}

public func decode(_ type: Date.Type = Date.self, forHeader header: String) throws -> Date {
guard let headerValue = response.headers[header].first else {
throw HeaderDecodingError.headerNotFound(header)
Expand Down
2 changes: 1 addition & 1 deletion Sources/SotoTestUtils/TestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public enum TestEnvironment {
return (Environment["AWS_ENABLE_LOGGING"] == "true") ? [AWSLoggingMiddleware(logger: TestEnvironment.logger, logLevel: .info)] : []
}

public static var logger: Logger = {
public static let logger: Logger = {
if let loggingLevel = Environment["AWS_LOG_LEVEL"] {
if let logLevel = Logger.Level(rawValue: loggingLevel.lowercased()) {
var logger = Logger(label: "soto")
Expand Down
12 changes: 9 additions & 3 deletions Sources/SotoXML/Expat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Expat {
guard let parser = encoding.withCString({ cs in
Soto_XML_ParserCreate(cs)
}) else {
throw XML_ERROR_NO_MEMORY
throw XMLError(XML_ERROR_NO_MEMORY)
}
self.parser = parser

Expand Down Expand Up @@ -80,7 +80,7 @@ class Expat {
if let callback = cbError {
callback(error)
}
throw error
throw XMLError(error)
}
}

Expand Down Expand Up @@ -193,4 +193,10 @@ class Expat {
}
}

extension XML_Error: Error {}
struct XMLError: Error {
let error: XML_Error

init(_ error: XML_Error) {
self.error = error
}
}
Loading

0 comments on commit adb4c26

Please sign in to comment.