diff --git a/graph/sql/cockroach/cockroach.go b/graph/sql/cockroach/cockroach.go index 69388b247..92dfbc570 100644 --- a/graph/sql/cockroach/cockroach.go +++ b/graph/sql/cockroach/cockroach.go @@ -22,6 +22,7 @@ func init() { TimeType: `timestamp with time zone`, NodesTableExtra: ` FAMILY fhash (hash), + FAMILY frefs (refs), FAMILY fvalue (value, value_string, datatype, language, iri, bnode, value_int, value_bool, value_float, value_time) `, @@ -32,19 +33,25 @@ func init() { // return "SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname='"+table+"';" //}, RunTx: runTxCockroach, + TxRetry: retryTxCockroach, NoSchemaChangesInTx: true, }) } +func runTxCockroach(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts) error { + // FIXME: on conflict for SPOL; blocked by CockroachDB not supporting empty ON CONFLICT statements + return postgres.RunTx(tx, nodes, quads, opts, `(subject_hash, predicate_hash, object_hash)`) +} + // AmbiguousCommitError represents an error that left a transaction in an // ambiguous state: unclear if it committed or not. type AmbiguousCommitError struct { error } -// runTxCockroach runs the transaction and will retry in case of a retryable error. +// retryTxCockroach runs the transaction and will retry in case of a retryable error. // https://www.cockroachlabs.com/docs/transactions.html#client-side-transaction-retries -func runTxCockroach(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error { +func retryTxCockroach(tx *sql.Tx, stmts func() error) error { // Specify that we intend to retry this txn in case of CockroachDB retryable // errors. if _, err := tx.Exec("SAVEPOINT cockroach_restart"); err != nil { @@ -54,8 +61,7 @@ func runTxCockroach(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error { for { released := false - // FIXME: on conflict for SPOL; blocked by CockroachDB not supporting empty ON CONFLICT statements - err := postgres.RunTx(tx, in, opts, `(subject_hash, predicate_hash, object_hash)`) + err := stmts() if err == nil { // RELEASE acts like COMMIT in CockroachDB. We use it since it gives us an diff --git a/graph/sql/cockroach/cockroach_test.go b/graph/sql/cockroach/cockroach_test.go index 274a9128b..26a96f19f 100644 --- a/graph/sql/cockroach/cockroach_test.go +++ b/graph/sql/cockroach/cockroach_test.go @@ -3,19 +3,19 @@ package cockroach import ( - "testing" "database/sql" + "testing" "github.com/cayleygraph/cayley/graph" - "github.com/cayleygraph/cayley/internal/dock" "github.com/cayleygraph/cayley/graph/sql/sqltest" + "github.com/cayleygraph/cayley/internal/dock" "github.com/lib/pq" ) func makeCockroach(t testing.TB) (string, graph.Options, func()) { var conf dock.Config - conf.Image = "cockroachdb/cockroach:v1.0.5" + conf.Image = "cockroachdb/cockroach:v1.1.2" conf.Cmd = []string{"start", "--insecure"} addr, closer := dock.RunAndWait(t, conf, func(addr string) bool { @@ -26,19 +26,19 @@ func makeCockroach(t testing.TB) (string, graph.Options, func()) { conn.Close() return true }) - addr = `postgresql://root@`+addr+`:26257` - db, err := sql.Open(driverName,addr+`?sslmode=disable`) + addr = `postgresql://root@` + addr + `:26257` + db, err := sql.Open(driverName, addr+`?sslmode=disable`) if err != nil { closer() t.Fatal(err) } defer db.Close() const dbName = "cayley" - if _, err = db.Exec("CREATE DATABASE "+dbName); err != nil { + if _, err = db.Exec("CREATE DATABASE " + dbName); err != nil { closer() t.Fatal(err) } - addr = addr + `/`+dbName+`?sslmode=disable` + addr = addr + `/` + dbName + `?sslmode=disable` return addr, nil, func() { closer() } @@ -46,7 +46,7 @@ func makeCockroach(t testing.TB) (string, graph.Options, func()) { func TestCockroach(t *testing.T) { sqltest.TestAll(t, Type, makeCockroach, &sqltest.Config{ - TimeRound: true, - SkipIntHorizon: true, + TimeRound: true, + SkipIntHorizon: true, }) } diff --git a/graph/sql/database.go b/graph/sql/database.go index 6f10abafd..4937c8ecd 100644 --- a/graph/sql/database.go +++ b/graph/sql/database.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/cayleygraph/cayley/graph" + "github.com/cayleygraph/cayley/quad" ) var types = make(map[string]Registration) @@ -18,6 +19,17 @@ func Register(name string, f Registration) { registerQuadStore(name, name) } +type NodeUpdate struct { + Hash NodeHash + Val quad.Value + RefInc int +} + +type QuadUpdate struct { + Quad QuadHashes + Del bool +} + type Registration struct { Driver string // sql driver to use on dial HashType string // type for hash fields @@ -34,7 +46,8 @@ type Registration struct { Error func(error) error // error conversion function Estimated func(table string) string // query that string that returns an estimated number of rows in table - RunTx func(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error + RunTx func(tx *sql.Tx, nodes []NodeUpdate, quads []QuadUpdate, opts graph.IgnoreOpts) error + TxRetry func(tx *sql.Tx, stmts func() error) error NoSchemaChangesInTx bool } @@ -57,6 +70,7 @@ func (r Registration) nodesTable() string { } return `CREATE TABLE nodes ( hash ` + htyp + ` PRIMARY KEY, + refs INT NOT NULL, value ` + btyp + `, value_string TEXT, datatype TEXT, diff --git a/graph/sql/mysql/mysql.go b/graph/sql/mysql/mysql.go index a28807764..d98764c1e 100644 --- a/graph/sql/mysql/mysql.go +++ b/graph/sql/mysql/mysql.go @@ -38,131 +38,86 @@ func init() { }) } -func runTxMysql(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error { - ignore := "" - if opts.IgnoreDup { - ignore = " IGNORE" - } - +func runTxMysql(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts) error { + // update node ref counts and insert nodes var ( - insertQuad *sql.Stmt - insertValue map[csql.ValueType]*sql.Stmt // prepared statements for each value type - inserted map[csql.NodeHash]struct{} // tracks already inserted values - - deleteQuad *sql.Stmt - deleteTriple *sql.Stmt + // prepared statements for each value type + insertValue = make(map[csql.ValueType]*sql.Stmt) + updateValue *sql.Stmt ) - - var err error - for _, d := range in { - switch d.Action { - case graph.Add: - if insertQuad == nil { - insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`) - if err != nil { - return err - } - insertValue = make(map[csql.ValueType]*sql.Stmt) - inserted = make(map[csql.NodeHash]struct{}, len(in)) + for _, n := range nodes { + if n.RefInc >= 0 { + nodeKey, values, err := csql.NodeValues(n.Hash, n.Val) + if err != nil { + return err } - var hs, hp, ho, hl csql.NodeHash - for _, dir := range quad.Directions { - v := d.Quad.Get(dir) - if v == nil { - continue - } - h := csql.HashOf(v) - switch dir { - case quad.Subject: - hs = h - case quad.Predicate: - hp = h - case quad.Object: - ho = h - case quad.Label: - hl = h - } - if !h.Valid() { - continue - } else if _, ok := inserted[h]; ok { - continue - } - nodeKey, values, err := csql.NodeValues(h, v) - if err != nil { - return err - } - stmt, ok := insertValue[nodeKey] - if !ok { - var ph = make([]string, len(values)-1) - for i := range ph { - ph[i] = "?" - } - stmt, err = tx.Prepare(`INSERT IGNORE INTO nodes(hash, ` + - strings.Join(nodeKey.Columns(), ", ") + - `) VALUES (?, ` + - strings.Join(ph, ", ") + - `);`) - if err != nil { - return err - } - insertValue[nodeKey] = stmt + values = append([]interface{}{n.RefInc}, values...) + values = append(values, n.RefInc) // one more time for UPDATE + stmt, ok := insertValue[nodeKey] + if !ok { + var ph = make([]string, len(values)-1) // excluding last increment + for i := range ph { + ph[i] = "?" } - _, err = stmt.Exec(values...) - err = convInsertErrorMySql(err) + stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` + + strings.Join(nodeKey.Columns(), ", ") + + `) VALUES (` + strings.Join(ph, ", ") + + `) ON DUPLICATE KEY UPDATE refs = refs + ?;`) if err != nil { - clog.Errorf("couldn't exec INSERT statement: %v", err) return err } - inserted[h] = struct{}{} + insertValue[nodeKey] = stmt } - _, err := insertQuad.Exec( - hs.SQLValue(), hp.SQLValue(), ho.SQLValue(), hl.SQLValue(), - ) + _, err = stmt.Exec(values...) err = convInsertErrorMySql(err) if err != nil { clog.Errorf("couldn't exec INSERT statement: %v", err) return err } - case graph.Delete: - if deleteQuad == nil { - deleteQuad, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=? and predicate_hash=? and object_hash=? and label_hash=?;`) - if err != nil { - return err - } - deleteTriple, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=? and predicate_hash=? and object_hash=? and label_hash is null;`) + } else { + panic("unexpected node update") + } + } + for _, s := range insertValue { + s.Close() + } + if s := updateValue; s != nil { + s.Close() + } + insertValue = nil + updateValue = nil + + // now we can deal with quads + ignore := "" + if opts.IgnoreDup { + ignore = " IGNORE" + } + + var ( + insertQuad *sql.Stmt + err error + ) + for _, d := range quads { + dirs := make([]interface{}, 0, len(quad.Directions)) + for _, h := range d.Quad { + dirs = append(dirs, h.SQLValue()) + } + if !d.Del { + if insertQuad == nil { + insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`) if err != nil { return err } + insertValue = make(map[csql.ValueType]*sql.Stmt) } - var result sql.Result - if d.Quad.Label == nil { - result, err = deleteTriple.Exec( - csql.HashOf(d.Quad.Subject).SQLValue(), - csql.HashOf(d.Quad.Predicate).SQLValue(), - csql.HashOf(d.Quad.Object).SQLValue(), - ) - } else { - result, err = deleteQuad.Exec( - csql.HashOf(d.Quad.Subject).SQLValue(), - csql.HashOf(d.Quad.Predicate).SQLValue(), - csql.HashOf(d.Quad.Object).SQLValue(), - csql.HashOf(d.Quad.Label).SQLValue(), - ) - } - if err != nil { - clog.Errorf("couldn't exec DELETE statement: %v", err) - return err - } - affected, err := result.RowsAffected() + _, err := insertQuad.Exec(dirs...) + err = convInsertErrorMySql(err) if err != nil { - clog.Errorf("couldn't get DELETE RowsAffected: %v", err) + clog.Errorf("couldn't exec INSERT statement: %v", err) return err } - if affected != 1 && !opts.IgnoreMissing { - return graph.ErrQuadNotExist - } - default: - panic("unknown action") + } else { + panic("unexpected quad delete") } } return nil diff --git a/graph/sql/postgres/postgres.go b/graph/sql/postgres/postgres.go index 5a6979034..f8613e975 100644 --- a/graph/sql/postgres/postgres.go +++ b/graph/sql/postgres/postgres.go @@ -104,17 +104,61 @@ func convInsertErrorPG(err error) error { // return nil //} -func RunTxPostgres(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error { - return RunTx(tx, in, opts, "") +func RunTxPostgres(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts) error { + return RunTx(tx, nodes, quads, opts, "") } -func RunTx(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts, onConflict string) error { - //allAdds := true - //for _, d := range in { - // if d.Action != graph.Add { - // allAdds = false - // } - //} +func RunTx(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts, onConflict string) error { + // update node ref counts and insert nodes + var ( + // prepared statements for each value type + insertValue = make(map[csql.ValueType]*sql.Stmt) + updateValue *sql.Stmt + ) + for _, n := range nodes { + if n.RefInc >= 0 { + nodeKey, values, err := csql.NodeValues(n.Hash, n.Val) + if err != nil { + return err + } + values = append([]interface{}{n.RefInc}, values...) + stmt, ok := insertValue[nodeKey] + if !ok { + var ph = make([]string, len(values)) + for i := range ph { + ph[i] = "$" + strconv.FormatInt(int64(i)+1, 10) + } + stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` + + strings.Join(nodeKey.Columns(), ", ") + + `) VALUES (` + strings.Join(ph, ", ") + + `) ON CONFLICT (hash) DO UPDATE SET refs = nodes.refs + EXCLUDED.refs;`) + if err != nil { + return err + } + insertValue[nodeKey] = stmt + } + _, err = stmt.Exec(values...) + err = convInsertErrorPG(err) + if err != nil { + clog.Errorf("couldn't exec INSERT statement: %v", err) + return err + } + } else { + panic("unexpected node update") + } + } + for _, s := range insertValue { + s.Close() + } + if s := updateValue; s != nil { + s.Close() + } + insertValue = nil + updateValue = nil + + // now we can deal with quads + + // TODO: copy //if allAdds && !opts.IgnoreDup { // return qs.copyFrom(tx, in, opts) //} @@ -125,125 +169,30 @@ func RunTx(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts, onConflict strin } var ( - insertQuad *sql.Stmt - insertValue map[csql.ValueType]*sql.Stmt // prepared statements for each value type - inserted map[csql.NodeHash]struct{} // tracks already inserted values - - deleteQuad *sql.Stmt - deleteTriple *sql.Stmt + insertQuad *sql.Stmt + err error ) - - var err error - for _, d := range in { - switch d.Action { - case graph.Add: + for _, d := range quads { + dirs := make([]interface{}, 0, len(quad.Directions)) + for _, h := range d.Quad { + dirs = append(dirs, h.SQLValue()) + } + if !d.Del { if insertQuad == nil { insertQuad, err = tx.Prepare(`INSERT INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES ($1, $2, $3, $4, now())` + end) if err != nil { return err } insertValue = make(map[csql.ValueType]*sql.Stmt) - inserted = make(map[csql.NodeHash]struct{}, len(in)) - } - var hs, hp, ho, hl csql.NodeHash - for _, dir := range quad.Directions { - v := d.Quad.Get(dir) - if v == nil { - continue - } - h := csql.HashOf(v) - switch dir { - case quad.Subject: - hs = h - case quad.Predicate: - hp = h - case quad.Object: - ho = h - case quad.Label: - hl = h - } - if !h.Valid() { - continue - } else if _, ok := inserted[h]; ok { - continue - } - nodeKey, values, err := csql.NodeValues(h, v) - if err != nil { - return err - } - stmt, ok := insertValue[nodeKey] - if !ok { - var ph = make([]string, len(values)-1) - for i := range ph { - ph[i] = "$" + strconv.FormatInt(int64(i)+2, 10) - } - stmt, err = tx.Prepare(`INSERT INTO nodes(hash, ` + - strings.Join(nodeKey.Columns(), ", ") + - `) VALUES ($1, ` + - strings.Join(ph, ", ") + - `) ON CONFLICT (hash) DO NOTHING;`) - if err != nil { - return err - } - insertValue[nodeKey] = stmt - } - _, err = stmt.Exec(values...) - err = convInsertErrorPG(err) - if err != nil { - clog.Errorf("couldn't exec INSERT statement: %v", err) - return err - } - inserted[h] = struct{}{} } - _, err := insertQuad.Exec( - hs.SQLValue(), hp.SQLValue(), ho.SQLValue(), hl.SQLValue(), - ) + _, err := insertQuad.Exec(dirs...) err = convInsertErrorPG(err) if err != nil { clog.Errorf("couldn't exec INSERT statement: %v", err) return err } - - case graph.Delete: - if deleteQuad == nil { - deleteQuad, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=$1 and predicate_hash=$2 and object_hash=$3 and label_hash=$4;`) - if err != nil { - return err - } - deleteTriple, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=$1 and predicate_hash=$2 and object_hash=$3 and label_hash is null;`) - if err != nil { - return err - } - } - var result sql.Result - if d.Quad.Label == nil { - result, err = deleteTriple.Exec( - csql.HashOf(d.Quad.Subject).SQLValue(), - csql.HashOf(d.Quad.Predicate).SQLValue(), - csql.HashOf(d.Quad.Object).SQLValue(), - ) - } else { - result, err = deleteQuad.Exec( - csql.HashOf(d.Quad.Subject).SQLValue(), - csql.HashOf(d.Quad.Predicate).SQLValue(), - csql.HashOf(d.Quad.Object).SQLValue(), - csql.HashOf(d.Quad.Label).SQLValue(), - ) - } - if err != nil { - clog.Errorf("couldn't exec DELETE statement: %v", err) - return err - } - affected, err := result.RowsAffected() - if err != nil { - clog.Errorf("couldn't get DELETE RowsAffected: %v", err) - return err - } - if affected != 1 && !opts.IgnoreMissing { - return graph.ErrQuadNotExist - } - default: - panic("unknown action") + } else { + panic("unexpected quad delete") } } return nil diff --git a/graph/sql/quadstore.go b/graph/sql/quadstore.go index 8e9838226..1860572b6 100644 --- a/graph/sql/quadstore.go +++ b/graph/sql/quadstore.go @@ -382,16 +382,145 @@ func NodeValues(h NodeHash, v quad.Value) (ValueType, []interface{}, error) { } func (qs *QuadStore) ApplyDeltas(in []graph.Delta, opts graph.IgnoreOpts) error { + // first calculate values ref deltas + hnodes := make(map[NodeHash]*NodeUpdate, len(in)*2) + quadIns := make([]QuadUpdate, 0, len(in)) + quadDel := make([]QuadUpdate, 0, len(in)/2) + var nadd, ndel int + for _, d := range in { + dn := 0 + switch d.Action { + case graph.Add: + dn = +1 + nadd++ + case graph.Delete: + dn = -1 + ndel++ + default: + panic("unknown action") + } + var q QuadHashes + for _, dir := range quad.Directions { + v := d.Quad.Get(dir) + if v == nil { + continue + } + h := HashOf(v) + q.Set(dir, h) + n := hnodes[h] + if n == nil { + n = &NodeUpdate{Hash: h, Val: v} + hnodes[h] = n + } + n.RefInc += dn + } + u := QuadUpdate{Quad: q, Del: d.Action == graph.Delete} + if !u.Del { + quadIns = append(quadIns, u) + } else { + quadDel = append(quadDel, u) + } + } + insNodes := make([]NodeUpdate, 0, nadd) + updNodes := make([]NodeUpdate, 0, ndel) + for _, n := range hnodes { + if n.RefInc >= 0 { + insNodes = append(insNodes, *n) + } else { + updNodes = append(updNodes, *n) + } + } + hnodes = nil + tx, err := qs.db.Begin() if err != nil { clog.Errorf("couldn't begin write transaction: %v", err) return err } - err = qs.flavor.RunTx(tx, in, opts) + + retry := qs.flavor.TxRetry + if retry == nil { + retry = func(tx *sql.Tx, stmts func() error) error { + return stmts() + } + } + p := make([]string, 4) + for i := range p { + p[i] = qs.flavor.Placeholder(i + 1) + } + + err = retry(tx, func() error { + // node update SQL is generic enough to run it here + updateNode, err := tx.Prepare(`UPDATE nodes SET refs = refs + ` + p[0] + ` WHERE hash = ` + p[1] + `;`) + if err != nil { + return err + } + for _, n := range updNodes { + _, err := updateNode.Exec(n.RefInc, n.Hash.SQLValue()) + if err != nil { + clog.Errorf("couldn't exec UPDATE statement: %v", err) + return err + } + } + err = qs.flavor.RunTx(tx, insNodes, quadIns, opts) + if err != nil { + return err + } + // quad delete is also generic, execute here + var ( + deleteQuad *sql.Stmt + deleteTriple *sql.Stmt + ) + for _, d := range quadDel { + dirs := make([]interface{}, 0, len(quad.Directions)) + for _, h := range d.Quad { + dirs = append(dirs, h.SQLValue()) + } + if deleteQuad == nil { + deleteQuad, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=` + p[0] + ` and predicate_hash=` + p[1] + ` and object_hash=` + p[2] + ` and label_hash=` + p[3] + `;`) + if err != nil { + return err + } + deleteTriple, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=` + p[0] + ` and predicate_hash=` + p[1] + ` and object_hash=` + p[2] + ` and label_hash is null;`) + if err != nil { + return err + } + } + stmt := deleteQuad + if i := len(dirs) - 1; dirs[i] == nil { + stmt = deleteTriple + dirs = dirs[:i] + } + result, err := stmt.Exec(dirs...) + if err != nil { + clog.Errorf("couldn't exec DELETE statement: %v", err) + return err + } + affected, err := result.RowsAffected() + if err != nil { + clog.Errorf("couldn't get DELETE RowsAffected: %v", err) + return err + } + if affected != 1 && !opts.IgnoreMissing { + return graph.ErrQuadNotExist + } + } + if ndel == 0 { + return nil + } + // and remove unused nodes at last + _, err = tx.Exec(`DELETE FROM nodes WHERE refs <= 0;`) + if err != nil { + clog.Errorf("couldn't exec DELETE nodes statement: %v", err) + return err + } + return nil + }) if err != nil { tx.Rollback() return err } + qs.mu.Lock() qs.size = -1 // TODO(barakmich): Sync size with writes. qs.mu.Unlock() diff --git a/graph/sql/sqltest/sqltest.go b/graph/sql/sqltest/sqltest.go index 53e3009c4..f2b356122 100644 --- a/graph/sql/sqltest/sqltest.go +++ b/graph/sql/sqltest/sqltest.go @@ -25,12 +25,11 @@ func TestAll(t *testing.T, typ string, fnc DatabaseFunc, c *Config) { t.Run("graph", func(t *testing.T) { t.Parallel() graphtest.TestAll(t, create, &graphtest.Config{ - NoPrimitives: true, - TimeInMcs: true, - TimeRound: c.TimeRound, - OptimizesComparison: true, - SkipNodeDelAfterQuadDel: true, - SkipIntHorizon: c.SkipIntHorizon, + NoPrimitives: true, + TimeInMcs: true, + TimeRound: c.TimeRound, + OptimizesComparison: true, + SkipIntHorizon: c.SkipIntHorizon, }) }) t.Run("zero rune", func(t *testing.T) {