Skip to content

Commit

Permalink
Merge pull request #1214 from endophage/changefeed
Browse files Browse the repository at this point in the history
changefeed for rethinkdb
  • Loading branch information
endophage authored Aug 28, 2017
2 parents f2744d5 + 55f635c commit ab64f58
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 76 deletions.
7 changes: 6 additions & 1 deletion server/handlers/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) err

func changefeed(logger ctxu.Logger, store storage.MetaStore, gun, changeID string, records int64) ([]byte, error) {
changes, err := store.GetChanges(changeID, int(records), gun)
if err != nil {
switch err.(type) {
case nil:
// no error to return
case storage.ErrBadQuery:
return nil, errors.ErrInvalidParams.WithDetail(err)
default:
logger.Errorf("%d GET could not retrieve records: %s", http.StatusInternalServerError, err.Error())
return nil, errors.ErrUnknown.WithDetail(err)
}
Expand Down
18 changes: 18 additions & 0 deletions server/handlers/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package handlers

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -114,9 +116,24 @@ func atomicUpdateHandler(ctx context.Context, w http.ResponseWriter, r *http.Req
logger.Errorf("500 POST error applying update request: %v", err)
return errors.ErrUpdating.WithDetail(nil)
}

logTS(logger, gun.String(), updates)

return nil
}

// logTS logs the timestamp update at Info level
func logTS(logger ctxu.Logger, gun string, updates []storage.MetaUpdate) {
for _, update := range updates {
if update.Role == data.CanonicalTimestampRole {
checksumBin := sha256.Sum256(update.Data)
checksum := hex.EncodeToString(checksumBin[:])
logger.Infof("updated %s to timestamp version %d, checksum %s", gun, update.Version, checksum)
break
}
}
}

// GetHandler returns the json for a specified role and GUN.
func GetHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
Expand Down Expand Up @@ -174,6 +191,7 @@ func DeleteHandler(ctx context.Context, w http.ResponseWriter, r *http.Request)
logger.Error("500 DELETE repository")
return errors.ErrUnknown.WithDetail(err)
}
logger.Infof("trust data deleted for %s", gun)
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions server/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ type ErrNoKey struct {
func (err ErrNoKey) Error() string {
return fmt.Sprintf("Error, no timestamp key found for %s", err.gun)
}

// ErrBadQuery is used when the parameters provided cannot be appropriately
// coerced.
type ErrBadQuery struct {
msg string
}

func (err ErrBadQuery) Error() string {
return fmt.Sprintf("did not recognize parameters: %s", err.msg)
}
6 changes: 3 additions & 3 deletions server/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (st *MemStorage) UpdateCurrent(gun data.GUN, update MetaUpdate) error {
// the MemStorage. Behaviour is undefined otherwise
func (st *MemStorage) writeChange(gun data.GUN, version int, checksum string) {
c := Change{
ID: uint(len(st.changes) + 1),
ID: strconv.Itoa(len(st.changes) + 1),
GUN: gun.String(),
Version: version,
SHA256: checksum,
Expand Down Expand Up @@ -200,7 +200,7 @@ func (st *MemStorage) Delete(gun data.GUN) error {
}
delete(st.checksums, gun.String())
c := Change{
ID: uint(len(st.changes) + 1),
ID: strconv.Itoa(len(st.changes) + 1),
GUN: gun.String(),
Category: changeCategoryDeletion,
CreatedAt: time.Now(),
Expand All @@ -224,7 +224,7 @@ func (st *MemStorage) GetChanges(changeID string, records int, filterName string
} else {
id, err = strconv.ParseInt(changeID, 10, 32)
if err != nil {
return nil, err
return nil, ErrBadQuery{msg: fmt.Sprintf("change ID expected to be integer, provided ID was: %d", changeID)}
}
}
var (
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {

// drop all tables, if they exist
gormDB.DropTable(&TUFFile{})
gormDB.DropTable(&Change{})
gormDB.DropTable(&SQLChange{})
}
cleanup1()
dbStore := SetupSQLDB(t, "mysql", dburl)
Expand Down
2 changes: 1 addition & 1 deletion server/storage/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func init() {

// drop all tables, if they exist
gormDB.DropTable(&TUFFile{})
gormDB.DropTable(&Change{})
gormDB.DropTable(&SQLChange{})
}
cleanup1()
dbStore := SetupSQLDB(t, notary.PostgresBackend, dburl)
Expand Down
8 changes: 8 additions & 0 deletions server/storage/rethink_realdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func rethinkDBSetup(t *testing.T) (RethinkDB, func()) {
cleanup()
require.NoError(t, rethinkdb.SetupDB(session, dbName, []rethinkdb.Table{
TUFFilesRethinkTable,
ChangeRethinkTable,
}))
return NewRethinkDBStorage(dbName, "", "", session), cleanup
}
Expand Down Expand Up @@ -169,3 +170,10 @@ func TestRethinkTUFMetaStoreGetCurrent(t *testing.T) {

testTUFMetaStoreGetCurrent(t, dbStore)
}

func TestRethinkDBGetChanges(t *testing.T) {
dbStore, cleanup := rethinkDBSetup(t)
defer cleanup()

testGetChanges(t, dbStore)
}
Loading

0 comments on commit ab64f58

Please sign in to comment.