Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stats for schema size #5031

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions runner/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ var (
"event_delivery_time": {
60, 300, 900, 1800, 2100, 2700, 3900, 4500, 5400, 9900, 11100, 12600, 21600, 23400, 43200, 45000, 82800, 86400, 88200, // 1m, 5m, 15m, 30m, 35m, 45m, 1h5m, 1h15m, 1h30m, 2h45m, 3h5m, 3h30m, 6h, 6h30m, 12h, 12h30m, 23h, 24h, 24h30m
},
"warehouse_schema_size": {
float64(10 * bytesize.B), float64(100 * bytesize.B),
float64(1 * bytesize.KB), float64(10 * bytesize.KB), float64(100 * bytesize.KB),
float64(1 * bytesize.MB), float64(3 * bytesize.MB), float64(5 * bytesize.MB), float64(10 * bytesize.MB),
float64(25 * bytesize.MB), float64(50 * bytesize.MB), float64(100 * bytesize.MB), float64(1 * bytesize.GB),
},
}

customBuckets = map[string][]float64{
Expand Down
2 changes: 2 additions & 0 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/alerta"
Expand Down Expand Up @@ -170,6 +171,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
dto.Warehouse,
f.conf,
f.logger.Child("warehouse"),
f.statsFactory,
),

upload: dto.Upload,
Expand Down
29 changes: 27 additions & 2 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
"slices"
"sync"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

// deprecatedColumnsRegex is the regex used to identify deprecated columns in the warehouse.
// Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840
Expand Down Expand Up @@ -54,15 +60,20 @@
schemaInWarehouseMu sync.RWMutex
unrecognizedSchemaInWarehouse model.Schema
unrecognizedSchemaInWarehouseMu sync.RWMutex

stats struct {
schemaSize stats.Histogram
}
}

func New(
db *sqlquerywrapper.DB,
warehouse model.Warehouse,
conf *config.Config,
logger logger.Logger,
statsFactory stats.Stats,
) *Schema {
return &Schema{
s := &Schema{
warehouse: warehouse,
schemaRepo: repo.NewWHSchemas(db),
stagingFileRepo: repo.NewStagingFiles(db),
Expand All @@ -71,6 +82,14 @@
skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false),
enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
}
s.stats.schemaSize = statsFactory.NewTaggedStat("warehouse_schema_size", stats.HistogramType, stats.Tags{
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
"module": "warehouse",
"workspaceId": warehouse.WorkspaceID,
"destType": warehouse.Destination.DestinationDefinition.Name,
"sourceId": warehouse.Source.ID,
"destinationId": warehouse.Destination.ID,
})
return s
}

// ConsolidateStagingFilesUsingLocalSchema
Expand Down Expand Up @@ -247,7 +266,13 @@
// 1. Inserts the updated schema into the local schema table
// 2. Updates the local schema instance
func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error {
_, err := sh.schemaRepo.Insert(ctx, &model.WHSchema{
updatedSchemaInBytes, err := json.Marshal(updatedSchema)
if err != nil {
return fmt.Errorf("marshaling schema: %w", err)

Check warning on line 271 in warehouse/schema/schema.go

View check run for this annotation

Codecov / codecov/patch

warehouse/schema/schema.go#L271

Added line #L271 was not covered by tests
}
sh.stats.schemaSize.Observe(float64(len(updatedSchemaInBytes)))

_, err = sh.schemaRepo.Insert(ctx, &model.WHSchema{
UploadID: uploadId,
SourceID: sh.warehouse.Source.ID,
Namespace: sh.warehouse.Namespace,
Expand Down
40 changes: 39 additions & 1 deletion warehouse/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"testing"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"

"github.com/samber/lo"
Expand Down Expand Up @@ -73,6 +76,7 @@ func (m *mockFetchSchemaRepo) FetchSchema(context.Context) (model.Schema, model.
}

func TestSchema_UpdateLocalSchema(t *testing.T) {
workspaceID := "test-workspace-id"
sourceID := "test_source_id"
destinationID := "test_destination_id"
namespace := "test_namespace"
Expand Down Expand Up @@ -151,8 +155,12 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
schemaMap: map[string]model.WHSchema{},
}

statsStore, err := memstats.New()
require.NoError(t, err)

s := Schema{
warehouse: model.Warehouse{
WorkspaceID: workspaceID,
Source: backendconfig.SourceT{
ID: sourceID,
},
Expand All @@ -165,10 +173,18 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
schemaRepo: mockRepo,
schemaInWarehouse: schemaInWarehouse,
}
tags := stats.Tags{
"module": "warehouse",
"workspaceId": s.warehouse.WorkspaceID,
"destType": s.warehouse.Destination.DestinationDefinition.Name,
"sourceId": s.warehouse.Source.ID,
"destinationId": s.warehouse.Destination.ID,
}
s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags)

ctx := context.Background()

err := s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema)
err = s.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema)
if tc.wantError == nil {
require.NoError(t, err)
require.Equal(t, tc.wantSchema, s.localSchema)
Expand All @@ -178,6 +194,9 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
require.Empty(t, s.localSchema)
require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
}
marshalledSchema, err := json.Marshal(tc.mockSchema.Schema)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())

err = s.UpdateLocalSchemaWithWarehouse(ctx, uploadID)
if tc.wantError == nil {
Expand All @@ -188,7 +207,11 @@ func TestSchema_UpdateLocalSchema(t *testing.T) {
require.Error(t, err, fmt.Sprintf("got error %v, want error %v", err, tc.wantError))
require.Empty(t, s.localSchema)
require.Empty(t, mockRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
require.EqualValues(t, float64(241), statsStore.Get("warehouse_schema_size", tags).LastValue())
}
marshalledSchema, err = json.Marshal(schemaInWarehouse)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())
})
}
}
Expand Down Expand Up @@ -1772,6 +1795,9 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
require.False(t, schemaChanged)
})
t.Run("schema changed", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)

testSchema := model.Schema{
tableName: model.TableSchema{
"test_int": "int",
Expand Down Expand Up @@ -1823,6 +1849,14 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
schemaRepo: mockSchemaRepo,
log: logger.NOP,
}
tags := stats.Tags{
"module": "warehouse",
"workspaceId": s.warehouse.WorkspaceID,
"destType": s.warehouse.Destination.DestinationDefinition.Name,
"sourceId": s.warehouse.Source.ID,
"destinationId": s.warehouse.Destination.ID,
}
s.stats.schemaSize = statsStore.NewTaggedStat("warehouse_schema_size", stats.HistogramType, tags)

mockFetchSchemaRepo := &mockFetchSchemaRepo{
err: nil,
Expand All @@ -1839,6 +1873,10 @@ func TestSchema_SyncRemoteSchema(t *testing.T) {
require.Equal(t, schemaInWarehouse, mockSchemaRepo.schemaMap[schemaKey(sourceID, destinationID, namespace)].Schema)
require.Equal(t, schemaInWarehouse, s.schemaInWarehouse)
require.Equal(t, schemaInWarehouse, s.unrecognizedSchemaInWarehouse)

marshalledSchema, err := json.Marshal(s.localSchema)
require.NoError(t, err)
require.EqualValues(t, float64(len(marshalledSchema)), statsStore.Get("warehouse_schema_size", tags).LastValue())
})
t.Run("schema not changed", func(t *testing.T) {
testSchema := model.Schema{
Expand Down
Loading