From 005f17e30b2661148553232f54623491a8aea03b Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Tue, 22 Aug 2017 10:57:51 -0700 Subject: [PATCH 1/3] changefeed for rethinkdb Signed-off-by: David Lawrence (github: endophage) --- server/storage/memory.go | 4 +- server/storage/mysql_test.go | 2 +- server/storage/postgresql_test.go | 2 +- server/storage/rethink_realdb_test.go | 8 ++ server/storage/rethinkdb.go | 177 +++++++++++++++++++++----- server/storage/rethinkdb_models.go | 14 ++ server/storage/rethinkdb_test.go | 8 -- server/storage/sql_models.go | 10 +- server/storage/sqldb.go | 4 +- server/storage/storage_test.go | 49 +++---- 10 files changed, 206 insertions(+), 72 deletions(-) diff --git a/server/storage/memory.go b/server/storage/memory.go index a83e16146..641ffb514 100644 --- a/server/storage/memory.go +++ b/server/storage/memory.go @@ -85,7 +85,7 @@ func (st *MemStorage) UpdateCurrent(gun data.GUN, update MetaUpdate) error { // the MemStorage. Behaviour is undefined otherwise func (st *MemStorage) writeChange(gun data.GUN, version int, checksum string) { c := Change{ - ID: uint(len(st.changes) + 1), + ID: strconv.Itoa(len(st.changes) + 1), GUN: gun.String(), Version: version, SHA256: checksum, @@ -200,7 +200,7 @@ func (st *MemStorage) Delete(gun data.GUN) error { } delete(st.checksums, gun.String()) c := Change{ - ID: uint(len(st.changes) + 1), + ID: strconv.Itoa(len(st.changes) + 1), GUN: gun.String(), Category: changeCategoryDeletion, CreatedAt: time.Now(), diff --git a/server/storage/mysql_test.go b/server/storage/mysql_test.go index bd19d78d9..4900d6d10 100644 --- a/server/storage/mysql_test.go +++ b/server/storage/mysql_test.go @@ -43,7 +43,7 @@ func init() { // drop all tables, if they exist gormDB.DropTable(&TUFFile{}) - gormDB.DropTable(&Change{}) + gormDB.DropTable(&SQLChange{}) } cleanup1() dbStore := SetupSQLDB(t, "mysql", dburl) diff --git a/server/storage/postgresql_test.go b/server/storage/postgresql_test.go index bb87c8124..8d498223a 100644 --- a/server/storage/postgresql_test.go +++ b/server/storage/postgresql_test.go @@ -44,7 +44,7 @@ func init() { // drop all tables, if they exist gormDB.DropTable(&TUFFile{}) - gormDB.DropTable(&Change{}) + gormDB.DropTable(&SQLChange{}) } cleanup1() dbStore := SetupSQLDB(t, notary.PostgresBackend, dburl) diff --git a/server/storage/rethink_realdb_test.go b/server/storage/rethink_realdb_test.go index 84165501d..8ed64246f 100644 --- a/server/storage/rethink_realdb_test.go +++ b/server/storage/rethink_realdb_test.go @@ -36,6 +36,7 @@ func rethinkDBSetup(t *testing.T) (RethinkDB, func()) { cleanup() require.NoError(t, rethinkdb.SetupDB(session, dbName, []rethinkdb.Table{ TUFFilesRethinkTable, + ChangeRethinkTable, })) return NewRethinkDBStorage(dbName, "", "", session), cleanup } @@ -169,3 +170,10 @@ func TestRethinkTUFMetaStoreGetCurrent(t *testing.T) { testTUFMetaStoreGetCurrent(t, dbStore) } + +func TestRethinkDBGetChanges(t *testing.T) { + dbStore, cleanup := rethinkDBSetup(t) + defer cleanup() + + testGetChanges(t, dbStore) +} diff --git a/server/storage/rethinkdb.go b/server/storage/rethinkdb.go index 8d5326472..e59356a66 100644 --- a/server/storage/rethinkdb.go +++ b/server/storage/rethinkdb.go @@ -4,7 +4,6 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "errors" "fmt" "sort" "time" @@ -15,6 +14,8 @@ import ( "gopkg.in/dancannon/gorethink.v3" ) +var blackoutTime = 60 + // RDBTUFFile is a TUF file record type RDBTUFFile struct { rethinkdb.Timing @@ -29,7 +30,22 @@ type RDBTUFFile struct { // TableName returns the table name for the record type func (r RDBTUFFile) TableName() string { - return "tuf_files" + return TUFFileTableName +} + +// Change defines the the fields required for an object in the changefeed +type Change struct { + ID string `gorethink:"id,omitempty"` + CreatedAt time.Time `gorethink:"created_at"` + GUN string `gorethink:"gun"` + Version int `gorethink:"version"` + SHA256 string `gorethink:"sha256"` + Category string `gorethink:"category"` +} + +// TableName sets a specific table name for Changefeed +func (rdb Change) TableName() string { + return ChangefeedTableName } // gorethink can't handle an UnmarshalJSON function (see https://github.com/gorethink/gorethink/issues/201), @@ -65,6 +81,14 @@ func rdbTUFFileFromJSON(data []byte) (interface{}, error) { }, nil } +func rdbChangeFromJSON(data []byte) (interface{}, error) { + res := Change{} + if err := json.Unmarshal(data, &res); err != nil { + return Change{}, err + } + return res, nil +} + // RethinkDB implements a MetaStore against the Rethink Database type RethinkDB struct { dbName string @@ -87,35 +111,27 @@ func NewRethinkDBStorage(dbName, user, password string, sess *gorethink.Session) // if it's a new role, or the version is greater than the current version // for the role. Otherwise an error is returned. func (rdb RethinkDB) UpdateCurrent(gun data.GUN, update MetaUpdate) error { - now := time.Now() - checksum := sha256.Sum256(update.Data) - file := RDBTUFFile{ - Timing: rethinkdb.Timing{ - CreatedAt: now, - UpdatedAt: now, - }, - GunRoleVersion: []interface{}{gun, update.Role, update.Version}, - Gun: gun.String(), - Role: update.Role.String(), - Version: update.Version, - SHA256: hex.EncodeToString(checksum[:]), - Data: update.Data, + // empty string is the zero value for tsChecksum in the RDBTUFFile struct. + // Therefore we can just call through to updateCurrentWithTSChecksum passing + // "" for the tsChecksum value. + if err := rdb.updateCurrentWithTSChecksum(gun.String(), "", update); err != nil { + return err } - _, err := gorethink.DB(rdb.dbName).Table(file.TableName()).Insert( - file, - gorethink.InsertOpts{ - Conflict: "error", // default but explicit for clarity of intent - }, - ).RunWrite(rdb.sess) - if err != nil && gorethink.IsConflictErr(err) { - return ErrOldVersion{} + if update.Role == data.CanonicalTimestampRole { + tsChecksumBytes := sha256.Sum256(update.Data) + return rdb.writeChange( + gun.String(), + update.Version, + hex.EncodeToString(tsChecksumBytes[:]), + changeCategoryUpdate, + ) } - return err + return nil } -// UpdateCurrentWithTSChecksum adds new metadata version for the given GUN with an associated +// updateCurrentWithTSChecksum adds new metadata version for the given GUN with an associated // checksum for the timestamp it belongs to, to afford us transaction-like functionality -func (rdb RethinkDB) UpdateCurrentWithTSChecksum(gun, tsChecksum string, update MetaUpdate) error { +func (rdb RethinkDB) updateCurrentWithTSChecksum(gun, tsChecksum string, update MetaUpdate) error { now := time.Now() checksum := sha256.Sum256(update.Data) file := RDBTUFFile{ @@ -162,11 +178,15 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error { // find the timestamp first and save its checksum // then apply the updates in alphabetic role order with the timestamp last // if there are any failures, we roll back in the same alphabetic order - var tsChecksum string + var ( + tsChecksum string + tsVersion int + ) for _, up := range updates { if up.Role == data.CanonicalTimestampRole { tsChecksumBytes := sha256.Sum256(up.Data) tsChecksum = hex.EncodeToString(tsChecksumBytes[:]) + tsVersion = up.Version break } } @@ -175,7 +195,7 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error { sort.Stable(updateSorter(updates)) for _, up := range updates { - if err := rdb.UpdateCurrentWithTSChecksum(gun.String(), tsChecksum, up); err != nil { + if err := rdb.updateCurrentWithTSChecksum(gun.String(), tsChecksum, up); err != nil { // roll back with best-effort deletion, and then error out rollbackErr := rdb.deleteByTSChecksum(tsChecksum) if rollbackErr != nil { @@ -185,6 +205,11 @@ func (rdb RethinkDB) UpdateMany(gun data.GUN, updates []MetaUpdate) error { return err } } + + // if the update included a timestamp, write a change object + if tsChecksum != "" { + return rdb.writeChange(gun.String(), tsVersion, tsChecksum, changeCategoryUpdate) + } return nil } @@ -259,7 +284,7 @@ func (rdb RethinkDB) Delete(gun data.GUN) error { if err != nil { return fmt.Errorf("unable to delete %s from database: %s", gun.String(), err.Error()) } - return nil + return rdb.writeChange(gun.String(), 0, "", changeCategoryDeletion) } // deleteByTSChecksum removes all metadata by a timestamp checksum, used for rolling back a "transaction" @@ -271,6 +296,7 @@ func (rdb RethinkDB) deleteByTSChecksum(tsChecksum string) error { if err != nil { return fmt.Errorf("unable to delete timestamp checksum data: %s from database: %s", tsChecksum, err.Error()) } + // DO NOT WRITE CHANGE! THIS IS USED _ONLY_ TO ROLLBACK A FAILED INSERT return nil } @@ -278,6 +304,7 @@ func (rdb RethinkDB) deleteByTSChecksum(tsChecksum string) error { func (rdb RethinkDB) Bootstrap() error { if err := rethinkdb.SetupDB(rdb.sess, rdb.dbName, []rethinkdb.Table{ TUFFilesRethinkTable, + ChangeRethinkTable, }); err != nil { return err } @@ -294,7 +321,97 @@ func (rdb RethinkDB) CheckHealth() error { return nil } +func (rdb RethinkDB) writeChange(gun string, version int, sha256, category string) error { + now := time.Now() + ch := Change{ + CreatedAt: now, + GUN: gun, + Version: version, + SHA256: sha256, + Category: category, + } + _, err := gorethink.DB(rdb.dbName).Table(ch.TableName()).Insert( + ch, + gorethink.InsertOpts{ + Conflict: "error", // default but explicit for clarity of intent + }, + ).RunWrite(rdb.sess) + return err +} + // GetChanges is not implemented for RethinkDB func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string) ([]Change, error) { - return nil, errors.New("Not Implemented") + var ( + lower, upper, bound []interface{} + idx = "rdb_created_at_id" + max = []interface{}{gorethink.Now().Sub(blackoutTime), gorethink.MaxVal} + min = []interface{}{gorethink.MinVal, gorethink.MinVal} + order gorethink.OrderByOpts + changes = make([]Change, 0, pageSize) + reversed bool + ) + if filterName != "" { + idx = "rdb_gun_created_at_id" + max = append([]interface{}{filterName}, max...) + min = append([]interface{}{filterName}, min...) + } + + switch changeID { + case "0", "-1": + lower = min + upper = max + default: + bound, idx = rdb.bound(changeID, filterName) + if pageSize < 0 { + lower = min + upper = bound + } else { + lower = bound + upper = max + } + } + + if changeID == "-1" || pageSize < 0 { + reversed = true + order = gorethink.OrderByOpts{Index: gorethink.Desc(idx)} + } else { + order = gorethink.OrderByOpts{Index: gorethink.Asc(idx)} + } + + if pageSize < 0 { + pageSize = pageSize * -1 + } + + res, err := gorethink.DB(rdb.dbName). + Table(Change{}.TableName()). + OrderBy(order). + Between( + lower, + upper, + gorethink.BetweenOpts{ + LeftBound: "open", + RightBound: "open", + }, + ).Limit(pageSize).Run(rdb.sess) + if err != nil { + return nil, err + } + defer res.Close() + + if reversed { + // results are currently newest to oldest, should be oldest to newest + for i, j := 0, len(changes)-1; i < j; i, j = i+1, j-1 { + changes[i], changes[j] = changes[j], changes[i] + } + } + + return changes, res.All(&changes) +} + +func (rdb RethinkDB) bound(changeID, filterName string) ([]interface{}, string) { + term := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at") + if filterName != "" { + return []interface{}{filterName, term, changeID}, "rdb_gun_created_at_id" + } + return []interface{}{term, changeID}, "rdb_created_at_id" } diff --git a/server/storage/rethinkdb_models.go b/server/storage/rethinkdb_models.go index 56295c590..3236a7735 100644 --- a/server/storage/rethinkdb_models.go +++ b/server/storage/rethinkdb_models.go @@ -30,4 +30,18 @@ var ( }, JSONUnmarshaller: rdbTUFFileFromJSON, } + + // ChangeRethinkTable is the table definition for changefeed objects + ChangeRethinkTable = rethinkdb.Table{ + Name: Change{}.TableName(), + PrimaryKey: "id", + SecondaryIndexes: map[string][]string{ + "rdb_created_at_id": {"created_at", "id"}, + "rdb_gun_created_at_id": {"gun", "created_at", "id"}, + }, + Config: map[string]string{ + "write_acks": "majority", + }, + JSONUnmarshaller: rdbChangeFromJSON, + } ) diff --git a/server/storage/rethinkdb_test.go b/server/storage/rethinkdb_test.go index aaa5a1d89..f0a93efe2 100644 --- a/server/storage/rethinkdb_test.go +++ b/server/storage/rethinkdb_test.go @@ -117,11 +117,3 @@ func TestRDBTUFFileJSONUnmarshallingFailure(t *testing.T) { require.Error(t, err) } } - -func TestRethinkDBGetChanges(t *testing.T) { - s := NewRethinkDBStorage("dbname", "user", "pwd", nil) - c, err := s.GetChanges("foo", 10, "") - require.Error(t, err) - require.Nil(t, c) - require.Contains(t, err.Error(), "Not Implemented") -} diff --git a/server/storage/sql_models.go b/server/storage/sql_models.go index 12ff97691..a7d9ee023 100644 --- a/server/storage/sql_models.go +++ b/server/storage/sql_models.go @@ -32,9 +32,9 @@ func (g TUFFile) TableName() string { return TUFFileTableName } -// Change defines the the fields required for an object in the changefeed -type Change struct { - ID uint `gorm:"primary_key" sql:"not null" json:",string"` +// SQLChange defines the the fields required for an object in the changefeed +type SQLChange struct { + ID uint `gorm:"primary_key" sql:";not null" json:",string"` CreatedAt time.Time GUN string `gorm:"column:gun" sql:"type:varchar(255);not null"` Version int `sql:"not null"` @@ -43,7 +43,7 @@ type Change struct { } // TableName sets a specific table name for Changefeed -func (c Change) TableName() string { +func (c SQLChange) TableName() string { return ChangefeedTableName } @@ -61,6 +61,6 @@ func CreateTUFTable(db gorm.DB) error { // CreateChangefeedTable creates the DB table for Changefeed func CreateChangefeedTable(db gorm.DB) error { - query := db.AutoMigrate(&Change{}) + query := db.AutoMigrate(&SQLChange{}) return query.Error } diff --git a/server/storage/sqldb.go b/server/storage/sqldb.go index b8ee7a307..413aeaebd 100644 --- a/server/storage/sqldb.go +++ b/server/storage/sqldb.go @@ -166,7 +166,7 @@ func (db *SQLStorage) UpdateMany(gun data.GUN, updates []MetaUpdate) error { } func (db *SQLStorage) writeChangefeed(tx *gorm.DB, gun data.GUN, version int, checksum string) error { - c := &Change{ + c := &SQLChange{ GUN: gun.String(), Version: version, SHA256: checksum, @@ -244,7 +244,7 @@ func (db *SQLStorage) Delete(gun data.GUN) error { if res.RowsAffected == 0 { return nil } - c := &Change{ + c := &SQLChange{ GUN: gun.String(), Category: changeCategoryDeletion, } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index d0dfef444..9e8bb69db 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -195,6 +195,7 @@ func testUpdateManyNoConflicts(t *testing.T, s MetaStore) []StoredTUFMeta { // UpdateMany does not insert any rows (or at least rolls them back) if there // are any conflicts. func testUpdateManyConflictRollback(t *testing.T, s MetaStore) []StoredTUFMeta { + blackoutTime = 0 var gun data.GUN = "testGUN" successBatch := make([]StoredTUFMeta, 4) updates := make([]MetaUpdate, 4) @@ -206,9 +207,7 @@ func testUpdateManyConflictRollback(t *testing.T, s MetaStore) []StoredTUFMeta { require.NoError(t, s.UpdateMany(gun, updates)) before, err := s.GetChanges("0", 1000, "") - if _, ok := s.(RethinkDB); !ok { - require.NoError(t, err) - } + require.NoError(t, err) // conflicts with what's in DB badBatch := make([]StoredTUFMeta, 4) @@ -224,9 +223,7 @@ func testUpdateManyConflictRollback(t *testing.T, s MetaStore) []StoredTUFMeta { // check no changes were written when there was a conflict+rollback after, err := s.GetChanges("0", 1000, "") - if _, ok := s.(RethinkDB); !ok { - require.NoError(t, err) - } + require.NoError(t, err) require.Equal(t, len(before), len(after)) err = s.UpdateMany(gun, updates) @@ -298,6 +295,7 @@ func testDeleteSuccess(t *testing.T, s MetaStore) { } func testGetChanges(t *testing.T, s MetaStore) { + blackoutTime = 0 // non-int changeID c, err := s.GetChanges("foo", 10, "") require.Error(t, err) @@ -310,16 +308,22 @@ func testGetChanges(t *testing.T, s MetaStore) { Version: 1, Data: []byte{'1'}, }, + })) + require.NoError(t, s.UpdateMany("alpine", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 2, Data: []byte{'2'}, }, + })) + require.NoError(t, s.UpdateMany("alpine", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 3, Data: []byte{'3'}, }, + })) + require.NoError(t, s.UpdateMany("alpine", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 4, @@ -332,16 +336,22 @@ func testGetChanges(t *testing.T, s MetaStore) { Version: 1, Data: []byte{'5'}, }, + })) + require.NoError(t, s.UpdateMany("busybox", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 2, Data: []byte{'6'}, }, + })) + require.NoError(t, s.UpdateMany("busybox", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 3, Data: []byte{'7'}, }, + })) + require.NoError(t, s.UpdateMany("busybox", []MetaUpdate{ { Role: data.CanonicalTimestampRole, Version: 4, @@ -355,48 +365,43 @@ func testGetChanges(t *testing.T, s MetaStore) { require.Len(t, c, 8) for i := 0; i < 4; i++ { - require.Equal(t, uint(i+1), c[i].ID) require.Equal(t, "alpine", c[i].GUN) require.Equal(t, i+1, c[i].Version) } for i := 4; i < 8; i++ { - require.Equal(t, uint(i+1), c[i].ID) require.Equal(t, "busybox", c[i].GUN) require.Equal(t, i-3, c[i].Version) } + full := c c, err = s.GetChanges("-1", 4, "") require.NoError(t, err) require.Len(t, c, 4) for i := 0; i < 4; i++ { - require.Equal(t, uint(i+5), c[i].ID) require.Equal(t, "busybox", c[i].GUN) require.Equal(t, i+1, c[i].Version) } - c, err = s.GetChanges("10", 4, "") + c, err = s.GetChanges(full[7].ID, 4, "") require.NoError(t, err) require.Len(t, c, 0) - c, err = s.GetChanges("10", -4, "") - require.NoError(t, err) - require.Len(t, c, 4) - for i := 0; i < 4; i++ { - require.Equal(t, uint(i+5), c[i].ID) - require.Equal(t, "busybox", c[i].GUN) - require.Equal(t, i+1, c[i].Version) - } + //c, err = s.GetChanges(full[7].ID, -4, "") + //require.NoError(t, err) + //require.Len(t, c, 4) + //for i := 0; i < 4; i++ { + // require.Equal(t, "busybox", c[i].GUN) + // require.Equal(t, i+1, c[i].Version) + //} - c, err = s.GetChanges("7", -4, "") + c, err = s.GetChanges(full[6].ID, -4, "") require.NoError(t, err) require.Len(t, c, 4) for i := 0; i < 2; i++ { - require.Equal(t, uint(i+3), c[i].ID) require.Equal(t, "alpine", c[i].GUN) require.Equal(t, i+3, c[i].Version) } for i := 2; i < 4; i++ { - require.Equal(t, uint(i+3), c[i].ID) require.Equal(t, "busybox", c[i].GUN) require.Equal(t, i-1, c[i].Version) } @@ -405,7 +410,6 @@ func testGetChanges(t *testing.T, s MetaStore) { require.NoError(t, err) require.Len(t, c, 4) for i := 0; i < 4; i++ { - require.Equal(t, uint(i+5), c[i].ID) require.Equal(t, "busybox", c[i].GUN) require.Equal(t, i+1, c[i].Version) } @@ -414,7 +418,6 @@ func testGetChanges(t *testing.T, s MetaStore) { require.NoError(t, err) require.Len(t, c, 4) for i := 0; i < 4; i++ { - require.Equal(t, uint(i+5), c[i].ID) require.Equal(t, "busybox", c[i].GUN) require.Equal(t, i+1, c[i].Version) } From 07a5c0483ef800274e007900a2365da3a618c7df Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Fri, 25 Aug 2017 20:08:41 -0700 Subject: [PATCH 2/3] defer the reversing Signed-off-by: David Lawrence (github: endophage) --- server/storage/rethinkdb.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/server/storage/rethinkdb.go b/server/storage/rethinkdb.go index e59356a66..f57e290a1 100644 --- a/server/storage/rethinkdb.go +++ b/server/storage/rethinkdb.go @@ -278,13 +278,16 @@ func (rdb RethinkDB) GetVersion(gun data.GUN, role data.RoleName, version int) ( // Delete removes all metadata for a given GUN. It does not return an // error if no metadata exists for the given GUN. func (rdb RethinkDB) Delete(gun data.GUN) error { - _, err := gorethink.DB(rdb.dbName).Table(RDBTUFFile{}.TableName()).GetAllByIndex( + resp, err := gorethink.DB(rdb.dbName).Table(RDBTUFFile{}.TableName()).GetAllByIndex( "gun", gun.String(), ).Delete().RunWrite(rdb.sess) if err != nil { return fmt.Errorf("unable to delete %s from database: %s", gun.String(), err.Error()) } - return rdb.writeChange(gun.String(), 0, "", changeCategoryDeletion) + if resp.Deleted > 0 { + return rdb.writeChange(gun.String(), 0, "", changeCategoryDeletion) + } + return nil } // deleteByTSChecksum removes all metadata by a timestamp checksum, used for rolling back a "transaction" @@ -347,7 +350,6 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string max = []interface{}{gorethink.Now().Sub(blackoutTime), gorethink.MaxVal} min = []interface{}{gorethink.MinVal, gorethink.MinVal} order gorethink.OrderByOpts - changes = make([]Change, 0, pageSize) reversed bool ) if filterName != "" { @@ -382,6 +384,8 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string pageSize = pageSize * -1 } + changes := make([]Change, 0, pageSize) + res, err := gorethink.DB(rdb.dbName). Table(Change{}.TableName()). OrderBy(order). @@ -398,12 +402,14 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string } defer res.Close() - if reversed { - // results are currently newest to oldest, should be oldest to newest - for i, j := 0, len(changes)-1; i < j; i, j = i+1, j-1 { - changes[i], changes[j] = changes[j], changes[i] + defer func() { + if reversed { + // results are currently newest to oldest, should be oldest to newest + for i, j := 0, len(changes)-1; i < j; i, j = i+1, j-1 { + changes[i], changes[j] = changes[j], changes[i] + } } - } + }() return changes, res.All(&changes) } From 55f635c775f5ee62753fa6c31d68446e94c2e9e5 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Mon, 28 Aug 2017 15:11:07 -0700 Subject: [PATCH 3/3] review comments Signed-off-by: David Lawrence (github: endophage) --- server/handlers/changefeed.go | 7 ++++++- server/handlers/default.go | 18 +++++++++++++++++ server/storage/errors.go | 10 ++++++++++ server/storage/memory.go | 2 +- server/storage/rethinkdb.go | 35 ++++++++++++++++++++++++---------- server/storage/sql_models.go | 2 +- server/storage/sqldb.go | 2 +- server/storage/storage_test.go | 20 +++++++++++-------- 8 files changed, 74 insertions(+), 22 deletions(-) diff --git a/server/handlers/changefeed.go b/server/handlers/changefeed.go index c42353ebe..1f5606432 100644 --- a/server/handlers/changefeed.go +++ b/server/handlers/changefeed.go @@ -43,7 +43,12 @@ func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) err func changefeed(logger ctxu.Logger, store storage.MetaStore, gun, changeID string, records int64) ([]byte, error) { changes, err := store.GetChanges(changeID, int(records), gun) - if err != nil { + switch err.(type) { + case nil: + // no error to return + case storage.ErrBadQuery: + return nil, errors.ErrInvalidParams.WithDetail(err) + default: logger.Errorf("%d GET could not retrieve records: %s", http.StatusInternalServerError, err.Error()) return nil, errors.ErrUnknown.WithDetail(err) } diff --git a/server/handlers/default.go b/server/handlers/default.go index 09b37e80e..182fe1f37 100644 --- a/server/handlers/default.go +++ b/server/handlers/default.go @@ -2,6 +2,8 @@ package handlers import ( "bytes" + "crypto/sha256" + "encoding/hex" "encoding/json" "io" "net/http" @@ -114,9 +116,24 @@ func atomicUpdateHandler(ctx context.Context, w http.ResponseWriter, r *http.Req logger.Errorf("500 POST error applying update request: %v", err) return errors.ErrUpdating.WithDetail(nil) } + + logTS(logger, gun.String(), updates) + return nil } +// logTS logs the timestamp update at Info level +func logTS(logger ctxu.Logger, gun string, updates []storage.MetaUpdate) { + for _, update := range updates { + if update.Role == data.CanonicalTimestampRole { + checksumBin := sha256.Sum256(update.Data) + checksum := hex.EncodeToString(checksumBin[:]) + logger.Infof("updated %s to timestamp version %d, checksum %s", gun, update.Version, checksum) + break + } + } +} + // GetHandler returns the json for a specified role and GUN. func GetHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() @@ -174,6 +191,7 @@ func DeleteHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) logger.Error("500 DELETE repository") return errors.ErrUnknown.WithDetail(err) } + logger.Infof("trust data deleted for %s", gun) return nil } diff --git a/server/storage/errors.go b/server/storage/errors.go index abee09a9f..07e3a1ca0 100644 --- a/server/storage/errors.go +++ b/server/storage/errors.go @@ -40,3 +40,13 @@ type ErrNoKey struct { func (err ErrNoKey) Error() string { return fmt.Sprintf("Error, no timestamp key found for %s", err.gun) } + +// ErrBadQuery is used when the parameters provided cannot be appropriately +// coerced. +type ErrBadQuery struct { + msg string +} + +func (err ErrBadQuery) Error() string { + return fmt.Sprintf("did not recognize parameters: %s", err.msg) +} diff --git a/server/storage/memory.go b/server/storage/memory.go index 641ffb514..abf6e68fb 100644 --- a/server/storage/memory.go +++ b/server/storage/memory.go @@ -224,7 +224,7 @@ func (st *MemStorage) GetChanges(changeID string, records int, filterName string } else { id, err = strconv.ParseInt(changeID, 10, 32) if err != nil { - return nil, err + return nil, ErrBadQuery{msg: fmt.Sprintf("change ID expected to be integer, provided ID was: %d", changeID)} } } var ( diff --git a/server/storage/rethinkdb.go b/server/storage/rethinkdb.go index f57e290a1..a3099adb9 100644 --- a/server/storage/rethinkdb.go +++ b/server/storage/rethinkdb.go @@ -14,6 +14,11 @@ import ( "gopkg.in/dancannon/gorethink.v3" ) +// RethinkDB has eventual consistency. This represents a 60 second blackout +// period of the most recent changes in the changefeed which will not be +// returned while the eventual consistency works itself out. +// It's a var not a const so that the tests can turn it down to zero rather +// than have to include a sleep. var blackoutTime = 60 // RDBTUFFile is a TUF file record @@ -35,12 +40,12 @@ func (r RDBTUFFile) TableName() string { // Change defines the the fields required for an object in the changefeed type Change struct { - ID string `gorethink:"id,omitempty"` + ID string `gorethink:"id,omitempty" gorm:"primary_key" sql:"not null"` CreatedAt time.Time `gorethink:"created_at"` - GUN string `gorethink:"gun"` - Version int `gorethink:"version"` - SHA256 string `gorethink:"sha256"` - Category string `gorethink:"category"` + GUN string `gorethink:"gun" gorm:"column:gun" sql:"type:varchar(255);not null"` + Version int `gorethink:"version" sql:"not null"` + SHA256 string `gorethink:"sha256" gorm:"column:sha256" sql:"type:varchar(64);"` + Category string `gorethink:"category" sql:"type:varchar(20);not null;"` } // TableName sets a specific table name for Changefeed @@ -342,7 +347,8 @@ func (rdb RethinkDB) writeChange(gun string, version int, sha256, category strin return err } -// GetChanges is not implemented for RethinkDB +// GetChanges returns up to pageSize changes starting from changeID. It uses the +// blackout to account for RethinkDB's eventual consistency model func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string) ([]Change, error) { var ( lower, upper, bound []interface{} @@ -386,8 +392,15 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string changes := make([]Change, 0, pageSize) + // Between returns a slice of results from the rethinkdb table. + // The results are ordered using BetweenOpts.Index, which will + // default to the index of the immediately preceding OrderBy. + // The lower and upper are the start and end points for the slice + // and the Left/RightBound values determine whether the lower and + // upper values are included in the result per normal set semantics + // of "open" and "closed" res, err := gorethink.DB(rdb.dbName). - Table(Change{}.TableName()). + Table(Change{}.TableName(), gorethink.TableOpts{ReadMode: "majority"}). OrderBy(order). Between( lower, @@ -414,10 +427,12 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string return changes, res.All(&changes) } +// bound creates the correct boundary based in the index that should be used for +// querying the changefeed. func (rdb RethinkDB) bound(changeID, filterName string) ([]interface{}, string) { - term := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at") + createdAtTerm := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at") if filterName != "" { - return []interface{}{filterName, term, changeID}, "rdb_gun_created_at_id" + return []interface{}{filterName, createdAtTerm, changeID}, "rdb_gun_created_at_id" } - return []interface{}{term, changeID}, "rdb_created_at_id" + return []interface{}{createdAtTerm, changeID}, "rdb_created_at_id" } diff --git a/server/storage/sql_models.go b/server/storage/sql_models.go index a7d9ee023..f57caeb3d 100644 --- a/server/storage/sql_models.go +++ b/server/storage/sql_models.go @@ -34,7 +34,7 @@ func (g TUFFile) TableName() string { // SQLChange defines the the fields required for an object in the changefeed type SQLChange struct { - ID uint `gorm:"primary_key" sql:";not null" json:",string"` + ID uint `gorm:"primary_key" sql:"not null" json:",string"` CreatedAt time.Time GUN string `gorm:"column:gun" sql:"type:varchar(255);not null"` Version int `sql:"not null"` diff --git a/server/storage/sqldb.go b/server/storage/sqldb.go index 413aeaebd..9ab07de1d 100644 --- a/server/storage/sqldb.go +++ b/server/storage/sqldb.go @@ -281,7 +281,7 @@ func (db *SQLStorage) GetChanges(changeID string, records int, filterName string } else { id, err = strconv.ParseInt(changeID, 10, 32) if err != nil { - return nil, err + return nil, ErrBadQuery{msg: fmt.Sprintf("change ID expected to be integer, provided ID was: %d", changeID)} } } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index 9e8bb69db..92a9349f5 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -386,14 +386,6 @@ func testGetChanges(t *testing.T, s MetaStore) { require.NoError(t, err) require.Len(t, c, 0) - //c, err = s.GetChanges(full[7].ID, -4, "") - //require.NoError(t, err) - //require.Len(t, c, 4) - //for i := 0; i < 4; i++ { - // require.Equal(t, "busybox", c[i].GUN) - // require.Equal(t, i+1, c[i].Version) - //} - c, err = s.GetChanges(full[6].ID, -4, "") require.NoError(t, err) require.Len(t, c, 4) @@ -437,6 +429,17 @@ func testGetChanges(t *testing.T, s MetaStore) { require.NoError(t, err) require.Equal(t, before, after) + _, err1 := s.GetChanges("1000", 0, "") + _, err2 := s.GetChanges("doesn't exist", 0, "") + if _, ok := s.(RethinkDB); ok { + require.Error(t, err1) + require.Error(t, err2) + } else { + require.NoError(t, err1) + require.Error(t, err2) + require.IsType(t, ErrBadQuery{}, err2) + } + // do a deletion and check is shows up. require.NoError(t, s.Delete("alpine")) c, err = s.GetChanges("-1", -1, "") @@ -453,4 +456,5 @@ func testGetChanges(t *testing.T, s MetaStore) { require.Len(t, c, 2) require.NotEqual(t, changeCategoryDeletion, c[0].Category) require.NotEqual(t, "alpine", c[0].GUN) + }