Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify usage of WatchSet methods and behaviour of First/Last #118

Merged
merged 2 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,15 @@ func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error)
}

// FirstWatch is used to return the first matching object for
// the given constraints on the index along with the watch channel
// the given constraints on the index along with the watch channel.
//
// Note that all values read in the transaction form a consistent snapshot
// from the time when the transaction was created.
//
// The watch channel is closed when a subsequent write transaction
// has updated the result of the query. Since each read transaction
// operates on an isolated snapshot, a new read transaction must be
// started to observe the changes that have been made.
func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
// Get the index value
indexSchema, val, err := txn.getIndexValue(table, index, args...)
Expand Down Expand Up @@ -541,7 +549,15 @@ func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan str
}

// LastWatch is used to return the last matching object for
// the given constraints on the index along with the watch channel
// the given constraints on the index along with the watch channel.
//
// Note that all values read in the transaction form a consistent snapshot
// from the time when the transaction was created.
//
// The watch channel is closed when a subsequent write transaction
// has updated the result of the query. Since each read transaction
// operates on an isolated snapshot, a new read transaction must be
// started to observe the changes that have been made.
func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
// Get the index value
indexSchema, val, err := txn.getIndexValue(table, index, args...)
Expand Down Expand Up @@ -569,14 +585,20 @@ func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan stru
}

// First is used to return the first matching object for
// the given constraints on the index
// the given constraints on the index.
//
// Note that all values read in the transaction form a consistent snapshot
// from the time when the transaction was created.
func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
_, val, err := txn.FirstWatch(table, index, args...)
return val, err
}

// Last is used to return the last matching object for
// the given constraints on the index
// the given constraints on the index.
//
// Note that all values read in the transaction form a consistent snapshot
// from the time when the transaction was created.
func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) {
_, val, err := txn.LastWatch(table, index, args...)
return val, err
Expand All @@ -589,6 +611,9 @@ func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, err
// null and fail to find a leaf node). This should only be used where the prefix
// given is capable of matching indexed entries directly, which typically only
// applies to a custom indexer. See the unit test for an example.
//
// Note that all values read in the transaction form a consistent snapshot
// from the time when the transaction was created.
func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
// Enforce that this only works on prefix indexes.
if !strings.HasSuffix(index, "_prefix") {
Expand Down
6 changes: 3 additions & 3 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-c
}
}

// Watch is used to wait for either the watch set to trigger or a timeout.
// Watch is used to wait for any channel of the watch set to trigger or a timeout.
// Returns true on timeout.
func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
if w == nil {
Expand All @@ -64,7 +64,7 @@ func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
return w.WatchCtx(ctx) == context.Canceled
}

// WatchCtx is used to wait for either the watch set to trigger or for the
// WatchCtx is used to wait for any channel of the watch set to trigger or for the
// context to be cancelled. Watch with a timeout channel can be mimicked by
// creating a context with a deadline. WatchCtx should be preferred over Watch.
func (w WatchSet) WatchCtx(ctx context.Context) error {
Expand Down Expand Up @@ -128,7 +128,7 @@ func (w WatchSet) watchMany(ctx context.Context) error {
}
}

// WatchCh returns a channel that is used to wait for either the watch set to trigger
// WatchCh returns a channel that is used to wait for any channel of the watch set to trigger
// or for the context to be cancelled. WatchCh creates a new goroutine each call, so
// callers may need to cache the returned channel to avoid creating extra goroutines.
func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
Expand Down