Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

(VDB-570) Handle duplicate storage diffs #93

Merged
merged 1 commit into from
May 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,5 @@ func composeAndExecute() {
func init() {
rootCmd.AddCommand(composeAndExecuteCmd)
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rmulhol The format/resolution to supply this parameter in is unclear. Seconds?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m0ar duration takes a format like 5m30s - put some examples in the flags section of the composeAndExecute docs, but happy to augment/move that if you think that'd be helpful.

Copy link
Contributor

@m0ar m0ar May 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add it in the short-hand docs on this line as well? I'd never have guessed :D

}
2 changes: 1 addition & 1 deletion cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func execute() {
func init() {
rootCmd.AddCommand(executeCmd)
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5*time.Minute, "how often to recheck queued storage diffs")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

}

type Exporter interface {
Expand Down
24 changes: 12 additions & 12 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ import (
)

var (
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
ipc string
levelDbPath string
cfgFile string
databaseConfig config.Database
genConfig config.Plugin
ipc string
levelDbPath string
queueRecheckInterval time.Duration
startingBlockNumber int64
storageDiffsPath string
syncAll bool
endingBlockNumber int64
recheckHeadersArg bool
startingBlockNumber int64
storageDiffsPath string
syncAll bool
endingBlockNumber int64
recheckHeadersArg bool
)

const (
pollingInterval = 7 * time.Second
validationWindow = 15
pollingInterval = 7 * time.Second
validationWindow = 15
)

var rootCmd = &cobra.Command{
Expand Down
3 changes: 3 additions & 0 deletions libraries/shared/storage/utils/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package utils

import (
"errors"
"fmt"
)

var ErrRowExists = errors.New("parsed row for storage diff already exists")

type ErrContractNotFound struct {
Contract string
}
Expand Down
4 changes: 2 additions & 2 deletions libraries/shared/watcher/storage_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
return
}
executeErr := storageTransformer.Execute(row)
if executeErr != nil {
if executeErr != nil && executeErr != utils.ErrRowExists {
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
queueErr := storageWatcher.Queue.Add(row)
if queueErr != nil {
Expand All @@ -100,7 +100,7 @@ func (storageWatcher StorageWatcher) processQueue() {
continue
}
executeErr := storageTransformer.Execute(row)
if executeErr == nil {
if executeErr == nil || executeErr == utils.ErrRowExists {
storageWatcher.deleteRow(row.Id)
}
}
Expand Down
23 changes: 23 additions & 0 deletions libraries/shared/watcher/storage_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("does not queue row if transformer execution fails because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Hour)

Expect(<-errs).To(BeNil())
Consistently(func() bool {
return mockQueue.AddCalled
}).Should(BeFalse())
close(done)
})

It("queues row for later processing if transformer execution fails", func(done Done) {
mockTransformer.ExecuteErr = fakes.FakeError

Expand Down Expand Up @@ -187,6 +199,17 @@ var _ = Describe("Storage Watcher", func() {
close(done)
})

It("deletes row from queue if transformer execution errors because row already exists", func(done Done) {
mockTransformer.ExecuteErr = utils.ErrRowExists

go storageWatcher.Execute(rows, errs, time.Nanosecond)

Eventually(func() int {
return mockQueue.DeletePassedId
}).Should(Equal(row.Id))
close(done)
})

It("logs error if deleting persisted row fails", func(done Done) {
mockQueue.DeleteErr = fakes.FakeError
tempFile, fileErr := ioutil.TempFile("", "log")
Expand Down