-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
peb iterator #34
base: master
Are you sure you want to change the base?
peb iterator #34
Conversation
frairon
commented
May 31, 2024
- delete telly
- implement disktail
return nil, fmt.Errorf("error opening storage %s: %w", path, err) | ||
} | ||
|
||
ring := &DiskTail[T]{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldnt call the tailer ring
:)
).ErrorOrNil() | ||
} | ||
|
||
// Run starts the ring. It's only allowed to be called once per instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ring
disktail/disktail.go
Outdated
itemHandler := handler | ||
|
||
if skipDuplicates { | ||
keys := make(map[string]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This map could grow really big, but our main use case is iterating over a couple thousand users with a key size of 24, so this should not be a problem.
time.Sleep(100 * time.Millisecond) | ||
} | ||
t.Fatalf("operation timed out") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would also add a test for the cleaner
storedOffsets, err := r.getOffsets(parts) | ||
if err != nil { | ||
return fmt.Errorf("error reading local offsets: %w", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be moved above errgroup.WithContext(ctx)
disktail/disktail.go
Outdated
} | ||
|
||
type IterStats struct { | ||
sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Mutex
is public, but it shouldn't.
A caller of (r *DiskTail[T]) Iterate
could pass already locked IterStats
, deadlocking the whole thing.
itemsIterated int64 | ||
// number of unique keys. Note that this value will be 0 if duplicates are included when | ||
// iterating. | ||
uniqueKeys int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use atomic.Int64
? It's there since 1.19.
// track oldest timestamp for stats | ||
timestamp, _, _ := decodeKey(key) | ||
if stats.oldestItem.IsZero() || timestamp.Before(stats.oldestItem) { | ||
stats.oldestItem = timestamp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The read (func (is *IterStats) OldestItem() time.Time
) is protected by the lock.
But the write isn't?
|
||
func (r *DiskTail[T]) Iterate(ctx context.Context, skipDuplicates bool, stats *IterStats, handler func(item *Item[T]) HandleResult) error { | ||
if stats == nil { | ||
stats = &IterStats{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Locking on these stats
doesn't make much sense? They're not visible to the caller.
disktail/disktail.go
Outdated
// skip it | ||
return Continue | ||
} | ||
keys[item.Key] = struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The map is not protected by the lock?
Is there a difference between uniqueKeys
and map regarding access?
} | ||
} | ||
|
||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for { | |
for ctx.Err() == nil { |
and remove the condition below
disktail/disktail.go
Outdated
if !it.Prev() { | ||
return nil | ||
} else { | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove the negation and swap then & else.
Fits the skip it
comment better IMO.