Skip to content

Commit

Permalink
chore: some more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 28, 2024
1 parent ffee7ae commit 0ccab91
Show file tree
Hide file tree
Showing 22 changed files with 1,145 additions and 993 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ jobs:
go-version-file: 'go.mod'
- run: go version
- run: go mod download # Not required, used to segregate module download vs test times
- run: make test exclude="/rudder-server/(jobsdb|integration_test|processor|regulation-worker|router|services|suppression-backup-service|warehouse)"
- run: FORCE_RUN_INTEGRATION_TESTS=true make test exclude="/rudder-server/(jobsdb|integration_test|processor|regulation-worker|router|services|suppression-backup-service|warehouse)"
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
Expand Down
1,887 changes: 947 additions & 940 deletions integration_test/snowpipestreaming/snowpipestreaming_test.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
rudder-snowpipe-clients:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/rudder-snowpipe-clients:develop"
image: "rudderstack/rudder-snowpipe-clients:develop"
ports:
- "9078"
healthcheck:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
transformer:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/develop-rudder-transformer:latest"
image: "rudderstack/develop-rudder-transformer:latest"
ports:
- "9090:9090"
healthcheck:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package snowpipestreaming
import (
"context"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
)

type apiAdapter struct {
logger logger.Logger

stats struct {
createChannelCount stats.Counter
deleteChannelCount stats.Counter
Expand All @@ -24,9 +27,16 @@ type apiAdapter struct {
api
}

func newApiAdapter(api api, statsFactory stats.Stats, destination *backendconfig.DestinationT) *apiAdapter {
adapter := &apiAdapter{}
adapter.api = api
func newApiAdapter(
logger logger.Logger,
statsFactory stats.Stats,
api api,
destination *backendconfig.DestinationT,
) api {
adapter := &apiAdapter{
logger: logger,
api: api,
}

tags := stats.Tags{
"module": "batch_router",
Expand All @@ -47,12 +57,23 @@ func newApiAdapter(api api, statsFactory stats.Stats, destination *backendconfig
}

func (a *apiAdapter) CreateChannel(ctx context.Context, req *model.CreateChannelRequest) (*model.ChannelResponse, error) {
a.logger.Infon("Creating channel",
logger.NewStringField("rudderIdentifier", req.RudderIdentifier),
logger.NewStringField("partition", req.Partition),
logger.NewStringField("database", req.TableConfig.Database),
logger.NewStringField("namespace", req.TableConfig.Schema),
logger.NewStringField("table", req.TableConfig.Table),
)
defer a.stats.createChannelCount.Increment()
defer a.stats.createChannelResponseTime.RecordDuration()()
return a.api.CreateChannel(ctx, req)
}

func (a *apiAdapter) DeleteChannel(ctx context.Context, channelID string, sync bool) error {
a.logger.Infon("Deleting channel",
logger.NewStringField("channelId", channelID),
logger.NewBoolField(", sync", sync),
)
defer a.stats.deleteChannelCount.Increment()
defer a.stats.deleteChannelResponseTime.RecordDuration()()
return a.api.DeleteChannel(ctx, channelID, sync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *Manager) createChannel(
m.channelCache.Store(tableName, resp)
return resp, nil
default:
return nil, fmt.Errorf("creating channel: %v", err)
return nil, fmt.Errorf("creating channel with code %s, message: %s and error: %s", resp.Code, resp.SnowflakeAPIMessage, resp.Error)
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func (m *Manager) recreateChannel(
return channelResponse, nil
}

func (m *Manager) deleteChannel(ctx context.Context, tableName string, channelID string) error {
func (m *Manager) deleteChannel(ctx context.Context, tableName, channelID string) error {
m.channelCache.Delete(tableName)
if err := m.api.DeleteChannel(ctx, channelID, true); err != nil {
return fmt.Errorf("deleting channel: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"strconv"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/samber/lo"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestAPI(t *testing.T) {

ctx := context.Background()

namespace := testhelper.RandSchema(whutils.SNOWFLAKE)
namespace := testhelper.RandSchema()
table := "TEST_TABLE"
tableSchema := whutils.ModelTableSchema{
"ID": "string", "NAME": "string", "EMAIL": "string", "AGE": "int", "ACTIVE": "boolean", "DOB": "datetime",
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestAPI(t *testing.T) {

ctx := context.Background()

namespace := testhelper.RandSchema(whutils.SNOWFLAKE)
namespace := testhelper.RandSchema()
table := "TEST_TABLE"
tableSchema := whutils.ModelTableSchema{
"ID": "string", "NAME": "string", "EMAIL": "string", "AGE": "int", "ACTIVE": "boolean", "DOB": "datetime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func TestInsert(t *testing.T) {
RowIndex: 1,
ExtraColNames: []string{"UNKNOWN"},
NullValueForNotNullColNames: nil,
Message: "The given row cannot be converted to the internal format: Extra columns: [UNKNOWN]. Columns not present in the table shouldn't be specified, rowIndex:1"},
Message: "The given row cannot be converted to the internal format: Extra columns: [UNKNOWN]. Columns not present in the table shouldn't be specified, rowIndex:1",
},
},
Code: "ERR_SCHEMA_CONFLICT",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
reType = regexp.MustCompile(`(.+?)\([^)]*\)`)
)
var reType = regexp.MustCompile(`(.+?)\([^)]*\)`)

type (
CreateChannelRequest struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"net/http"
"time"

"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/stringify"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/logger"

Expand Down Expand Up @@ -47,13 +49,24 @@ func (m *Manager) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
logger.NewStringField("table", info.Table),
obskit.Error(err),
)
if deleteErr := m.deleteChannel(ctx, info.Table, info.ChannelID); deleteErr != nil {
m.logger.Warnn("Failed to delete channel",
logger.NewStringField("channelId", info.ChannelID),
logger.NewStringField("offset", info.Offset),
logger.NewStringField("table", info.Table),
obskit.Error(deleteErr),
)
}
}
return nil
})
}
_ = g.Wait()

if err := g.Wait(); err != nil {
failedExists := lo.ContainsBy(uploadInfos, func(item uploadInfo) bool {
return item.Failed
})
if failedExists {
return common.PollStatusResponse{
InProgress: false,
StatusCode: http.StatusOK,
Expand All @@ -62,7 +75,6 @@ func (m *Manager) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
FailedJobURLs: stringify.Any(uploadInfos),
}
}

return common.PollStatusResponse{
InProgress: false,
StatusCode: http.StatusOK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func New(
}

m.api = newApiAdapter(
snowpipeapi.New(m.config.clientURL, m.requestDoer),
m.logger,
statsFactory,
snowpipeapi.New(m.config.clientURL, m.requestDoer),
destination,
)
return m
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package snowpipestreaming

import (
"testing"

"github.com/stretchr/testify/assert"

whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func TestFindNewColumns(t *testing.T) {
tests := []struct {
name string
eventSchema whutils.ModelTableSchema
snowPipeSchema whutils.ModelTableSchema
expected []whutils.ColumnInfo
}{
{
name: "new column with different data type in event schema",
eventSchema: whutils.ModelTableSchema{
"new_column": "STRING",
"existing_column": "FLOAT",
},
snowPipeSchema: whutils.ModelTableSchema{
"existing_column": "INT",
},
expected: []whutils.ColumnInfo{
{Name: "new_column", Type: "STRING"},
},
},
{
name: "new and existing columns with multiple data types",
eventSchema: whutils.ModelTableSchema{
"new_column1": "STRING",
"new_column2": "BOOLEAN",
"existing_column": "INT",
},
snowPipeSchema: whutils.ModelTableSchema{
"existing_column": "INT",
"another_existing_column": "FLOAT",
},
expected: []whutils.ColumnInfo{
{Name: "new_column1", Type: "STRING"},
{Name: "new_column2", Type: "BOOLEAN"},
},
},
{
name: "all columns in event schema are new",
eventSchema: whutils.ModelTableSchema{
"new_column1": "STRING",
"new_column2": "BOOLEAN",
"new_column3": "FLOAT",
},
snowPipeSchema: whutils.ModelTableSchema{},
expected: []whutils.ColumnInfo{
{Name: "new_column1", Type: "STRING"},
{Name: "new_column2", Type: "BOOLEAN"},
{Name: "new_column3", Type: "FLOAT"},
},
},
{
name: "case sensitivity check",
eventSchema: whutils.ModelTableSchema{
"ColumnA": "STRING",
"columna": "BOOLEAN",
},
snowPipeSchema: whutils.ModelTableSchema{
"columna": "BOOLEAN",
},
expected: []whutils.ColumnInfo{
{Name: "ColumnA", Type: "STRING"},
},
},
{
name: "all columns match with identical types",
eventSchema: whutils.ModelTableSchema{
"existing_column1": "STRING",
"existing_column2": "FLOAT",
},
snowPipeSchema: whutils.ModelTableSchema{
"existing_column1": "STRING",
"existing_column2": "FLOAT",
},
expected: []whutils.ColumnInfo{},
},
{
name: "event schema is empty, SnowPipe schema has columns",
eventSchema: whutils.ModelTableSchema{},
snowPipeSchema: whutils.ModelTableSchema{
"existing_column": "STRING",
},
expected: []whutils.ColumnInfo{},
},
{
name: "SnowPipe schema is nil",
eventSchema: whutils.ModelTableSchema{
"new_column": "STRING",
},
snowPipeSchema: nil,
expected: []whutils.ColumnInfo{
{Name: "new_column", Type: "STRING"},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := findNewColumns(tt.eventSchema, tt.snowPipeSchema)
assert.ElementsMatch(t, tt.expected, result)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.9"

services:
rudder-snowpipe-clients:
image: "hub.dev-rudder.rudderlabs.com/dockerhub-proxy/rudderstack/rudder-snowpipe-clients:develop"
image: "rudderstack/rudder-snowpipe-clients:develop"
ports:
- "9078"
healthcheck:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/stretchr/testify/require"

whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
)

const (
Expand Down Expand Up @@ -45,15 +44,13 @@ func GetSnowPipeTestCredentials(key string) (*TestCredentials, error) {
return &credentials, nil
}

func RandSchema(provider string) string {
func RandSchema() string {
hex := strings.ToLower(rand.String(12))
namespace := fmt.Sprintf("test_%s_%d", hex, time.Now().Unix())
return whutils.ToProviderCase(provider, whutils.ToSafeNamespace(provider,
namespace,
))
return strings.ToUpper(namespace)
}

func DropSchema(t *testing.T, db *sql.DB, namespace string) {
func DropSchema(t testing.TB, db *sql.DB, namespace string) {
t.Helper()
t.Log("dropping schema", namespace)

Expand Down
Loading

0 comments on commit 0ccab91

Please sign in to comment.