store: Allow using Bytes as the primary key type
lutter committed Apr 16, 2020
1 parent d203a12 commit 230afc0
Showing 4 changed files with 436 additions and 29 deletions.
store/postgres/src/relational.rs (27 additions & 2 deletions)
@@ -641,12 +641,15 @@ pub enum ColumnType {
/// A user-defined enum. The string contains the name of the Postgres
/// enum we created for it, fully qualified with the schema
Enum(SqlName),
/// A `bytea` in SQL, represented as a ValueType::String; this is
/// used for `id` columns of type `Bytes`
BytesId,
}

impl From<IdType> for ColumnType {
fn from(id_type: IdType) -> Self {
match id_type {
IdType::Bytes => ColumnType::Bytes,
IdType::Bytes => ColumnType::BytesId,
IdType::String => ColumnType::String,
}
}
@@ -697,6 +700,17 @@ impl ColumnType {
ColumnType::String => "text",
ColumnType::TSVector(_) => "tsvector",
ColumnType::Enum(name) => name.as_str(),
ColumnType::BytesId => "bytea",
}
}

/// Return the `IdType` corresponding to this column type. This can only
/// be called on a column that stores an `ID` and will panic otherwise
pub(crate) fn id_type(&self) -> IdType {
match self {
ColumnType::String => IdType::String,
ColumnType::BytesId => IdType::Bytes,
_ => unreachable!("only String and Bytes are allowed as primary keys"),
}
}
}
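
A quick sketch of the mapping these two hunks set up (illustrative only; it assumes the truncated method above is `ColumnType::sql_type`, as its `"text"` and `"tsvector"` arms suggest):

    // IdType::Bytes picks ColumnType::BytesId, which emits bytea in DDL
    // and maps back to IdType::Bytes when a query needs to bind the key.
    let ct = ColumnType::from(IdType::Bytes);
    assert!(matches!(ct, ColumnType::BytesId));
    assert_eq!(ct.sql_type(), "bytea");
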
@@ -776,6 +790,10 @@ impl Column {
named_type(&self.field_type) == "fulltext"
}

pub fn is_primary_key(&self) -> bool {
self.name.as_str() == PRIMARY_KEY_COLUMN
}

/// Return `true` if this column stores user-supplied text. Such
/// columns may contain very large values and need to be handled
/// specially for indexing
@@ -794,7 +812,7 @@ impl Column {
if self.is_list() {
write!(out, "[]")?;
}
if self.name.0 == PRIMARY_KEY_COLUMN || !self.is_nullable() {
if self.is_primary_key() || !self.is_nullable() {
write!(out, " not null")?;
}
Ok(())
@@ -878,6 +896,13 @@ impl Table {
.ok_or_else(|| StoreError::UnknownField(field.to_string()))
}

pub fn primary_key(&self) -> &Column {
self.columns
.iter()
.find(|column| column.is_primary_key())
.expect("every table has a primary key")
}

/// Generate the DDL for one table, i.e. one `create table` statement
/// and all `create index` statements for the table's columns
///
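Taken together, the relational.rs changes let callers look up a table's key column and dispatch on its type. A minimal usage sketch, assuming a `table: &Table` in scope (illustrative, not part of the commit):

    // Every table has a column named `id` (PRIMARY_KEY_COLUMN), so
    // primary_key() can safely expect to find one.
    let pk: &Column = table.primary_key();
    match pk.column_type.id_type() {
        IdType::String => { /* bind the id as text */ }
        IdType::Bytes => { /* hex-decode the id and bind it as bytea */ }
    }
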
store/postgres/src/relational_queries.rs (94 additions & 26 deletions)
@@ -8,7 +8,7 @@
use diesel::pg::{Pg, PgConnection};
use diesel::query_builder::{AstPass, QueryFragment, QueryId};
use diesel::query_dsl::{LoadQuery, RunQueryDsl};
use diesel::result::QueryResult;
use diesel::result::{Error as DieselError, QueryResult};
use diesel::sql_types::{Array, Binary, Bool, Integer, Jsonb, Numeric, Range, Text};
use diesel::Connection;
use std::collections::{BTreeMap, HashSet};
@@ -28,9 +28,56 @@ use crate::block_range::{
};
use crate::entities::STRING_PREFIX_SIZE;
use crate::filter::UnsupportedFilter;
use crate::relational::{Column, ColumnType, Layout, SqlName, Table, PRIMARY_KEY_COLUMN};
use crate::relational::{Column, ColumnType, IdType, Layout, SqlName, Table, PRIMARY_KEY_COLUMN};
use crate::sql_value::SqlValue;

fn str_as_bytes(id: &str) -> QueryResult<scalar::Bytes> {
scalar::Bytes::from_str(&id).map_err(|e| DieselError::SerializationError(Box::new(e)))
}

/// Convert Postgres' string representation of bytes, "\xdeadbeef",
/// to ours, which is just "deadbeef".
fn bytes_as_str(id: &str) -> String {
id.trim_start_matches("\\x").to_owned()
}

/// Conveniences for handling primary keys depending on whether we are using
/// `IdType::Bytes` or `IdType::String` as the primary key
struct PrimaryKey {}

impl PrimaryKey {
fn eq(table: &Table, id: &str, out: &mut AstPass<Pg>) -> QueryResult<()> {
let pk = table.primary_key();

out.push_sql(pk.name.as_str());
out.push_sql(" = ");
match pk.column_type.id_type() {
IdType::String => out.push_bind_param::<Text, _>(&id),
IdType::Bytes => out.push_bind_param::<Binary, _>(&str_as_bytes(&id)?.as_slice()),
}
}

fn is_in(table: &Table, ids: &Vec<&str>, out: &mut AstPass<Pg>) -> QueryResult<()> {
let pk = table.primary_key();

out.push_sql(pk.name.as_str());
out.push_sql(" = any(");
match pk.column_type.id_type() {
IdType::String => out.push_bind_param::<Array<Text>, _>(ids)?,
IdType::Bytes => {
let ids = ids
.into_iter()
.map(|id| str_as_bytes(id))
.collect::<Result<Vec<scalar::Bytes>, _>>()?;
let id_slices = ids.iter().map(|id| id.as_slice()).collect::<Vec<_>>();
out.push_bind_param::<Array<Binary>, _>(&id_slices)?;
}
}
out.push_sql(")");
Ok(())
}
}
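
// Illustration (a sketch, not in the commit): for a table keyed by
// IdType::Bytes, these helpers emit SQL of the shape
//
//     id = $1         -- $1 bound as bytea via str_as_bytes("deadbeef")
//     id = any($1)    -- $1 bound as bytea[]
//
// while tables keyed by IdType::String keep binding text / text[] just
// as the call sites below did before this change.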

/// Helper struct for retrieving entities from the database. With diesel, we
/// can only run queries that return columns whose number and type are known
/// at compile time. Because of that, we retrieve the actual data for an
@@ -105,6 +152,7 @@ impl EntityData {
StoreError::Unknown(format_err!("failed to convert {} to Bytes: {}", s, e))
})
}
(j::String(s), ColumnType::BytesId) => Ok(g::String(bytes_as_str(&s))),
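// (A bytea id arrives in the jsonb row in Postgres' text form, e.g.
// "\xdeadbeef"; bytes_as_str folds it back to the store's "deadbeef".)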
(j::String(s), column_type) => Err(StoreError::Unknown(format_err!(
"can not convert string {} to {:?}",
s,
@@ -186,6 +234,11 @@ impl<'a> QueryFragment<Pg> for QueryValue<'a> {
out.push_sql(")");
Ok(())
}
ColumnType::Bytes | ColumnType::BytesId => {
let bytes = scalar::Bytes::from_str(&s)
.map_err(|e| DieselError::SerializationError(Box::new(e)))?;
out.push_bind_param::<Binary, _>(&bytes.as_slice())
}
_ => unreachable!(
"only string, enum and tsvector columns have values of type string"
),
@@ -237,6 +290,7 @@ impl<'a> QueryFragment<Pg> for QueryValue<'a> {
out.push_sql("))");
Ok(())
}
ColumnType::BytesId => out.push_bind_param::<Array<Binary>, _>(&values),
}
}
Value::Null => {
@@ -810,9 +864,7 @@ impl<'a> QueryFragment<Pg> for FindQuery<'a> {
out.push_sql(" from ");
out.push_sql(self.table.qualified_name.as_str());
out.push_sql(" e\n where ");
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" = ");
out.push_bind_param::<Text, _>(&self.id)?;
PrimaryKey::eq(&self.table, &self.id, &mut out)?;
out.push_sql(" and ");
BlockRangeContainsClause::new("e.", self.block).walk_ast(out)
}
@@ -864,10 +916,8 @@ impl<'a> QueryFragment<Pg> for FindManyQuery<'a> {
out.push_sql(" from ");
out.push_sql(table.qualified_name.as_str());
out.push_sql(" e\n where ");
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" = any(");
out.push_bind_param::<Array<Text>, _>(&self.ids_for_type[table.object.as_str()])?;
out.push_sql(") and ");
PrimaryKey::is_in(table, &self.ids_for_type[table.object.as_str()], &mut out)?;
out.push_sql(" and ");
BlockRangeContainsClause::new("e.", self.block).walk_ast(out.reborrow())?;
}
Ok(())
@@ -1013,7 +1063,7 @@ impl<'a> QueryFragment<Pg> for UpdateQuery<'a> {
fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let updateable = { |column: &Column| column.name.as_str() != PRIMARY_KEY_COLUMN };
let updateable = { |column: &Column| !column.is_primary_key() };

// Construct a query
// update schema.table1
@@ -1050,9 +1100,7 @@ impl<'a> QueryFragment<Pg> for UpdateQuery<'a> {
QueryValue(value, &column.column_type).walk_ast(out.reborrow())?;
}
out.push_sql("\n where ");
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" = ");
out.push_bind_param::<Text, _>(&self.key.entity_id)?;
PrimaryKey::eq(&self.table, &self.key.entity_id, &mut out)?;
Ok(())
}
}
@@ -1079,13 +1127,10 @@ impl<'a> QueryFragment<Pg> for DeleteQuery<'a> {
// Construct a query
// delete from table
// where id = $key.entity_id
// returning id
out.push_sql("delete from ");
out.push_sql(self.table.qualified_name.as_str());
out.push_sql("\n where ");
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" = ");
out.push_bind_param::<Text, _>(&self.key.entity_id)
PrimaryKey::eq(&self.table, &self.key.entity_id, &mut out)
}
}

@@ -1520,7 +1565,7 @@ impl<'a> SortKey<'a> {
fn select(&self, out: &mut AstPass<Pg>) -> QueryResult<()> {
if let Some(column) = self.column {
let name = column.name.as_str();
if name != PRIMARY_KEY_COLUMN {
if !column.is_primary_key() {
out.push_sql(", c.");
out.push_identifier(name)?;
}
@@ -1969,9 +2014,7 @@ impl<'a> QueryFragment<Pg> for ClampRangeQuery<'a> {
out.push_sql("), ");
out.push_bind_param::<Integer, _>(&self.block)?;
out.push_sql(")\n where ");
out.push_identifier(PRIMARY_KEY_COLUMN)?;
out.push_sql(" = ");
out.push_bind_param::<Text, _>(&self.key.entity_id)?;
PrimaryKey::eq(&self.table, &self.key.entity_id, &mut out)?;
out.push_sql(" and (");
out.push_sql(BLOCK_RANGE_CURRENT);
out.push_sql(")");
@@ -1995,8 +2038,24 @@ pub struct RevertEntityData {
pub id: String,
}

impl RevertEntityData {
/// Convert primary key ids from Postgres' internal form to the format we
/// use by stripping `\\x` off the front of bytes strings
fn convert(table: &Table, mut data: Vec<RevertEntityData>) -> Vec<RevertEntityData> {
match table.primary_key().column_type.id_type() {
IdType::String => data,
IdType::Bytes => {
for entry in data.iter_mut() {
entry.id = bytes_as_str(&entry.id);
}
data
}
}
}
}
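
// Sketch of the round trip that makes convert() necessary: the revert and
// delete-by-prefix queries below say `returning id::text`, so a bytea
// primary key comes back as "\xdeadbeef"; convert() rewrites it to the
// "deadbeef" form the rest of the store expects.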

/// A query that removes all versions whose block range lies entirely
/// beyond `block`
/// beyond `block`.
#[derive(Debug, Clone, Constructor)]
pub struct RevertRemoveQuery<'a> {
table: &'a Table,
@@ -2018,7 +2077,9 @@ impl<'a> QueryFragment<Pg> for RevertRemoveQuery<'a> {
out.push_sql(") >= ");
out.push_bind_param::<Integer, _>(&self.block)?;
out.push_sql("\nreturning ");
out.push_identifier(PRIMARY_KEY_COLUMN)
out.push_sql(PRIMARY_KEY_COLUMN);
out.push_sql("::text");
Ok(())
}
}

@@ -2031,13 +2092,14 @@ impl<'a> QueryId for RevertRemoveQuery<'a> {
impl<'a> LoadQuery<PgConnection, RevertEntityData> for RevertRemoveQuery<'a> {
fn internal_load(self, conn: &PgConnection) -> QueryResult<Vec<RevertEntityData>> {
conn.query_by_name(&self)
.map(|data| RevertEntityData::convert(&self.table, data))
}
}

impl<'a, Conn> RunQueryDsl<Conn> for RevertRemoveQuery<'a> {}

/// A query that unclamps the block range of all versions that contain
/// `block` by setting the upper bound of the block range to infinity
/// `block` by setting the upper bound of the block range to infinity.
#[derive(Debug, Clone, Constructor)]
pub struct RevertClampQuery<'a> {
table: &'a Table,
Expand Down Expand Up @@ -2067,7 +2129,9 @@ impl<'a> QueryFragment<Pg> for RevertClampQuery<'a> {
out.push_sql(" and not ");
out.push_sql(BLOCK_RANGE_CURRENT);
out.push_sql("\nreturning ");
out.push_identifier(PRIMARY_KEY_COLUMN)
out.push_sql(PRIMARY_KEY_COLUMN);
out.push_sql("::text");
Ok(())
}
}

@@ -2080,6 +2144,7 @@ impl<'a> QueryId for RevertClampQuery<'a> {
impl<'a> LoadQuery<PgConnection, RevertEntityData> for RevertClampQuery<'a> {
fn internal_load(self, conn: &PgConnection) -> QueryResult<Vec<RevertEntityData>> {
conn.query_by_name(&self)
.map(|data| RevertEntityData::convert(&self.table, data))
}
}

@@ -2156,7 +2221,9 @@ impl<'a> QueryFragment<Pg> for DeleteByPrefixQuery<'a> {
out.push_sql(") = any(");
out.push_bind_param::<Array<Text>, _>(&self.prefixes)?;
out.push_sql(")\nreturning ");
out.push_identifier(PRIMARY_KEY_COLUMN)
out.push_sql(PRIMARY_KEY_COLUMN);
out.push_sql("::text");
Ok(())
}
}

@@ -2169,6 +2236,7 @@ impl<'a> QueryId for DeleteByPrefixQuery<'a> {
impl<'a> LoadQuery<PgConnection, RevertEntityData> for DeleteByPrefixQuery<'a> {
fn internal_load(self, conn: &PgConnection) -> QueryResult<Vec<RevertEntityData>> {
conn.query_by_name(&self)
.map(|data| RevertEntityData::convert(&self.table, data))
}
}

store/postgres/src/sql_value.rs (5 additions & 1 deletion)
@@ -2,8 +2,9 @@ use diesel::pg::Pg
use diesel::serialize::{self, Output, ToSql};
use diesel::sql_types::{Binary, Bool, Integer, Numeric, Text};
use std::io::Write;
use std::str::FromStr;

use graph::data::store::Value;
use graph::data::store::{scalar, Value};

#[derive(Clone, Debug, PartialEq, AsExpression)]
pub struct SqlValue(Value);
@@ -58,6 +59,9 @@ impl ToSql<Binary, Pg> for SqlValue {
fn to_sql<W: Write>(&self, out: &mut Output<W, Pg>) -> serialize::Result {
match self.0 {
Value::Bytes(ref h) => <_ as ToSql<Binary, Pg>>::to_sql(&h.as_slice(), out),
Value::String(ref s) => {
<_ as ToSql<Binary, Pg>>::to_sql(scalar::Bytes::from_str(s)?.as_slice(), out)
}
_ => panic!("Failed to convert attribute value to Bytes in SQL"),
}
}
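The sql_value.rs change closes the loop for array binds: `QueryValue` pushes id arrays as `Array<Binary>`, where each element is a `SqlValue` wrapping a `Value::String`, which must now serialize to `bytea` instead of panicking. A small sketch of the conversion this relies on (assuming `scalar::Bytes::from_str` accepts a plain hex string, as its use elsewhere in this commit implies):

    use std::str::FromStr;
    use graph::data::store::scalar;

    // "deadbeef" parses into four raw bytes; ToSql<Binary, Pg> then
    // writes that slice into the bind buffer.
    let bytes = scalar::Bytes::from_str("deadbeef").unwrap();
    assert_eq!(bytes.as_slice(), &[0xde, 0xad, 0xbe, 0xef][..]);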