Skip to content

Commit

Permalink
sql: track nodes reference count and simplify dialects; fixes #617
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Dec 19, 2017
1 parent 73089a9 commit 31b0f48
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 241 deletions.
14 changes: 10 additions & 4 deletions graph/sql/cockroach/cockroach.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func init() {
TimeType: `timestamp with time zone`,
NodesTableExtra: `
FAMILY fhash (hash),
FAMILY frefs (refs),
FAMILY fvalue (value, value_string, datatype, language, iri, bnode,
value_int, value_bool, value_float, value_time)
`,
Expand All @@ -32,19 +33,25 @@ func init() {
// return "SELECT reltuples::BIGINT AS estimate FROM pg_class WHERE relname='"+table+"';"
//},
RunTx: runTxCockroach,
TxRetry: retryTxCockroach,
NoSchemaChangesInTx: true,
})
}

func runTxCockroach(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts) error {
// FIXME: on conflict for SPOL; blocked by CockroachDB not supporting empty ON CONFLICT statements
return postgres.RunTx(tx, nodes, quads, opts, `(subject_hash, predicate_hash, object_hash)`)
}

// AmbiguousCommitError represents an error that left a transaction in an
// ambiguous state: unclear if it committed or not.
type AmbiguousCommitError struct {
error
}

// runTxCockroach runs the transaction and will retry in case of a retryable error.
// retryTxCockroach runs the transaction and will retry in case of a retryable error.
// https://www.cockroachlabs.com/docs/transactions.html#client-side-transaction-retries
func runTxCockroach(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
func retryTxCockroach(tx *sql.Tx, stmts func() error) error {
// Specify that we intend to retry this txn in case of CockroachDB retryable
// errors.
if _, err := tx.Exec("SAVEPOINT cockroach_restart"); err != nil {
Expand All @@ -54,8 +61,7 @@ func runTxCockroach(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
for {
released := false

// FIXME: on conflict for SPOL; blocked by CockroachDB not supporting empty ON CONFLICT statements
err := postgres.RunTx(tx, in, opts, `(subject_hash, predicate_hash, object_hash)`)
err := stmts()

if err == nil {
// RELEASE acts like COMMIT in CockroachDB. We use it since it gives us an
Expand Down
18 changes: 9 additions & 9 deletions graph/sql/cockroach/cockroach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
package cockroach

import (
"testing"
"database/sql"
"testing"

"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/internal/dock"
"github.com/cayleygraph/cayley/graph/sql/sqltest"
"github.com/cayleygraph/cayley/internal/dock"
"github.com/lib/pq"
)

func makeCockroach(t testing.TB) (string, graph.Options, func()) {
var conf dock.Config

conf.Image = "cockroachdb/cockroach:v1.0.5"
conf.Image = "cockroachdb/cockroach:v1.1.2"
conf.Cmd = []string{"start", "--insecure"}

addr, closer := dock.RunAndWait(t, conf, func(addr string) bool {
Expand All @@ -26,27 +26,27 @@ func makeCockroach(t testing.TB) (string, graph.Options, func()) {
conn.Close()
return true
})
addr = `postgresql://root@`+addr+`:26257`
db, err := sql.Open(driverName,addr+`?sslmode=disable`)
addr = `postgresql://root@` + addr + `:26257`
db, err := sql.Open(driverName, addr+`?sslmode=disable`)
if err != nil {
closer()
t.Fatal(err)
}
defer db.Close()
const dbName = "cayley"
if _, err = db.Exec("CREATE DATABASE "+dbName); err != nil {
if _, err = db.Exec("CREATE DATABASE " + dbName); err != nil {
closer()
t.Fatal(err)
}
addr = addr + `/`+dbName+`?sslmode=disable`
addr = addr + `/` + dbName + `?sslmode=disable`
return addr, nil, func() {
closer()
}
}

func TestCockroach(t *testing.T) {
sqltest.TestAll(t, Type, makeCockroach, &sqltest.Config{
TimeRound: true,
SkipIntHorizon: true,
TimeRound: true,
SkipIntHorizon: true,
})
}
16 changes: 15 additions & 1 deletion graph/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/quad"
)

var types = make(map[string]Registration)
Expand All @@ -18,6 +19,17 @@ func Register(name string, f Registration) {
registerQuadStore(name, name)
}

type NodeUpdate struct {
Hash NodeHash
Val quad.Value
RefInc int
}

type QuadUpdate struct {
Quad QuadHashes
Del bool
}

type Registration struct {
Driver string // sql driver to use on dial
HashType string // type for hash fields
Expand All @@ -34,7 +46,8 @@ type Registration struct {

Error func(error) error // error conversion function
Estimated func(table string) string // query that string that returns an estimated number of rows in table
RunTx func(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error
RunTx func(tx *sql.Tx, nodes []NodeUpdate, quads []QuadUpdate, opts graph.IgnoreOpts) error
TxRetry func(tx *sql.Tx, stmts func() error) error
NoSchemaChangesInTx bool
}

Expand All @@ -57,6 +70,7 @@ func (r Registration) nodesTable() string {
}
return `CREATE TABLE nodes (
hash ` + htyp + ` PRIMARY KEY,
refs INT NOT NULL,
value ` + btyp + `,
value_string TEXT,
datatype TEXT,
Expand Down
165 changes: 60 additions & 105 deletions graph/sql/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,131 +38,86 @@ func init() {
})
}

func runTxMysql(tx *sql.Tx, in []graph.Delta, opts graph.IgnoreOpts) error {
ignore := ""
if opts.IgnoreDup {
ignore = " IGNORE"
}

func runTxMysql(tx *sql.Tx, nodes []csql.NodeUpdate, quads []csql.QuadUpdate, opts graph.IgnoreOpts) error {
// update node ref counts and insert nodes
var (
insertQuad *sql.Stmt
insertValue map[csql.ValueType]*sql.Stmt // prepared statements for each value type
inserted map[csql.NodeHash]struct{} // tracks already inserted values

deleteQuad *sql.Stmt
deleteTriple *sql.Stmt
// prepared statements for each value type
insertValue = make(map[csql.ValueType]*sql.Stmt)
updateValue *sql.Stmt
)

var err error
for _, d := range in {
switch d.Action {
case graph.Add:
if insertQuad == nil {
insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)
if err != nil {
return err
}
insertValue = make(map[csql.ValueType]*sql.Stmt)
inserted = make(map[csql.NodeHash]struct{}, len(in))
for _, n := range nodes {
if n.RefInc >= 0 {
nodeKey, values, err := csql.NodeValues(n.Hash, n.Val)
if err != nil {
return err
}
var hs, hp, ho, hl csql.NodeHash
for _, dir := range quad.Directions {
v := d.Quad.Get(dir)
if v == nil {
continue
}
h := csql.HashOf(v)
switch dir {
case quad.Subject:
hs = h
case quad.Predicate:
hp = h
case quad.Object:
ho = h
case quad.Label:
hl = h
}
if !h.Valid() {
continue
} else if _, ok := inserted[h]; ok {
continue
}
nodeKey, values, err := csql.NodeValues(h, v)
if err != nil {
return err
}
stmt, ok := insertValue[nodeKey]
if !ok {
var ph = make([]string, len(values)-1)
for i := range ph {
ph[i] = "?"
}
stmt, err = tx.Prepare(`INSERT IGNORE INTO nodes(hash, ` +
strings.Join(nodeKey.Columns(), ", ") +
`) VALUES (?, ` +
strings.Join(ph, ", ") +
`);`)
if err != nil {
return err
}
insertValue[nodeKey] = stmt
values = append([]interface{}{n.RefInc}, values...)
values = append(values, n.RefInc) // one more time for UPDATE
stmt, ok := insertValue[nodeKey]
if !ok {
var ph = make([]string, len(values)-1) // excluding last increment
for i := range ph {
ph[i] = "?"
}
_, err = stmt.Exec(values...)
err = convInsertErrorMySql(err)
stmt, err = tx.Prepare(`INSERT INTO nodes(refs, hash, ` +
strings.Join(nodeKey.Columns(), ", ") +
`) VALUES (` + strings.Join(ph, ", ") +
`) ON DUPLICATE KEY UPDATE refs = refs + ?;`)
if err != nil {
clog.Errorf("couldn't exec INSERT statement: %v", err)
return err
}
inserted[h] = struct{}{}
insertValue[nodeKey] = stmt
}
_, err := insertQuad.Exec(
hs.SQLValue(), hp.SQLValue(), ho.SQLValue(), hl.SQLValue(),
)
_, err = stmt.Exec(values...)
err = convInsertErrorMySql(err)
if err != nil {
clog.Errorf("couldn't exec INSERT statement: %v", err)
return err
}
case graph.Delete:
if deleteQuad == nil {
deleteQuad, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=? and predicate_hash=? and object_hash=? and label_hash=?;`)
if err != nil {
return err
}
deleteTriple, err = tx.Prepare(`DELETE FROM quads WHERE subject_hash=? and predicate_hash=? and object_hash=? and label_hash is null;`)
} else {
panic("unexpected node update")
}
}
for _, s := range insertValue {
s.Close()
}
if s := updateValue; s != nil {
s.Close()
}
insertValue = nil
updateValue = nil

// now we can deal with quads
ignore := ""
if opts.IgnoreDup {
ignore = " IGNORE"
}

var (
insertQuad *sql.Stmt
err error
)
for _, d := range quads {
dirs := make([]interface{}, 0, len(quad.Directions))
for _, h := range d.Quad {
dirs = append(dirs, h.SQLValue())
}
if !d.Del {
if insertQuad == nil {
insertQuad, err = tx.Prepare(`INSERT` + ignore + ` INTO quads(subject_hash, predicate_hash, object_hash, label_hash, ts) VALUES (?, ?, ?, ?, now());`)
if err != nil {
return err
}
insertValue = make(map[csql.ValueType]*sql.Stmt)
}
var result sql.Result
if d.Quad.Label == nil {
result, err = deleteTriple.Exec(
csql.HashOf(d.Quad.Subject).SQLValue(),
csql.HashOf(d.Quad.Predicate).SQLValue(),
csql.HashOf(d.Quad.Object).SQLValue(),
)
} else {
result, err = deleteQuad.Exec(
csql.HashOf(d.Quad.Subject).SQLValue(),
csql.HashOf(d.Quad.Predicate).SQLValue(),
csql.HashOf(d.Quad.Object).SQLValue(),
csql.HashOf(d.Quad.Label).SQLValue(),
)
}
if err != nil {
clog.Errorf("couldn't exec DELETE statement: %v", err)
return err
}
affected, err := result.RowsAffected()
_, err := insertQuad.Exec(dirs...)
err = convInsertErrorMySql(err)
if err != nil {
clog.Errorf("couldn't get DELETE RowsAffected: %v", err)
clog.Errorf("couldn't exec INSERT statement: %v", err)
return err
}
if affected != 1 && !opts.IgnoreMissing {
return graph.ErrQuadNotExist
}
default:
panic("unknown action")
} else {
panic("unexpected quad delete")
}
}
return nil
Expand Down
Loading

0 comments on commit 31b0f48

Please sign in to comment.