Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication: search for state/sequence number by timestamp #38

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions replication/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
osm/replication [![Godoc Reference](https://godoc.org/github.com/paulmach/osm/replication?status.svg)](https://godoc.org/github.com/paulmach/osm/replication)
===============
# osm/replication [![Godoc Reference](https://pkg.go.dev/badge/github.com/paulmach/osm)](https://pkg.go.dev/github.com/paulmach/osm/replication)

Package `replication` handles fetching the Minute, Hour, Day and Changeset replication
and the associated state value from [Planet OSM](http://planet.osm.org).
Expand All @@ -18,3 +17,15 @@ Once you know the change number you want, fetch the change using:
```go
change, err := replication.Minute(ctx, num)
```

## Finding sequences numbers by timestamp

It's also possible to find the sequence number by timestamp.
These calls make multiple requests for state to find the one matching the given timestamp.

```go
MinuteStateAt(ctx context.Context, timestamp time.Time) (MinuteSeqNum, *State, error)
HourStateAt(ctx context.Context, timestamp time.Time) (HourSeqNum, *State, error)
DayStateAt(ctx context.Context, timestamp time.Time) (DaySeqNum, *State, error)
ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum, *State, error)
```
2 changes: 2 additions & 0 deletions replication/changesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func (ds *Datasource) CurrentChangesetState(ctx context.Context) (ChangesetSeqNu
}

// ChangesetState returns the state for the given changeset replication.
// There are no state files before 2007990. In that case a 404 error is returned.
// Delegates to the DefaultDatasource and uses its http.Client to make the request.
func ChangesetState(ctx context.Context, n ChangesetSeqNum) (*State, error) {
return DefaultDatasource.ChangesetState(ctx, n)
}

// ChangesetState returns the state for the given changeset replication.
// There are no state files before 2007990. In that case a 404 error is returned.
func (ds *Datasource) ChangesetState(ctx context.Context, n ChangesetSeqNum) (*State, error) {
return ds.fetchChangesetState(ctx, n)
}
Expand Down
315 changes: 315 additions & 0 deletions replication/search.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
package replication

import (
"context"
"time"
)

// the valid minimum state number on planet.osm.org
const (
minMinute = 1 // up to 2012-09-12T08:15:45Z
minHour = 1 // up to 2013-07-14T12:00:00Z
minDay = 1 // up to 2012-09-13T00:00:00Z

// There are changes before this, but no state.
minChangeset = 2007990 // 2016-09-07 10:45:02.148547780 Z
)

type stater struct {
Min uint64
Current func(context.Context) (*State, error)
State func(context.Context, uint64) (*State, error)
}

// MinuteStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
// Delegates to the DefaultDatasource and uses its http.Client to make the request.
func MinuteStateAt(ctx context.Context, timestamp time.Time) (MinuteSeqNum, *State, error) {
return DefaultDatasource.MinuteStateAt(ctx, timestamp)
}

// MinuteStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
func (ds *Datasource) MinuteStateAt(ctx context.Context, timestamp time.Time) (MinuteSeqNum, *State, error) {
s := &stater{
Min: minMinute,
Current: func(ctx context.Context) (*State, error) {
_, s, err := ds.CurrentMinuteState(ctx)
return s, err
},
State: func(ctx context.Context, n uint64) (*State, error) {
return ds.MinuteState(ctx, MinuteSeqNum(n))
},
}
state, err := searchTimestamp(ctx, s, timestamp)
if err != nil {
return 0, nil, err
}

return MinuteSeqNum(state.SeqNum), state, nil
}

// HourStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
// Delegates to the DefaultDatasource and uses its http.Client to make the request.
func HourStateAt(ctx context.Context, timestamp time.Time) (HourSeqNum, *State, error) {
return DefaultDatasource.HourStateAt(ctx, timestamp)
}

// HourStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
func (ds *Datasource) HourStateAt(ctx context.Context, timestamp time.Time) (HourSeqNum, *State, error) {
s := &stater{
Min: minHour,
Current: func(ctx context.Context) (*State, error) {
_, s, err := ds.CurrentHourState(ctx)
return s, err
},
State: func(ctx context.Context, n uint64) (*State, error) {
return ds.HourState(ctx, HourSeqNum(n))
},
}
state, err := searchTimestamp(ctx, s, timestamp)
if err != nil {
return 0, nil, err
}

return HourSeqNum(state.SeqNum), state, nil
}

// DayStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
// Delegates to the DefaultDatasource and uses its http.Client to make the request.
func DayStateAt(ctx context.Context, timestamp time.Time) (DaySeqNum, *State, error) {
return DefaultDatasource.DayStateAt(ctx, timestamp)
}

// DayStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
func (ds *Datasource) DayStateAt(ctx context.Context, timestamp time.Time) (DaySeqNum, *State, error) {
s := &stater{
Min: minDay,
Current: func(ctx context.Context) (*State, error) {
_, s, err := ds.CurrentDayState(ctx)
return s, err
},
State: func(ctx context.Context, n uint64) (*State, error) {
return ds.DayState(ctx, DaySeqNum(n))
},
}
state, err := searchTimestamp(ctx, s, timestamp)
if err != nil {
return 0, nil, err
}

return DaySeqNum(state.SeqNum), state, nil
}

// ChangesetStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
// Delegates to the DefaultDatasource and uses its http.Client to make the request.
func ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum, *State, error) {
return DefaultDatasource.ChangesetStateAt(ctx, timestamp)
}

// ChangesetStateAt will return the replication state/sequence number that contains
// data for the given timestamp. This would be the first replication state written
// after the timestamp. If the timestamp is after all current replication state
// the most recent will be returned. The caller can check for this case using
// state.Before(givenTimestamp).
//
// This call can do 20+ requests to the binary search the replication states.
// Use sparingly or use a mirror.
func (ds *Datasource) ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum, *State, error) {
s := &stater{
Min: minDay,
Current: func(ctx context.Context) (*State, error) {
_, s, err := ds.CurrentChangesetState(ctx)
return s, err
},
State: func(ctx context.Context, n uint64) (*State, error) {
return ds.ChangesetState(ctx, ChangesetSeqNum(n))
},
}
state, err := searchTimestamp(ctx, s, timestamp)
if err != nil {
return 0, nil, err
}

return ChangesetSeqNum(state.SeqNum), state, nil
}

func searchTimestamp(ctx context.Context, s *stater, timestamp time.Time) (*State, error) {
// get the current timestamp from the server
upper, err := s.Current(ctx)
if NotFound(err) {
return nil, err // current state not found?
} else if err != nil {
return nil, err
}

if timestamp.After(upper.Timestamp) {
return upper, nil // given time is in the future or something
}

lower, err := s.State(ctx, s.Min)
if err != nil && !NotFound(err) {
return nil, err
}

if lower == nil {
// now we need to find a lower bound state manually.
// This can have edge cases if there are missing sequence numbers.
var err error
lower, upper, err = findBound(ctx, s, upper, timestamp)
if err != nil {
return nil, err
}
}

if lower.SeqNum+1 >= upper.SeqNum {
return lower, nil // edge case if there are only one or two sequence numbers
}

return findInRange(ctx, s, lower, upper, timestamp)
}

func findBound(ctx context.Context, s *stater, upper *State, timestamp time.Time) (*State, *State, error) {
var (
lowerID uint64 = 1
lower *State
err error
)

// we need to find the lower bound
for lower == nil {
lower, err = s.State(ctx, lowerID)

if err != nil && !NotFound(err) {
return nil, nil, err
}

if lower != nil && lower.Timestamp.After(timestamp) {
if lower.SeqNum+1 >= upper.SeqNum {
return lower, upper, nil // edge case if there are only two sequence numbers
}

// in our search for lower we found a new upper bound
upper = lower
lower = nil
lowerID = 1
}

if lower != nil {
break
}

// no lower yet, so try a higher id (binary search wise)
newID := (lowerID + upper.SeqNum) / 2
if newID <= lowerID {
// nothing suitable found, so upper is probably the best we can do
return upper, upper, nil
}
lowerID = newID
}

return lower, upper, nil
}

func findInRange(ctx context.Context, s *stater, lower, upper *State, timestamp time.Time) (*State, error) {
// we do a binary search through the range to find the sequence number
for lower.SeqNum+1 < upper.SeqNum {
// could do better here
splitID := (lower.SeqNum + upper.SeqNum) / 2

split, err := s.State(ctx, splitID)
if err != nil && !NotFound(err) {
return nil, err
}

if split == nil {
// file missing, search the next towards lower
sID := splitID - 1

for split == nil && lower.SeqNum < splitID {
split, err = s.State(ctx, sID)
if err != nil && !NotFound(err) {
return nil, err
}

sID--
}
}

if split == nil {
// still missing? search the next towards upper
sID := splitID + 1

for split == nil && splitID < upper.SeqNum {
split, err = s.State(ctx, sID)
if err != nil && !NotFound(err) {
return nil, err
}

sID++
}
}

if split == nil {
// still nothing
return lower, nil
}

// set the new boundary
if timestamp.After(split.Timestamp) {
lower = split
} else {
upper = split
}
}

// timestamp is now between lower and upper, we want to return the upper.
return upper, nil
}
Loading