Skip to content

Commit

Permalink
Record stats inside indexing batch loop
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jun 10, 2024
1 parent a88111a commit 491961a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
6 changes: 3 additions & 3 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type":
type Stats struct {
Indexed int64 // total number of documents indexed
Deleted int64 // total number of documents deleted
Elapsed time.Duration // total time spent actually indexing
Elapsed time.Duration // total time spent actually indexing (excludes poll delay)
}

// Indexer is base interface for indexers
Expand Down Expand Up @@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger {
return slog.With("indexer", i.name)
}

// records a complete index and updates statistics
func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) {
// records indexing activity and updates statistics
func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) {
i.stats.Indexed += int64(indexed)
i.stats.Deleted += int64(deleted)
i.stats.Elapsed += elapsed
Expand Down
21 changes: 10 additions & 11 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)

// now index our docs
start := time.Now()
created, updated, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
if err != nil {
return "", fmt.Errorf("error indexing documents: %w", err)
}

i.recordComplete(created+updated, deleted, time.Since(start))

// if the index didn't previously exist or we are rebuilding, remap to our alias
if remapAlias {
err := i.updateAlias(physicalIndex)
Expand Down Expand Up @@ -153,7 +150,7 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM (
`

// IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, int, error) {
func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error {
totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0

var modifiedOn time.Time
Expand Down Expand Up @@ -193,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st

// no more rows? return
if err == sql.ErrNoRows {
return 0, 0, 0, nil
return nil

Check warning on line 193 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L193

Added line #L193 was not covered by tests
}
if err != nil {
return 0, 0, 0, err
return err

Check warning on line 196 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L196

Added line #L196 was not covered by tests
}
defer rows.Close()

for rows.Next() {
err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON)
if err != nil {
return 0, 0, 0, err
return err

Check warning on line 203 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L203

Added line #L203 was not covered by tests
}

batchFetched++
Expand All @@ -226,14 +223,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
// write to elastic search in batches
if batchFetched%i.batchSize == 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, 0, err
return err

Check warning on line 226 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L226

Added line #L226 was not covered by tests
}
}
}

if subBatch.Len() > 0 {
if err := indexSubBatch(subBatch); err != nil {
return 0, 0, 0, err
return err

Check warning on line 233 in indexers/contacts.go

View check run for this annotation

Codecov / codecov/patch

indexers/contacts.go#L233

Added line #L233 was not covered by tests
}
}

Expand Down Expand Up @@ -268,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
log.Debug("indexed contact batch")
}

i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart))

// last modified stayed the same and we didn't add anything, seen it all, break out
if lastModified.Equal(queryModified) && batchCreated == 0 {
break
}
}

return totalCreated, totalUpdated, totalDeleted, nil
return nil
}

func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {
Expand Down

0 comments on commit 491961a

Please sign in to comment.