Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jq/0.7 #198

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/AWS4AuthRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ using ..URIs
using ..Pairs: getkv, setkv, rmkv
import ..@debug, ..DEBUG_LEVEL


"""
request(AWS4AuthLayer, ::URI, ::Request, body) -> HTTP.Response

Expand All @@ -19,7 +18,6 @@ Add a [AWS Signature Version 4](http://docs.aws.amazon.com/general/latest/gr/sig
Credentials are read from environment variables `AWS_ACCESS_KEY_ID`,
`AWS_SECRET_ACCESS_KEY` and `AWS_SESSION_TOKEN`.
"""

abstract type AWS4AuthLayer{Next <: Layer} <: Layer end
export AWS4AuthLayer

Expand All @@ -38,7 +36,6 @@ function request(::Type{AWS4AuthLayer{Next}},
return request(Next, url, req, body; kw...)
end


function sign_aws4!(method::String,
url::URI,
headers::Headers,
Expand All @@ -53,7 +50,6 @@ function sign_aws4!(method::String,
aws_session_token::String=get(ENV, "AWS_SESSION_TOKEN", ""),
kw...)


# ISO8601 date/time strings for time of request...
date = Dates.format(t, dateformat"yyyymmdd")
datetime = Dates.format(t, dateformat"yyyymmddTHHMMSS\Z")
Expand Down Expand Up @@ -128,7 +124,6 @@ credentials = NamedTuple()
Load Credentials from [AWS CLI ~/.aws/credentials file]
(http://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html).
"""

function dot_aws_credentials()::NamedTuple

global credentials
Expand Down
2 changes: 0 additions & 2 deletions src/BasicAuthRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ using ..URIs
using ..Pairs: getkv, setkv
import ..@debug, ..DEBUG_LEVEL


"""
request(BasicAuthLayer, method, ::URI, headers, body) -> HTTP.Response

Add `Authorization: Basic` header using credentials from url userinfo.
"""

abstract type BasicAuthLayer{Next <: Layer} <: Layer end
export BasicAuthLayer

Expand Down
8 changes: 2 additions & 6 deletions src/CanonicalizeRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ module CanonicalizeRequest

import ..Layer, ..request
using ..Messages
using ..Strings.tocameldash

using ..Strings: tocameldash

"""
request(CanonicalizeLayer, method, ::URI, headers, body) -> HTTP.Response

Rewrite request and response headers in Canonical-Camel-Dash-Format.
"""

abstract type CanonicalizeLayer{Next <: Layer} <: Layer end
export CanonicalizeLayer

Expand All @@ -26,8 +24,6 @@ function request(::Type{CanonicalizeLayer{Next}},
return res
end


canonicalizeheaders(h::T) where T = T([tocameldash(k) => v for (k,v) in h])

canonicalizeheaders(h::T) where {T} = T([tocameldash(k) => v for (k,v) in h])

end # module CanonicalizeRequest
41 changes: 0 additions & 41 deletions src/ConnectionPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ When the `request` function has read the Response Message it calls
`closeread` to signal that the `Connection` can be reused for
reading.
"""

module ConnectionPool

export Connection, Transaction,
Expand All @@ -35,7 +34,6 @@ import ..@debug, ..@debugshow, ..DEBUG_LEVEL, ..taskid
import ..@require, ..precondition_error, ..@ensure, ..postcondition_error
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!


const default_connection_limit = 8
const default_pipeline_limit = 16
const nolimit = typemax(Int)
Expand All @@ -44,7 +42,6 @@ const nobytes = view(UInt8[], 1:0)
byteview(bytes::ByteView) = bytes
byteview(bytes)::ByteView = view(bytes, 1:length(bytes))


"""
Connection{T <: IO}

Expand All @@ -67,7 +64,6 @@ Fields:
- `readdone`, signal that `readcount` was incremented.
- `timestamp`, time data was last recieved.
"""

mutable struct Connection{T <: IO}
host::String
port::String
Expand All @@ -86,21 +82,18 @@ mutable struct Connection{T <: IO}
timestamp::Float64
end


"""
A single pipelined HTTP Request/Response transaction`.

Fields:
- `c`, the shared [`Connection`](@ref) used for this `Transaction`.
- `sequence::Int`, identifies this `Transaction` among the others that share `c`.
"""

struct Transaction{T <: IO} <: IO
c::Connection{T}
sequence::Int
end


Connection(host::AbstractString, port::AbstractString,
pipeline_limit::Int, io::T) where T <: IO =
Connection{T}(host, port, pipeline_limit,
Expand All @@ -122,10 +115,8 @@ function client_transaction(c)
return t
end


getrawstream(t::Transaction) = t.c.io


inactiveseconds(t::Transaction) = inactiveseconds(t.c)

function inactiveseconds(c::Connection)::Float64
Expand All @@ -135,7 +126,6 @@ function inactiveseconds(c::Connection)::Float64
return time() - c.timestamp
end


Base.unsafe_write(t::Transaction, p::Ptr{UInt8}, n::UInt) =
unsafe_write(t.c.io, p, n)

Expand All @@ -157,12 +147,10 @@ bytesavailable(t::Transaction) = bytesavailable(t.c)
bytesavailable(c::Connection) =
!isempty(c.excess) ? length(c.excess) : bytesavailable(c.io)


Base.isreadable(t::Transaction) = t.c.readbusy && t.c.readcount == t.sequence

Base.iswritable(t::Transaction) = t.c.writebusy && t.c.writecount == t.sequence


function Base.read(t::Transaction, nb::Integer)::ByteView
bytes = readavailable(t)
l = length(bytes)
Expand All @@ -173,7 +161,6 @@ function Base.read(t::Transaction, nb::Integer)::ByteView
return bytes
end


function Base.readavailable(t::Transaction)::ByteView
@require isreadable(t)
if !isempty(t.c.excess)
Expand All @@ -188,14 +175,12 @@ function Base.readavailable(t::Transaction)::ByteView
return bytes
end


"""
unread!(::Transaction, bytes)

Push bytes back into a connection's `excess` buffer
(to be returned by the next read).
"""

function IOExtras.unread!(t::Transaction, bytes::ByteView)
@require isreadable(t)
@require !isempty(bytes)
Expand All @@ -204,13 +189,11 @@ function IOExtras.unread!(t::Transaction, bytes::ByteView)
return
end


"""
startwrite(::Transaction)

Wait for prior pending writes to complete.
"""

function IOExtras.startwrite(t::Transaction)
@require !iswritable(t) ;t.c.writecount != t.sequence &&
@debug 1 "⏳ Wait write: $t"
Expand All @@ -222,13 +205,11 @@ function IOExtras.startwrite(t::Transaction)
return
end


"""
closewrite(::Transaction)

Signal that an entire Request Message has been written to the `Transaction`.
"""

function IOExtras.closewrite(t::Transaction)
@require iswritable(t)

Expand All @@ -241,13 +222,11 @@ function IOExtras.closewrite(t::Transaction)
return
end


"""
startread(::Transaction)

Wait for prior pending reads to complete.
"""

function IOExtras.startread(t::Transaction)
@require !isreadable(t) ;t.c.readcount != t.sequence &&
@debug 1 "⏳ Wait read: $t"
Expand All @@ -260,15 +239,13 @@ function IOExtras.startread(t::Transaction)
return
end


"""
closeread(::Transaction)

Signal that an entire Response Message has been read from the `Transaction`.

Increment `readcount` and wake up tasks waiting in `startread`.
"""

function IOExtras.closeread(t::Transaction)
@require isreadable(t)

Expand Down Expand Up @@ -301,13 +278,11 @@ function Base.close(c::Connection)
return
end


"""
purge(::Connection)

Remove unread data from a `Connection`.
"""

function purge(c::Connection)
@require !isopen(c.io)
while !eof(c.io)
Expand All @@ -317,7 +292,6 @@ function purge(c::Connection)
@ensure bytesavailable(c) == 0
end


"""
The `pool` is a collection of open `Connection`s. The `request`
function calls `getconnection` to retrieve a connection from the
Expand All @@ -327,7 +301,6 @@ for writing (to send the next Request). When the `request` function
has read the Response Message it calls `closeread` to signal that
the `Connection` can be reused for reading.
"""

const pool = Vector{Connection}()
const poollock = ReentrantLock()
const poolcondition = Condition()
Expand All @@ -337,7 +310,6 @@ const poolcondition = Condition()

Close all connections in `pool`.
"""

function closeall()

lock(poollock)
Expand All @@ -350,13 +322,11 @@ function closeall()
return
end


"""
findwritable(type, host, port) -> Vector{Connection}

Find `Connections` in the `pool` that are ready for writing.
"""

function findwritable(T::Type,
host::AbstractString,
port::AbstractString,
Expand All @@ -373,14 +343,12 @@ function findwritable(T::Type,
isopen(c.io)), pool)
end


"""
findoverused(type, host, port, reuse_limit) -> Vector{Connection}

Find `Connections` in the `pool` that are over the reuse limit
and have no more active readers.
"""

function findoverused(T::Type,
host::AbstractString,
port::AbstractString,
Expand All @@ -394,13 +362,11 @@ function findoverused(T::Type,
isopen(c.io)), pool)
end


"""
findall(type, host, port) -> Vector{Connection}

Find all `Connections` in the `pool` for `host` and `port`.
"""

function findall(T::Type,
host::AbstractString,
port::AbstractString,
Expand All @@ -413,7 +379,6 @@ function findall(T::Type,
isopen(c.io)), pool)
end


"""
purge()

Expand All @@ -424,14 +389,12 @@ function purge()
deleteat!(pool, map(isdeletable, pool))
end


"""
getconnection(type, host, port) -> Connection

Find a reusable `Connection` in the `pool`,
or create a new `Connection` if required.
"""

function getconnection(::Type{Transaction{T}},
host::AbstractString,
port::AbstractString;
Expand Down Expand Up @@ -488,7 +451,6 @@ function getconnection(::Type{Transaction{T}},
end
end


function getconnection(::Type{TCPSocket},
host::AbstractString,
port::AbstractString;
Expand All @@ -499,7 +461,6 @@ function getconnection(::Type{TCPSocket},
connect(getaddrinfo(host), p)
end


const nosslconfig = SSLConfig()
default_sslconfig = nothing
noverify_sslconfig = nothing
Expand Down Expand Up @@ -535,7 +496,6 @@ function getconnection(::Type{SSLContext},
return io
end


function Base.show(io::IO, c::Connection)
nwaiting = bytesavailable(tcpsocket(c.io))
print(
Expand All @@ -555,7 +515,6 @@ end

Base.show(io::IO, t::Transaction) = print(io, "T$(rpad(t.sequence,2)) ", t.c)


function tcpstatus(c::Connection)
s = Base.uv_status_string(tcpsocket(c.io))
if s == "connecting" return "🔜🔗"
Expand Down
Loading