diff --git a/core/core.go b/core/core.go index 0186451e8..eba59bebe 100644 --- a/core/core.go +++ b/core/core.go @@ -22,7 +22,6 @@ import ( "github.com/0xProject/0x-mesh/ethereum/blockwatch" "github.com/0xProject/0x-mesh/ethereum/ethrpcclient" "github.com/0xProject/0x-mesh/ethereum/ratelimit" - "github.com/0xProject/0x-mesh/expirationwatch" "github.com/0xProject/0x-mesh/keys" "github.com/0xProject/0x-mesh/loghooks" "github.com/0xProject/0x-mesh/orderfilter" @@ -188,31 +187,22 @@ type Config struct { EthereumRPCClient ethclient.RPCClient `envvar:"-"` } -type snapshotInfo struct { - // Snapshot *db.Snapshot - CreatedAt time.Time - ExpirationTimestamp time.Time -} - type App struct { - config Config - privateConfig privateConfig - peerID peer.ID - privKey p2pcrypto.PrivKey - node *p2p.Node - chainID int - blockWatcher *blockwatch.Watcher - orderWatcher *orderwatch.Watcher - orderValidator *ordervalidator.OrderValidator - orderFilter *orderfilter.Filter - snapshotExpirationWatcher *expirationwatch.Watcher - muIdToSnapshotInfo sync.Mutex - idToSnapshotInfo map[string]snapshotInfo - ethRPCRateLimiter ratelimit.RateLimiter - ethRPCClient ethrpcclient.Client - db *db.DB - ordersyncService *ordersync.Service - contractAddresses *ethereum.ContractAddresses + config Config + privateConfig privateConfig + peerID peer.ID + privKey p2pcrypto.PrivKey + node *p2p.Node + chainID int + blockWatcher *blockwatch.Watcher + orderWatcher *orderwatch.Watcher + orderValidator *ordervalidator.OrderValidator + orderFilter *orderfilter.Filter + ethRPCRateLimiter ratelimit.RateLimiter + ethRPCClient ethrpcclient.Client + db *db.DB + ordersyncService *ordersync.Service + contractAddresses *ethereum.ContractAddresses // started is closed to signal that the App has been started. Some methods // will block until after the App is started. @@ -376,26 +366,21 @@ func newWithPrivateConfig(config Config, pConfig privateConfig) (*App, error) { return nil, fmt.Errorf("invalid custom order filter: %s", err.Error()) } - // Initialize remaining fields. - snapshotExpirationWatcher := expirationwatch.New() - app := &App{ - started: make(chan struct{}), - config: config, - privateConfig: pConfig, - privKey: privKey, - peerID: peerID, - chainID: config.EthereumChainID, - blockWatcher: blockWatcher, - orderWatcher: orderWatcher, - orderValidator: orderValidator, - orderFilter: orderFilter, - snapshotExpirationWatcher: snapshotExpirationWatcher, - idToSnapshotInfo: map[string]snapshotInfo{}, - ethRPCRateLimiter: ethRPCRateLimiter, - ethRPCClient: ethClient, - db: database, - contractAddresses: &contractAddresses, + started: make(chan struct{}), + config: config, + privateConfig: pConfig, + privKey: privKey, + peerID: peerID, + chainID: config.EthereumChainID, + blockWatcher: blockWatcher, + orderWatcher: orderWatcher, + orderValidator: orderValidator, + orderFilter: orderFilter, + ethRPCRateLimiter: ethRPCRateLimiter, + ethRPCClient: ethClient, + db: database, + contractAddresses: &contractAddresses, } log.WithFields(map[string]interface{}{ @@ -525,29 +510,6 @@ func (app *App) Start(ctx context.Context) error { ethRPCRateLimiterErrChan <- app.ethRPCRateLimiter.Start(innerCtx, rateLimiterCheckpointInterval) }() - // Set up the snapshot expiration watcher pruning logic - wg.Add(1) - go func() { - defer wg.Done() - defer func() { - log.Debug("closing snapshot expiration watcher") - }() - ticker := time.NewTicker(expirationPollingInterval) - for { - select { - case <-innerCtx.Done(): - return - case now := <-ticker.C: - expiredSnapshots := app.snapshotExpirationWatcher.Prune(now) - for _, expiredSnapshot := range expiredSnapshots { - app.muIdToSnapshotInfo.Lock() - delete(app.idToSnapshotInfo, expiredSnapshot.ID) - app.muIdToSnapshotInfo.Unlock() - } - } - } - }() - // Start the order watcher. orderWatcherErrChan := make(chan error, 1) wg.Add(1) diff --git a/expirationwatch/expiration_watcher.go b/expirationwatch/expiration_watcher.go deleted file mode 100644 index f5915b9ac..000000000 --- a/expirationwatch/expiration_watcher.go +++ /dev/null @@ -1,103 +0,0 @@ -package expirationwatch - -import ( - "sync" - "time" - - "github.com/albrow/stringset" - "github.com/ocdogan/rbt" - log "github.com/sirupsen/logrus" -) - -// ExpiredItem represents an expired item returned from the Watcher -type ExpiredItem struct { - ExpirationTimestamp time.Time - ID string -} - -// Watcher watches the expiration of items -type Watcher struct { - expiredItems chan []ExpiredItem - rbTreeMu sync.RWMutex - rbTree *rbt.RbTree -} - -// New instantiates a new expiration watcher -func New() *Watcher { - rbTree := rbt.NewRbTree() - return &Watcher{ - expiredItems: make(chan []ExpiredItem, 10), - rbTree: rbTree, - } -} - -// Add adds a new item identified by an ID to the expiration watcher -func (w *Watcher) Add(expirationTimestamp time.Time, id string) { - key := rbt.Int64Key(expirationTimestamp.Unix()) - w.rbTreeMu.Lock() - defer w.rbTreeMu.Unlock() - value, ok := w.rbTree.Get(&key) - var ids stringset.Set - if !ok { - ids = stringset.New() - } else { - ids = value.(stringset.Set) - } - ids.Add(id) - w.rbTree.Insert(&key, ids) -} - -// Remove removes the item with a specified id from the expiration watcher -func (w *Watcher) Remove(expirationTimestamp time.Time, id string) { - key := rbt.Int64Key(expirationTimestamp.Unix()) - w.rbTreeMu.Lock() - defer w.rbTreeMu.Unlock() - value, ok := w.rbTree.Get(&key) - if !ok { - // Due to the asynchronous nature of the Watcher and OrderWatcher, there are - // race-conditions where we try to remove an item from the Watcher after it - // has already been removed. - log.WithFields(log.Fields{ - "id": id, - }).Trace("Attempted to remove item from Watcher that no longer exists") - return // Noop - } else { - ids := value.(stringset.Set) - ids.Remove(id) - if len(ids) == 0 { - w.rbTree.Delete(&key) - } else { - w.rbTree.Insert(&key, ids) - } - } -} - -// Prune checks for any expired items given a timestamp and removes any expired -// items from the expiration watcher and returns them to the caller -func (w *Watcher) Prune(timestamp time.Time) []ExpiredItem { - pruned := []ExpiredItem{} - for { - w.rbTreeMu.RLock() - key, value := w.rbTree.Min() - w.rbTreeMu.RUnlock() - if key == nil { - break - } - expirationTimeSeconds := int64(*key.(*rbt.Int64Key)) - expirationTime := time.Unix(expirationTimeSeconds, 0) - if timestamp.Before(expirationTime) { - break - } - ids := value.(stringset.Set) - for id := range ids { - pruned = append(pruned, ExpiredItem{ - ExpirationTimestamp: expirationTime, - ID: id, - }) - } - w.rbTreeMu.Lock() - w.rbTree.Delete(key) - w.rbTreeMu.Unlock() - } - return pruned -} diff --git a/expirationwatch/expiration_watcher_test.go b/expirationwatch/expiration_watcher_test.go deleted file mode 100644 index 7a7e55275..000000000 --- a/expirationwatch/expiration_watcher_test.go +++ /dev/null @@ -1,137 +0,0 @@ -package expirationwatch - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestPrunesExpiredItems(t *testing.T) { - watcher := New() - - current := time.Now().Truncate(time.Second) - expiryEntryOne := ExpiredItem{ - ExpirationTimestamp: current.Add(-3 * time.Second), - ID: "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e", - } - watcher.Add(expiryEntryOne.ExpirationTimestamp, expiryEntryOne.ID) - - expiryEntryTwo := ExpiredItem{ - ExpirationTimestamp: current.Add(-1 * time.Second), - ID: "0x12ab7edd34515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d3bee521", - } - watcher.Add(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - pruned := watcher.Prune(current) - assert.Len(t, pruned, 2, "two expired items should get pruned") - assert.Equal(t, expiryEntryOne, pruned[0]) - assert.Equal(t, expiryEntryTwo, pruned[1]) -} - -func TestPrunesTwoExpiredItemsWithSameExpiration(t *testing.T) { - watcher := New() - - current := time.Now().Truncate(time.Second) - expiration := current.Add(-3 * time.Second) - expiryEntryOne := ExpiredItem{ - ExpirationTimestamp: expiration, - ID: "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e", - } - watcher.Add(expiryEntryOne.ExpirationTimestamp, expiryEntryOne.ID) - - expiryEntryTwo := ExpiredItem{ - ExpirationTimestamp: expiration, - ID: "0x12ab7edd34515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d3bee521", - } - watcher.Add(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - pruned := watcher.Prune(current) - assert.Len(t, pruned, 2, "two expired items should get pruned") - hashes := map[string]bool{ - expiryEntryOne.ID: true, - expiryEntryTwo.ID: true, - } - for _, expiredItem := range pruned { - assert.True(t, hashes[expiredItem.ID]) - } -} - -func TestPrunesBarelyExpiredItem(t *testing.T) { - watcher := New() - - current := time.Now().Truncate(time.Second) - expiryEntryOne := ExpiredItem{ - ExpirationTimestamp: current, - ID: "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e", - } - watcher.Add(expiryEntryOne.ExpirationTimestamp, expiryEntryOne.ID) - - pruned := watcher.Prune(current) - assert.Len(t, pruned, 1, "one expired item should get pruned") - assert.Equal(t, expiryEntryOne, pruned[0]) -} - -func TestKeepsUnexpiredItem(t *testing.T) { - watcher := New() - - id := "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e" - current := time.Now().Truncate(time.Second) - watcher.Add(current.Add(10*time.Second), id) - - pruned := watcher.Prune(current) - assert.Equal(t, 0, len(pruned), "Doesn't prune unexpired item") -} - -func TestReturnsEmptyIfNoItems(t *testing.T) { - watcher := New() - - pruned := watcher.Prune(time.Now()) - assert.Len(t, pruned, 0, "Returns empty array when no items tracked") -} - -func TestRemoveOnlyItemWithSpecificExpirationTime(t *testing.T) { - watcher := New() - - current := time.Now().Truncate(time.Second) - expiryEntryOne := ExpiredItem{ - ExpirationTimestamp: current.Add(-3 * time.Second), - ID: "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e", - } - watcher.Add(expiryEntryOne.ExpirationTimestamp, expiryEntryOne.ID) - - expiryEntryTwo := ExpiredItem{ - ExpirationTimestamp: current.Add(-1 * time.Second), - ID: "0x12ab7edd34515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d3bee521", - } - watcher.Add(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - watcher.Remove(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - pruned := watcher.Prune(current) - assert.Len(t, pruned, 1, "two expired items should get pruned") - assert.Equal(t, expiryEntryOne, pruned[0]) -} -func TestRemoveItemWhichSharesExpirationTimeWithOtherItems(t *testing.T) { - watcher := New() - - current := time.Now().Truncate(time.Second) - singleExpirationTimestamp := current.Add(-3 * time.Second) - expiryEntryOne := ExpiredItem{ - ExpirationTimestamp: singleExpirationTimestamp, - ID: "0x8e209dda7e515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d97385e", - } - watcher.Add(expiryEntryOne.ExpirationTimestamp, expiryEntryOne.ID) - - expiryEntryTwo := ExpiredItem{ - ExpirationTimestamp: singleExpirationTimestamp, - ID: "0x12ab7edd34515025d0c34aa61a0d1156a631248a4318576a2ce0fb408d3bee521", - } - watcher.Add(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - watcher.Remove(expiryEntryTwo.ExpirationTimestamp, expiryEntryTwo.ID) - - pruned := watcher.Prune(current) - assert.Len(t, pruned, 1, "two expired items should get pruned") - assert.Equal(t, expiryEntryOne, pruned[0]) -}