Skip to content

Commit

Permalink
fix property test
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasgere committed Apr 3, 2024
1 parent 8aa7064 commit b70396e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
2 changes: 1 addition & 1 deletion go/database/log/db/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/database/log/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FOR UPDATE;
INSERT INTO record_log (collection_id, "offset", record) values($1, $2, $3);

-- name: GetRecordsForCollection :many
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset > $2 ORDER BY r.offset DESC limit $3 ;
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset >= $2 ORDER BY r.offset ASC limit $3 ;

-- name: GetAllCollectionsToCompact :many
with summary as (
Expand Down
100 changes: 61 additions & 39 deletions go/pkg/log/server/property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

type ModelState struct {
CollectionEnumerationOffset map[types.UniqueID]int64
CollectionData map[types.UniqueID][][]byte
CollectionData map[types.UniqueID][]*coordinatorpb.OperationRecord
CollectionCompactionOffset map[types.UniqueID]int64
}

Expand All @@ -40,89 +40,111 @@ func (suite *LogServerTestSuite) SetupSuite() {
lr := repository.NewLogRepository(conn)
suite.logServer = NewLogServer(lr)
suite.model = ModelState{
CollectionData: map[types.UniqueID][][]byte{},
CollectionData: map[types.UniqueID][]*coordinatorpb.OperationRecord{},
CollectionCompactionOffset: map[types.UniqueID]int64{},
}
}

func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() {

ctx := context.Background()
// Generate collection ids
collections := make([]types.UniqueID, 10)
for i := 0; i < len(collections); i++ {
collections[i] = types.NewUniqueID()
}

logsGen := rapid.SliceOf(rapid.SliceOf(rapid.Byte()))

gen := rapid.Custom(func(t *rapid.T) types.UniqueID {
return collections[rapid.IntRange(0, len(collections)-1).Draw(t, "collectionId")]
collectionGen := rapid.Custom(func(t *rapid.T) types.UniqueID {
return collections[rapid.IntRange(0, len(collections)-1).Draw(t, "collection_id")]
})

recordGen := rapid.SliceOf(rapid.Custom(func(t *rapid.T) *coordinatorpb.OperationRecord {
data := rapid.SliceOf(rapid.Byte()).Draw(t, "record_data")
id := rapid.String().Draw(t, "record_id")
return &coordinatorpb.OperationRecord{
Id: id,
Vector: &coordinatorpb.Vector{
Vector: data,
},
}
}))
rapid.Check(suite.t, func(t *rapid.T) {
t.Repeat(map[string]func(*rapid.T){
"pushLogs": func(t *rapid.T) {

c := gen.Draw(t, "collectionPosition")
data := logsGen.Draw(t, "logs")
logs := make([]*coordinatorpb.OperationRecord, len(data))
for i, record := range data {
logs[i] = &coordinatorpb.OperationRecord{
Vector: &coordinatorpb.Vector{
Vector: record,
},
}
}
r, err := suite.logServer.PushLogs(context.Background(), &logservicepb.PushLogsRequest{
c := collectionGen.Draw(t, "collection")
records := recordGen.Draw(t, "record")
r, err := suite.logServer.PushLogs(ctx, &logservicepb.PushLogsRequest{
CollectionId: c.String(),
Records: logs,
Records: records,
})
if err != nil {
t.Fatal(err)
}
if int32(len(data)) != r.RecordCount {
t.Fatal("record count mismatch", len(data), r.RecordCount)
if int32(len(records)) != r.RecordCount {
t.Fatal("record count mismatch", len(records), r.RecordCount)
}
suite.model.CollectionData[c] = append(suite.model.CollectionData[c], data...)
suite.model.CollectionData[c] = append(suite.model.CollectionData[c], records...)
},
"getAllCollectionsToCompact": func(t *rapid.T) {
result, err := suite.logServer.GetAllCollectionInfoToCompact(context.Background(), &logservicepb.GetAllCollectionInfoToCompactRequest{})
result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, &logservicepb.GetAllCollectionInfoToCompactRequest{})
assert.NoError(suite.t, err)
for _, collection := range result.AllCollectionInfo {
id, err := types.Parse(collection.CollectionId)
if err != nil {
t.Fatal(err)
}
newCompactationIndex := rapid.Int64Range(suite.model.CollectionCompactionOffset[id], int64(len(suite.model.CollectionData))).Draw(t, "new_position")
_, err = suite.logServer.UpdateCollectionLogOffset(context.Background(), &logservicepb.UpdateCollectionLogOffsetRequest{
compactionOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[id], int64(len(suite.model.CollectionData))).Draw(t, "new_position")
_, err = suite.logServer.UpdateCollectionLogOffset(ctx, &logservicepb.UpdateCollectionLogOffsetRequest{
CollectionId: id.String(),
LogId: newCompactationIndex,
LogOffset: compactionOffset,
})
if err != nil {
t.Fatal(err)
}
suite.model.CollectionCompactionOffset[id] = newCompactationIndex
suite.model.CollectionCompactionOffset[id] = compactionOffset
}
},
"pullLogs": func(t *rapid.T) {
c := gen.Draw(t, "collectionPosition")
index := rapid.Int64Range(suite.model.CollectionCompactionOffset[c], int64(len(suite.model.CollectionData))).Draw(t, "id")
response, err := suite.logServer.PullLogs(context.Background(), &logservicepb.PullLogsRequest{
c := collectionGen.Draw(t, "collection")
startOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[c], int64(len(suite.model.CollectionData))).Draw(t, "start_offset")
// If the start offset is 0, bump it to 1, since offsets are 1-based
if startOffset == 0 {
startOffset = 1
}
batchSize := rapid.Int32Range(1, 20).Draw(t, "batch_size")
response, err := suite.logServer.PullLogs(ctx, &logservicepb.PullLogsRequest{
CollectionId: c.String(),
StartFromOffset: index,
BatchSize: 10,
StartFromOffset: startOffset,
BatchSize: batchSize,
})
if err != nil {
t.Fatal(err)
}
for _, log := range response.Records {
expect := string(suite.model.CollectionData[c][log.LogOffset-1])
result := string(log.Record.Vector.Vector)
if expect != result {
t.Fatalf("expect %s, got %s", expect, result)
// Verify that each returned record matches the expected record in the model
for _, record := range response.Records {
expectedRecord := suite.model.CollectionData[c][record.LogOffset-1]
if string(expectedRecord.Vector.Vector) != string(record.Record.Vector.Vector) {
t.Fatalf("expect record vector %s, got %s", string(expectedRecord.Vector.Vector), string(record.Record.Vector.Vector))
}
if expectedRecord.Id != record.Record.Id {
t.Fatalf("expect record id %s, got %s", expectedRecord.Id, record.Record.Id)
}
}

// Verify that the first and last record offsets are correct
if len(response.Records) > 0 {
lastRecord := response.Records[len(response.Records)-1]
firstRecord := response.Records[0]
//
expectedLastOffset := startOffset + int64(batchSize) - 1
if expectedLastOffset > int64(len(suite.model.CollectionData[c])) {
expectedLastOffset = int64(len(suite.model.CollectionData[c]))
}
if lastRecord.LogOffset != expectedLastOffset {
t.Fatalf("expect last record %d, got %d", lastRecord.LogOffset, expectedLastOffset)
}
if firstRecord.LogOffset != startOffset {
t.Fatalf("expect first record %d, got %d", startOffset, firstRecord.LogOffset)
}
}
},
})
})
Expand Down

0 comments on commit b70396e

Please sign in to comment.