Skip to content

Commit

Permalink
apacheGH-36546: [Swift] The initial implementation for swift arrow fl…
Browse files Browse the repository at this point in the history
…ight
  • Loading branch information
abandy committed Jul 7, 2023
1 parent 9da86dc commit ba26178
Show file tree
Hide file tree
Showing 22 changed files with 3,599 additions and 0 deletions.
5 changes: 5 additions & 0 deletions ci/scripts/swift_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@ source_dir=${1}/swift/Arrow
pushd ${source_dir}
swift test
popd

source_dir=${1}/swift/ArrowFlight
pushd ${source_dir}
swift test
popd
9 changes: 9 additions & 0 deletions swift/ArrowFlight/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.DS_Store
/.build
/Packages
/*.xcodeproj
xcuserdata/
DerivedData/
.swiftpm/
.netrc
Package.resolved
36 changes: 36 additions & 0 deletions swift/ArrowFlight/Package.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// swift-tools-version: 5.8
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription

let package = Package(
name: "ArrowFlight",
platforms: [
.macOS(.v10_15)
],
products: [
// Products define the executables and libraries a package produces, making them visible to other packages.
.library(
name: "ArrowFlight",
targets: ["ArrowFlight"]),
],
dependencies: [
.package(url: "https://github.com/grpc/grpc-swift.git", from: "1.15.0"),
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.6.0"),
.package(path: "../Arrow")
],
targets: [
// Targets are the basic building blocks of a package, defining a module or a test suite.
// Targets can depend on other targets in this package and products from dependencies.
.target(
name: "ArrowFlight",
dependencies: [
.product(name: "Arrow", package: "Arrow"),
.product(name: "GRPC", package: "grpc-swift"),
.product(name: "SwiftProtobuf", package: "swift-protobuf")
]),
.testTarget(
name: "ArrowFlightTests",
dependencies: ["ArrowFlight"]),
]
)
29 changes: 29 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightAction.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// File.swift
//
//
// Created by Alva Bandy on 6/27/23.
//

import Foundation

public class FlightAction {
public let type: String
public let body: Data
init(_ action: Arrow_Flight_Protocol_Action) {
self.type = action.type
self.body = action.body
}

public init(_ type: String, body: Data = Data()) {
self.type = type;
self.body = body;
}

func toProtocol() -> Arrow_Flight_Protocol_Action {
var flight_action = Arrow_Flight_Protocol_Action()
flight_action.type = self.type
flight_action.body = self.body
return flight_action
}
}
28 changes: 28 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightActionType.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// File.swift
//
//
// Created by Alva Bandy on 7/3/23.
//

import Foundation
public class FlightActionType {
public let type: String
public let description: String
init(_ actionType: Arrow_Flight_Protocol_ActionType) {
self.type = actionType.type
self.description = actionType.description_p

}
public init(_ type: String, description: String) {
self.type = type
self.description = description
}

func toProtocol() -> Arrow_Flight_Protocol_ActionType {
var actionType = Arrow_Flight_Protocol_ActionType()
actionType.type = self.type
actionType.description_p = self.description
return actionType
}
}
134 changes: 134 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// File.swift
//
//
// Created by Alva Bandy on 7/6/23.
//

import struct Foundation.Data
import struct Foundation.URL
import GRPC
import NIOCore
import NIOPosix
import Arrow

public class FlightClient {
let client: Arrow_Flight_Protocol_FlightServiceAsyncClient
public init(channel: GRPCChannel) {
client = Arrow_Flight_Protocol_FlightServiceAsyncClient(channel: channel)
}

public func listActions(_ closure: (FlightActionType) -> Void) async throws {
let listActions = client.makeListActionsCall(Arrow_Flight_Protocol_Empty())
for try await data in listActions.responseStream {
closure(FlightActionType(data))
}
}

public func listFlights(_ criteria :FlightCriteria, closure: (FlightInfo) throws -> Void ) async throws {
let listFlights = client.makeListFlightsCall(criteria.toProtocol())
for try await data in listFlights.responseStream {
try closure(FlightInfo(data));
}
}


public func doAction(_ action: FlightAction, closure: (FlightResult) throws -> Void) async throws {
let actionResponse = client.makeDoActionCall(action.toProtocol())
for try await data in actionResponse.responseStream {
try closure(FlightResult(data));
}
}

public func getSchema(_ descriptor: FlightDescriptor) async throws -> FlightSchemaResult {
let schemaResultResponse = client.makeGetSchemaCall(descriptor.toProtocol())
return FlightSchemaResult(try await schemaResultResponse.response)
}

public func doGet(_ ticket: FlightTicket, readerResultClosure: (ArrowReader.ArrowReaderResult) throws -> Void) async throws {
let getResult = client.makeDoGetCall(ticket.toProtocol())
let reader = ArrowReader()
for try await data in getResult.responseStream {
switch reader.fromStream(data.dataBody) {
case .success(let rb):
try readerResultClosure(rb)
case .failure(let error):
throw error
}
}
}

public func doGet(_ ticket: FlightTicket, flightDataClosure: (FlightData) throws -> Void) async throws {
let getResult = client.makeDoGetCall(ticket.toProtocol())
for try await data in getResult.responseStream {
try flightDataClosure(FlightData(data))
}
}

public func doPut(_ recordBatchs: [RecordBatch], closure: (FlightPutResult) throws -> Void) async throws {
if recordBatchs.isEmpty {
throw ArrowFlightError.EmptyCollection
}

let putCall = client.makeDoPutCall()
let writer = ArrowWriter()
let writerInfo = ArrowWriter.Info(.recordbatch, schema: recordBatchs[0].schema, batches: recordBatchs)
switch writer.toStream(writerInfo) {
case .success(let data):
try await putCall.requestStream.send(FlightData(data).toProtocol())
putCall.requestStream.finish()
for try await response in putCall.responseStream {
try closure(FlightPutResult(response))
}
case .failure(let error):
throw error
}
}

public func doPut(flightData: FlightData, closure: (FlightPutResult) throws -> Void) async throws {
let putCall = client.makeDoPutCall()
try await putCall.requestStream.send(flightData.toProtocol())
putCall.requestStream.finish()
for try await response in putCall.responseStream {
try closure(FlightPutResult(response))
}
}

public func doExchange(_ recordBatchs: [RecordBatch], closure: (ArrowReader.ArrowReaderResult) throws -> Void) async throws {
if recordBatchs.isEmpty {
throw ArrowFlightError.EmptyCollection
}

let exchangeCall = client.makeDoExchangeCall()
let writer = ArrowWriter()
let info = ArrowWriter.Info(.recordbatch, schema: recordBatchs[0].schema, batches: recordBatchs)
switch writer.toStream(info) {
case .success(let data):
let request = Arrow_Flight_Protocol_FlightData.with {
$0.dataBody = data
}
try await exchangeCall.requestStream.send(request)
exchangeCall.requestStream.finish()
let reader = ArrowReader()
for try await response in exchangeCall.responseStream {
switch reader.fromStream(response.dataBody) {
case .success(let rbResult):
try closure(rbResult)
case .failure(let error):
throw error
}
}
case .failure(let error):
throw error
}
}

public func doExchange(fligthData: FlightData, closure: (FlightData) throws -> Void) async throws {
let exchangeCall = client.makeDoExchangeCall()
try await exchangeCall.requestStream.send(fligthData.toProtocol())
exchangeCall.requestStream.finish()
for try await response in exchangeCall.responseStream {
try closure(FlightData(response))
}
}
}
27 changes: 27 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightCriteria.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//
// File.swift
//
//
// Created by Alva Bandy on 7/3/23.
//

import Foundation

public class FlightCriteria {
let criteria: Arrow_Flight_Protocol_Criteria

public var expression: Data { criteria.expression }
public init(_ expression: Data = Data()) {
criteria = Arrow_Flight_Protocol_Criteria.with {
$0.expression = expression
}
}

init(_ criteria: Arrow_Flight_Protocol_Criteria) {
self.criteria = criteria
}

func toProtocol() -> Arrow_Flight_Protocol_Criteria {
return criteria
}
}
36 changes: 36 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightData.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// File.swift
//
//
// Created by Alva Bandy on 7/5/23.
//

import Foundation

public class FlightData {
let flight_data: Arrow_Flight_Protocol_FlightData
public var flightDescriptor: FlightDescriptor? {
get { return flight_data.hasFlightDescriptor ? FlightDescriptor(flight_data.flightDescriptor) : nil }
}

public var dataBody: Data { flight_data.dataBody }

init(_ flight_data: Arrow_Flight_Protocol_FlightData) {
self.flight_data = flight_data
}

public init(_ dataBody: Data, flightDescriptor: FlightDescriptor? = nil) {
if flightDescriptor != nil {
self.flight_data = Arrow_Flight_Protocol_FlightData.with {
$0.dataBody = dataBody
$0.flightDescriptor = flightDescriptor!.toProtocol()
}
} else {
self.flight_data = Arrow_Flight_Protocol_FlightData.with {
$0.dataBody = dataBody
}
}
}

func toProtocol() -> Arrow_Flight_Protocol_FlightData { self.flight_data }
}
46 changes: 46 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightDescriptor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// File.swift
//
//
// Created by Alva Bandy on 6/27/23.
//

import Foundation

public class FlightDescriptor {
public enum type {
case unknown
case path
case cmd
}

public let type: FlightDescriptor.type
public let cmd: Data
public let paths: [String]

init(_ descriptor: Arrow_Flight_Protocol_FlightDescriptor) {
self.type = descriptor.type == .cmd ? .cmd : .path
self.cmd = descriptor.cmd
self.paths = descriptor.path
}

public init(cmd: Data) {
self.type = .cmd
self.cmd = cmd
self.paths = [String]()
}

public init(paths: [String]) {
self.type = .path
self.cmd = Data()
self.paths = paths
}

func toProtocol() -> Arrow_Flight_Protocol_FlightDescriptor {
var descriptor = Arrow_Flight_Protocol_FlightDescriptor()
descriptor.type = self.type == .cmd ? .cmd : .path
descriptor.cmd = self.cmd
descriptor.path = self.paths
return descriptor
}
}
28 changes: 28 additions & 0 deletions swift/ArrowFlight/Sources/ArrowFlight/FlightEndpoint.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// File.swift
//
//
// Created by Alva Bandy on 6/27/23.
//

import Foundation
public class FlightEndpoint {
let ticket: FlightTicket;
let locations: [FlightLocation]
init(_ endpoint: Arrow_Flight_Protocol_FlightEndpoint) {
self.ticket = FlightTicket(endpoint.ticket.ticket)
self.locations = endpoint.location.map {return FlightLocation($0)}
}

public init(_ ticket: FlightTicket, locations: [FlightLocation]) {
self.ticket = ticket
self.locations = locations;
}

func toProtocol() -> Arrow_Flight_Protocol_FlightEndpoint {
var endpoint = Arrow_Flight_Protocol_FlightEndpoint()
endpoint.ticket = self.ticket.toProtocol()
endpoint.location = self.locations.map { $0.toProtocol() }
return endpoint
}
}
Loading

0 comments on commit ba26178

Please sign in to comment.