Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TCP connection away from Mac Network lib over to swift-nio enabling #5

Merged
merged 8 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions IBKit/IBKit/Client/IBClient+Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
//

import Foundation
import Network


extension IBClient: IBConnectionDelegate {

Expand Down
263 changes: 131 additions & 132 deletions IBKit/IBKit/Client/IBClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,148 +25,147 @@


import Foundation
import Network
import Combine

import NIOCore
import NIOConcurrencyHelpers
import NIOPosix

open class IBClient {

internal var subject = PassthroughSubject<IBEvent,Never>()

lazy public var eventFeed = subject.share().eraseToAnyPublisher()

var identifier: Int

var connection: IBConnection?

let host: NWEndpoint.Host

let port: NWEndpoint.Port

var serverVersion: Int?

public var connectionTime: String?


/// Creates new api client.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter address: Address where your IB Gatweay / Worskatation is running


public init(id masterID: Int, address: String, port: UInt16) {

guard let host = URL(string: address)?.host else {
fatalError("Cant figure out the host to connect")
}

self.host = NWEndpoint.Host(host)
self.port = NWEndpoint.Port(rawValue: port)!
self.identifier = masterID

}
internal var subject = PassthroughSubject<IBEvent,Never>()
lazy public var eventFeed = subject.share().eraseToAnyPublisher()

var identifier: Int
var connection: IBConnection?

let host: String
let port: Int

var serverVersion: Int?

public var connectionTime: String?


/// Creates new api client.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter address: Address where your IB Gatweay / Worskatation is running


public init(id masterID: Int, address: String, port: Int) {
guard let host = URL(string: address)?.host else {
fatalError("Cant figure out the host to connect")
}

self.host = host
self.port = port
self.identifier = masterID

}

var _nextValidID: Int = 0

/// Return next valid request identifier you should use to make request or subscription
var _nextValidID: Int = 0

public var nextRequestID: Int {
let value = _nextValidID
_nextValidID += 1
return value
}

/// Disconnect client from IB Gateway or Workstation
///
public func connect() throws {

if connection != nil { throw IBError.connectionError("Already connected")}

let newConnection = NWConnection(host: self.host, port: self.port, using: .tcp)
connection = IBConnection(newConnection)
connection?.didStopCallback = didStopCallback(error:)
connection?.delegate = self
connection?.start()

try self.startAPI(clientID: self.identifier)

}

/// Disconnect client from IB Gateway or Workstation
///
public func disconnect() {
if connection != nil {
connection?.stop()
connection = nil
subject.send(completion: .finished)
}
}

func send(encoder: IBEncoder) throws {
if connection == nil { throw IBError.serverError("No connection found") }
let requestDataWithLength = encoder.data.count.toBytes(size: 4) + encoder.data
connection?.send(data: requestDataWithLength)
}


func didStopCallback(error: Error?) {

subject.send(completion: .finished)

if error == nil {
exit(EXIT_SUCCESS)
} else {
exit(EXIT_FAILURE)
}

}

}
/// Return next valid request identifier you should use to make request or subscription

public var nextRequestID: Int {
let value = _nextValidID
_nextValidID += 1
return value
}

/// Disconnect client from IB Gateway or Workstation
///
public func connect() throws {
guard connection == nil else {
throw IBError.connectionError("Already connected")
}

let connection = try IBConnection(host: host, port: port)
connection.didStopCallback = didStopCallback(error:)
connection.stateDidChangeCallback = stateDidChange(to:)
connection.delegate = self
self.connection = connection
}

/// Disconnect client from IB Gateway or Workstation
///
public func disconnect() {
guard let connection else { return }
connection.disconnect()
self.connection = nil
subject.send(completion: .finished)
}

func send(encoder: IBEncoder) throws {
guard let connection else {
throw IBError.serverError("No connection found")
}
let requestDataWithLength = encoder.data.count.toBytes(size: 4) + encoder.data
connection.send(data: requestDataWithLength)
}


private func didStopCallback(error: Error?) {
subject.send(completion: .finished)

if error == nil {
exit(EXIT_SUCCESS)
} else {
exit(EXIT_FAILURE)
}
}

private func stateDidChange(to state: IBConnection.State) {
switch state {
case .connectedToAPI:
do {
try self.startAPI(clientID: self.identifier)
} catch {
print(error)
}
default:
break
}
}
}

public extension IBClient {

enum ConnectionType{
case gateway
case workstation

internal var host: String{
return "https://127.0.0.1"
}


internal var liveTradingPort: UInt16 {
switch self{
case .gateway: return 4001
case .workstation: return 7496
}
}

internal var simulatedTradingPort: UInt16{
switch self{
case .gateway: return 4002
case .workstation: return 7497
}
}

}

/// Creates new live trading client. All orders you send to broker, will be real and executed.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter type: Connection type you are using.

static func live(id: Int, type: ConnectionType = .gateway) -> IBClient {
return IBClient(id: id, address: type.host, port: type.liveTradingPort)
}

/// Creates new paper trading client, with simulated orders.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter type: Connection type you are using.
enum ConnectionType {
case gateway
case workstation

internal var host: String {
"https://127.0.0.1"
}

internal var liveTradingPort: Int {
switch self{
case .gateway: return 4001
case .workstation: return 7496
}
}

internal var simulatedTradingPort: Int {
switch self{
case .gateway: return 4002
case .workstation: return 7497
}
}

}

/// Creates new live trading client. All orders you send to broker, will be real and executed.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter type: Connection type you are using.

static func live(id: Int, type: ConnectionType = .gateway) -> IBClient {
IBClient(id: id, address: type.host, port: type.liveTradingPort)
}

/// Creates new paper trading client, with simulated orders.
/// - Parameter id: Master API ID, set in IB Gateway or Workstation
/// - Parameter type: Connection type you are using.

static func paper(id: Int, type: ConnectionType = .gateway) -> IBClient {
return IBClient(id: id, address: type.host, port: type.simulatedTradingPort)
}


static func paper(id: Int, type: ConnectionType = .gateway) -> IBClient {
IBClient(id: id, address: type.host, port: type.simulatedTradingPort)
}
}
75 changes: 75 additions & 0 deletions IBKit/IBKit/Client/IBClientFrameDecoder.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
// IBConnection.swift
// IBKit
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

import NIOCore

class IBClientFrameDecoder: ByteToMessageDecoder & NIOSingleStepByteToMessageDecoder {
typealias InboundIn = ByteBuffer
typealias InboundOut = ByteBuffer

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
if let frame = try self.readFrame(buffer: &buffer) {
context.fireChannelRead(wrapInboundOut(frame))
return .continue
} else {
return .needMoreData
}
}

func decode(buffer: inout NIOCore.ByteBuffer) throws -> NIOCore.ByteBuffer? {
return try self.readFrame(buffer: &buffer)
}

func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
while try self.decode(context: context, buffer: &buffer) == .continue {}
if buffer.readableBytes > 0 {
context.fireErrorCaught(IBError.failedToRead("leftover bytes"))
}
return .needMoreData
}

func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
let decoded = try self.decode(buffer: &buffer)
if buffer.readableBytes > 0 {
throw IBError.failedToRead("leftover bytes")
}
return decoded
}

private func readFrame(buffer: inout ByteBuffer) throws -> ByteBuffer? {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I did implemented correctly frames decoding, but I left some comments here. Let me know how does it look.

This handler is mostly responsible for reading data and packaging them up into frames.

guard buffer.readableBytes >= 4 else {
return nil
}

let lengthPrefix = buffer.getInteger(at: buffer.readerIndex, as: Int32.self)!
let frameLength = Int(lengthPrefix.littleEndian)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI:
I had error here, thought it is bigEndian, but it does appear to be littleEndian


guard buffer.readableBytes >= 4 + frameLength else {
return nil
}

buffer.moveReaderIndex(forwardBy: 4)
let frame = buffer.readSlice(length: frameLength)
return frame
}
}
Loading