Skip to content

Commit

Permalink
feat(pgsrv): Implement start of postgres extended query protocol (#117)
Browse files Browse the repository at this point in the history
* add extended query parse message

* parse step

* bind step

* Some Describe + Some Drop table

* close server on Terminate message

* Get further with extended query protocol (#127)

* chore: Update to datafusion 12 (#114)

* First draft of key layout using RocksDB (#116)

* First draft of key layout using RocksDB

* Additional future considerations from Sean

* feat: Add information_schema (#115)

* feat: Add information_schema

Fixes #98

Cloud will be making requests directly to the database to get info about the
contents of the database, including schemas, tables, and columns.

* fix: Remove datafusion-proto crate (#119)

We're not using if for anything yet, and this release seems to break building
container images.

```
error: builder for '/nix/store/75lddm4kg8mzn2x5nz8lg36gdj16p7ka-glaredb-cli-0.1.0.drv' failed with exit code 101;
       last 10 log lines:
       > Caused by:
       >   process didn't exit successfully: `/build/source/target/release/build/datafusion-proto-497b9ae0fe438eda/build-script-build` (exit status: 1)
       >   --- stdout
       >   cargo:rerun-if-env-changed=FORCE_REBUILD
       >   cargo:rerun-if-changed=proto/datafusion.proto
       >   Running: "/nix/store/2qg94y58v1jr4dw360bmpxlrs30m31ca-protobuf-3.19.4/bin/protoc" "--include_imports" "--include_source_info" "-o" "/build/prost-buildFXFfZG/prost-descriptor-set" "-I" "proto" "-I" "/nix/store/2qg94y58v1jr4dw360bmpxlrs30m31ca-protobuf-3.19.4/include" "proto/datafusion.proto"
       >
       >   --- stderr
       >   Error: "protobuf compilation failed: Permission denied (os error 13)"
       > warning: build failed, waiting for other jobs to finish...
       For full logs, run 'nix log /nix/store/75lddm4kg8mzn2x5nz8lg36gdj16p7ka-glaredb-cli-0.1.0.drv'.
```

Possibly related: apache/datafusion#3538

* feat: Implement raft via gRPC (#63)

* Replace toy-rpc with tonic gRPC

* implement glaredb cli for raft nodes and client

* current progress

* implement begin, allocate_table, and get_schema

* implement scan

* implement insert

* cleanup

* comment out old tests

* clean up ConsensusClient

* Implement change membership command

* rewrite cluster tests to use RaftClientSource

* add protoc to CI

* switch raft to in-memory implementation

* Remove application logic from raft cluster tests

* cargo fmt

* add tracing to RPC impls

* Remove lemur from raft crate

* remove raft_client example

* Apply suggestions from code review

Co-authored-by: Sean Smith <scsmithr@gmail.com>

* remove protoc from ci

* Remove lemur_impl from raft crate

* Store tonic clients instead of endpoint in ConsensusClient

* use shared n_retries

* Add default num_retries

* Apply suggestions from code review

Co-authored-by: Rustom Shareef <2547411+RustomMS@users.noreply.github.com>

* moved some mod.rs modules into their parent directories

* implement ConsensusClient retry to find leader using macro

* Fix missing delimiter

* fix clippy issues

* rewrite retry_rpc_on_leader macro to evaluate to an expression

* remove panics in rpc server impls

Co-authored-by: Sean Smith <scsmithr@gmail.com>
Co-authored-by: Rustom Shareef <2547411+RustomMS@users.noreply.github.com>

* build(nix): Use crane to cache cargo dependencies (#121)

* Add crane

* switch rust toolchain to come from fenix

* touch buildscript before executing cargo build

* add clippy and build checks

* rename arrowstore build script

* rename raft build script

* Send back BindComplete intead of ParseComplete

Also moved sending results into its own function since we need to send results
back after Execute commands complete.

* Add logical plan stub for SETting runtime vars

Also fixes logic for checking pg message length.

Co-authored-by: Rustom Shareef <2547411+RustomMS@users.noreply.github.com>
Co-authored-by: Justin Rubek <25621857+justinrubek@users.noreply.github.com>

* fix clippy issues

* replaced dbg usage with tracing

* updated some comments

* comment terminate message

Co-authored-by: Sean Smith <scsmithr@gmail.com>
Co-authored-by: Rustom Shareef <2547411+RustomMS@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 27, 2022
1 parent 34a5eac commit f980994
Show file tree
Hide file tree
Showing 10 changed files with 733 additions and 51 deletions.
91 changes: 89 additions & 2 deletions crates/pgsrv/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,80 @@ impl PgCodec {
})
}

fn decode_parse(buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
let name = buf.read_cstring()?.to_string();
let sql = buf.read_cstring()?.to_string();
let num_params = buf.get_i16() as usize;
let mut param_types = Vec::with_capacity(num_params);
for _ in 0..num_params {
param_types.push(buf.get_i32());
}
Ok(FrontendMessage::Parse {
name,
sql,
param_types,
})
}

fn decode_bind(buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
let portal = buf.read_cstring()?.to_string();
let statement = buf.read_cstring()?.to_string();

let num_params = buf.get_i16() as usize;
let mut param_formats = Vec::with_capacity(num_params);
for _ in 0..num_params {
param_formats.push(buf.get_i16());
}

let num_values = buf.get_i16() as usize; // must match num_params
let mut param_values = Vec::with_capacity(num_values);
for _ in 0..num_values {
let len = buf.get_i32();
if len == -1 {
param_values.push(None);
} else {
let mut val = vec![0; len as usize];
buf.copy_to_slice(&mut val);
param_values.push(Some(val));
}
}

let num_params = buf.get_i16() as usize;
let mut result_formats = Vec::with_capacity(num_params);
for _ in 0..num_params {
result_formats.push(buf.get_i16());
}

Ok(FrontendMessage::Bind {
portal,
statement,
param_formats,
param_values,
result_formats,
})
}

fn decode_describe(buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
let object_type = buf.get_u8().try_into()?;
let name = buf.read_cstring()?.to_string();

Ok(FrontendMessage::Describe { object_type, name })
}

fn decode_execute(buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
let portal = buf.read_cstring()?.to_string();
let max_rows = buf.get_i32();
Ok(FrontendMessage::Execute { portal, max_rows })
}

fn decode_sync(_buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
Ok(FrontendMessage::Sync)
}

fn decode_terminate(_buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
Ok(FrontendMessage::Terminate)
}

fn encode_scalar_as_text(scalar: ScalarValue, buf: &mut BytesMut) -> Result<()> {
if scalar.is_null() {
buf.put_i32(-1);
Expand Down Expand Up @@ -187,6 +261,9 @@ impl Encoder<BackendMessage> for PgCodec {
BackendMessage::DataRow(_, _) => b'D',
BackendMessage::ErrorResponse(_) => b'E',
BackendMessage::NoticeResponse(_) => b'N',
BackendMessage::ParseComplete => b'1',
BackendMessage::BindComplete => b'2',
BackendMessage::NoData => b'n',
};
dst.put_u8(byte);

Expand All @@ -198,6 +275,9 @@ impl Encoder<BackendMessage> for PgCodec {
BackendMessage::AuthenticationOk => dst.put_i32(0),
BackendMessage::AuthenticationCleartextPassword => dst.put_i32(3),
BackendMessage::EmptyQueryResponse => (),
BackendMessage::ParseComplete => (),
BackendMessage::BindComplete => (),
BackendMessage::NoData => (),
BackendMessage::ParameterStatus { key, val } => {
dst.put_cstring(&key);
dst.put_cstring(&val);
Expand Down Expand Up @@ -284,8 +364,8 @@ impl Decoder for PgCodec {
let msg_len = i32::from_be_bytes(src[1..5].try_into().unwrap()) as usize;

// Not enough bytes to read the full message yet.
if src.len() < msg_len {
src.reserve(msg_len - src.len());
if src.len() < msg_len + 1 {
src.reserve(msg_len + 1 - src.len());
return Ok(None);
}

Expand All @@ -296,6 +376,13 @@ impl Decoder for PgCodec {
let msg = match msg_type {
b'Q' => Self::decode_query(&mut buf)?,
b'p' => Self::decode_password(&mut buf)?,
b'P' => Self::decode_parse(&mut buf)?,
b'B' => Self::decode_bind(&mut buf)?,
b'D' => Self::decode_describe(&mut buf)?,
b'E' => Self::decode_execute(&mut buf)?,
b'S' => Self::decode_sync(&mut buf)?,
// X - Terminate
b'X' => return Ok(None),
other => return Err(PgSrvError::InvalidMsgType(other)),
};

Expand Down
3 changes: 3 additions & 0 deletions crates/pgsrv/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub enum PgSrvError {
#[error("missing null byte")]
MissingNullByte,

#[error("unexpected describe object type: {0}")]
UnexpectedDescribeObjectType(u8),

/// We've received an unexpected message identifier from the frontend.
/// Includes the char representation to allow for easy cross referencing
/// with the Postgres message format documentation.
Expand Down
Loading

0 comments on commit f980994

Please sign in to comment.