Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH]: Add property test log service #1969

Merged
merged 22 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/chroma-coordinator-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v3
- uses: ariga/setup-atlas@v0
- name: Build and test
run: cd go && make test
env:
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

**/__pycache__

go/bin/
go/**/testdata/
go/coordinator/bin/

Expand Down
5 changes: 5 additions & 0 deletions go/bin/migrate_up_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Apply the log-service schema to the target database using Atlas.
#   $1 - connection URL of the database to migrate (passed by RunMigration).
# --to points at the desired schema definition; --dev-url spins up a throwaway
# dockerized Postgres 15 that Atlas uses to compute the diff; --auto-approve
# skips the interactive confirmation so this can run in CI/tests.
atlas schema apply \
-u "$1" \
--to file://database/log/schema/ \
--dev-url "docker://postgres/15/dev" \
--auto-approve
10 changes: 8 additions & 2 deletions go/database/log/db/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/database/log/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FOR UPDATE;
INSERT INTO record_log (collection_id, "offset", record) values($1, $2, $3);

-- name: GetRecordsForCollection :many
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset > $2 ORDER BY r.offset DESC limit $3 ;
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset >= $2 and r.timestamp <= $4 ORDER BY r.offset ASC limit $3 ;

-- name: GetAllCollectionsToCompact :many
with summary as (
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
return
}

func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int) (records []log.RecordLog, err error) {
// PullRecords fetches up to batchSize log records for collectionId, starting
// at the given offset and bounded by timestamp (records newer than it are
// excluded — NOTE(review): inferred from the query's `r.timestamp <= $4`
// predicate; confirm against the SQL definition).
func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int, timestamp int) (records []log.RecordLog, err error) {
	params := log.GetRecordsForCollectionParams{
		CollectionID: collectionId,
		Offset:       offset,
		Limit:        int32(batchSize),
		Timestamp:    int32(timestamp),
	}
	records, err = r.queries.GetRecordsForCollection(ctx, params)
	return records, err
}
Expand Down
158 changes: 158 additions & 0 deletions go/pkg/log/server/property_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package server

import (
"context"
"github.com/chroma-core/chroma/go/pkg/log/repository"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
"github.com/chroma-core/chroma/go/pkg/types"
libs2 "github.com/chroma-core/chroma/go/shared/libs"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"pgregory.net/rapid"
"testing"
"time"
)

// ModelState is the in-memory reference model that the property test keeps in
// lockstep with the real log service, so responses can be checked against it.
type ModelState struct {
// Per-collection enumeration offset.
// NOTE(review): never written in the visible test code — confirm intended use.
CollectionEnumerationOffset map[types.UniqueID]int64
// Every record pushed to each collection, in push order; index i holds the
// record at 1-based log offset i+1.
CollectionData map[types.UniqueID][]*coordinatorpb.OperationRecord
// Last compaction offset acknowledged per collection via
// UpdateCollectionLogOffset.
CollectionCompactionOffset map[types.UniqueID]int64
}

// LogServerTestSuite wires a real log service (backed by a Postgres test
// container, see SetupSuite) together with the reference ModelState used by
// the property test.
type LogServerTestSuite struct {
suite.Suite
// logServer is the gRPC service implementation under test.
logServer logservicepb.LogServiceServer
// model mirrors the expected state of the service.
model ModelState
// t is the outer *testing.T, stashed so rapid.Check can be driven from
// suite methods.
t *testing.T
}

// SetupSuite starts a Postgres test container, opens a connection to it, runs
// the schema migrations, and constructs the log server under test. It also
// resets the reference model to an empty state.
func (suite *LogServerTestSuite) SetupSuite() {
	ctx := context.Background()
	connectionString, err := libs2.StartPgContainer(ctx)
	assert.NoError(suite.t, err, "Failed to start pg container")
	var conn *pgx.Conn
	conn, err = libs2.NewPgConnection(ctx, connectionString)
	assert.NoError(suite.t, err, "Failed to create new pg connection")
	err = libs2.RunMigration(ctx, connectionString)
	assert.NoError(suite.t, err, "Failed to run migration")
	lr := repository.NewLogRepository(conn)
	suite.logServer = NewLogServer(lr)
	// Initialize every ModelState map. CollectionEnumerationOffset was
	// previously left nil; writing to a nil map panics, so any future test
	// that records enumeration offsets would crash.
	suite.model = ModelState{
		CollectionEnumerationOffset: map[types.UniqueID]int64{},
		CollectionData:              map[types.UniqueID][]*coordinatorpb.OperationRecord{},
		CollectionCompactionOffset:  map[types.UniqueID]int64{},
	}
}

func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() {
ctx := context.Background()
// Generate collection ids
collections := make([]types.UniqueID, 10)
for i := 0; i < len(collections); i++ {
collections[i] = types.NewUniqueID()
}

collectionGen := rapid.Custom(func(t *rapid.T) types.UniqueID {
return collections[rapid.IntRange(0, len(collections)-1).Draw(t, "collection_id")]
})
recordGen := rapid.SliceOf(rapid.Custom(func(t *rapid.T) *coordinatorpb.OperationRecord {
data := rapid.SliceOf(rapid.Byte()).Draw(t, "record_data")
id := rapid.String().Draw(t, "record_id")
return &coordinatorpb.OperationRecord{
Id: id,
Vector: &coordinatorpb.Vector{
Vector: data,
},
}
}))
rapid.Check(suite.t, func(t *rapid.T) {
t.Repeat(map[string]func(*rapid.T){
"pushLogs": func(t *rapid.T) {
c := collectionGen.Draw(t, "collection")
records := recordGen.Draw(t, "record")
r, err := suite.logServer.PushLogs(ctx, &logservicepb.PushLogsRequest{
CollectionId: c.String(),
Records: records,
})
if err != nil {
t.Fatal(err)
}
if int32(len(records)) != r.RecordCount {
t.Fatal("record count mismatch", len(records), r.RecordCount)
}
suite.model.CollectionData[c] = append(suite.model.CollectionData[c], records...)
},
"getAllCollectionsToCompact": func(t *rapid.T) {
result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, &logservicepb.GetAllCollectionInfoToCompactRequest{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add more sanity check for getAllCollectionsToCompact? One check could be the checking the number of collections to compact matches the number of collections in the model.

assert.NoError(suite.t, err)
for _, collection := range result.AllCollectionInfo {
id, err := types.Parse(collection.CollectionId)
if err != nil {
t.Fatal(err)
}
compactionOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[id], int64(len(suite.model.CollectionData))).Draw(t, "new_position")
_, err = suite.logServer.UpdateCollectionLogOffset(ctx, &logservicepb.UpdateCollectionLogOffsetRequest{
CollectionId: id.String(),
LogOffset: compactionOffset,
})
if err != nil {
t.Fatal(err)
}
suite.model.CollectionCompactionOffset[id] = compactionOffset
}
},
"pullLogs": func(t *rapid.T) {
c := collectionGen.Draw(t, "collection")
startOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[c], int64(len(suite.model.CollectionData))).Draw(t, "start_offset")
// If start offset is 0, we need to set it to 1 as the offset is 1 based
if startOffset == 0 {
startOffset = 1
}
batchSize := rapid.Int32Range(1, 20).Draw(t, "batch_size")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions:

  • What is the behavior if batchSize <= 1? Especially batchSize <= 0?
  • What is the reasoning behind using (1, 20) as the batchSize range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was just way to have random number, I don't know what it would be in production.

response, err := suite.logServer.PullLogs(ctx, &logservicepb.PullLogsRequest{
CollectionId: c.String(),
StartFromOffset: startOffset,
BatchSize: batchSize,
EndTimestamp: time.Now().Unix(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also generate the EndTimestamp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, once I have it deploy to staging, i would like to pass even more time on making the property test even better, like parallelism, more logs, use timestamp etc

})
if err != nil {
t.Fatal(err)
}
// Verify that record returned is matching the expected record
for _, record := range response.Records {
expectedRecord := suite.model.CollectionData[c][record.LogOffset-1]
if string(expectedRecord.Vector.Vector) != string(record.Record.Vector.Vector) {
t.Fatalf("expect record vector %s, got %s", string(expectedRecord.Vector.Vector), string(record.Record.Vector.Vector))
}
if expectedRecord.Id != record.Record.Id {
t.Fatalf("expect record id %s, got %s", expectedRecord.Id, record.Record.Id)
}
}

// Verify that the first and last record offset is correct
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we verify the offset in the middle of the batch as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the number count and the first and last offset is correct, I expect the offset to be correct for all of them.

if len(response.Records) > 0 {
lastRecord := response.Records[len(response.Records)-1]
firstRecord := response.Records[0]
//
expectedLastOffset := startOffset + int64(batchSize) - 1
if expectedLastOffset > int64(len(suite.model.CollectionData[c])) {
expectedLastOffset = int64(len(suite.model.CollectionData[c]))
}
if lastRecord.LogOffset != expectedLastOffset {
t.Fatalf("expect last record %d, got %d", lastRecord.LogOffset, expectedLastOffset)
}
if firstRecord.LogOffset != startOffset {
t.Fatalf("expect first record %d, got %d", startOffset, firstRecord.LogOffset)
}
}
},
})
})
}

func TestLogServerTestSuite(t *testing.T) {
testSuite := new(LogServerTestSuite)
testSuite.t = t
suite.Run(t, testSuite)
}
2 changes: 1 addition & 1 deletion go/pkg/log/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *logServer) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequ
if err != nil {
return
}
records, err := s.lr.PullRecords(ctx, collectionID.String(), req.StartFromOffset, int(req.BatchSize))
records, err := s.lr.PullRecords(ctx, collectionID.String(), req.StartFromOffset, int(req.BatchSize), int(req.EndTimestamp))
if err != nil {
return
}
Expand Down
5 changes: 2 additions & 3 deletions go/shared/libs/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ func StartPgContainer(ctx context.Context) (connectionString string, err error)

// RunMigration applies the test database migrations by invoking
// bin/migrate_up_test.sh (an Atlas schema apply) against connectionString.
// The script's combined stdout/stderr is printed so failures are visible.
//
// NOTE(review): ctx is currently unused; consider exec.CommandContext so the
// migration respects cancellation.
func RunMigration(ctx context.Context, connectionString string) (err error) {
	cmd := exec.Command("/bin/sh", "bin/migrate_up_test.sh", connectionString)
	// Resolve the go/ module root relative to this source file so the script
	// path works no matter what the test's working directory is.
	// (runtime.Caller's second result is this file's path, not a directory —
	// hence the renamed variable and the ../../../ climb.)
	_, thisFile, _, _ := runtime.Caller(0)
	cmd.Dir = path.Join(thisFile, "../../../")
	// FIX: the local was named `byte`, shadowing the predeclared Go type;
	// CombinedOutput (not Output) is used so stderr is captured too.
	output, err := cmd.CombinedOutput()
	fmt.Println(string(output))
	return
}
Loading