Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Dec 3, 2024
1 parent 44353b1 commit 8874345
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 33 deletions.
39 changes: 8 additions & 31 deletions integration_test/snowpipestreaming/snowpipestreaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,11 +714,7 @@ func TestSnowPipeStreaming(t *testing.T) {
t.Log("Sending 5 events again")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "aborted", 10)

t.Log("Sending 5 events again, should succeeded")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "failed", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 20)

schema := whth.RetrieveRecordsFromWarehouse(t, sm.DB.DB, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
Expand Down Expand Up @@ -789,13 +785,8 @@ func TestSnowPipeStreaming(t *testing.T) {
t.Log("Sending 5 events again")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "aborted", 5)

t.Log("Sending 5 events again, should succeeded")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 25)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "failed", 5)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 20)

schema := whth.RetrieveRecordsFromWarehouse(t, sm.DB.DB, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
require.Equal(t, map[string]map[string]string{
Expand Down Expand Up @@ -866,13 +857,8 @@ func TestSnowPipeStreaming(t *testing.T) {
t.Log("Sending 5 events again")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "aborted", 5)

t.Log("Sending 5 events again, should succeeded")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 25)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "failed", 5)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 20)

schema := whth.RetrieveRecordsFromWarehouse(t, sm.DB.DB, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
require.Equal(t, map[string]map[string]string{
Expand Down Expand Up @@ -962,11 +948,7 @@ func TestSnowPipeStreaming(t *testing.T) {
t.Log("Sending 5 events again")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "aborted", 10)

t.Log("Sending 5 events again, should succeeded")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "failed", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 20)

schema := whth.RetrieveRecordsFromWarehouse(t, sm.DB.DB, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
Expand Down Expand Up @@ -1045,13 +1027,8 @@ func TestSnowPipeStreaming(t *testing.T) {
t.Log("Sending 5 events again")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 10)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "aborted", 5)

t.Log("Sending 5 events again, should succeeded")
require.NoError(t, sendEvents(5, eventFormat, "writekey1", url))
requireGatewayJobsCount(t, ctx, postgresContainer.DB, "succeeded", 15)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 25)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "failed", 5)
requireBatchRouterJobsCount(t, ctx, postgresContainer.DB, "succeeded", 20)

schema := whth.RetrieveRecordsFromWarehouse(t, sm.DB.DB, fmt.Sprintf(`SELECT table_name, column_name, data_type FROM INFORMATION_SCHEMA.COLUMNS WHERE table_schema = '%s';`, namespace))
require.Equal(t, map[string]map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func mustRead(r io.Reader) []byte {
return data
}

// CreateChannel creates a new channel with the given request.
func (a *API) CreateChannel(ctx context.Context, channelReq *model.CreateChannelRequest) (*model.ChannelResponse, error) {
reqJSON, err := json.Marshal(channelReq)
if err != nil {
Expand Down Expand Up @@ -72,6 +73,9 @@ func (a *API) CreateChannel(ctx context.Context, channelReq *model.CreateChannel
return &res, nil
}

// DeleteChannel deletes the channel with the given ID.
// If sync is true, the server waits for all records in the channel to be flushed, then performs the soft delete.
// If sync is false, the server performs the soft delete immediately and we need to wait for all records to be flushed.
func (a *API) DeleteChannel(ctx context.Context, channelID string, sync bool) error {
deleteChannelURL := a.clientURL + "/channels/" + channelID
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, deleteChannelURL, nil)
Expand All @@ -98,6 +102,7 @@ func (a *API) DeleteChannel(ctx context.Context, channelID string, sync bool) er
}
}

// GetChannel retrieves the channel with the given ID.
func (a *API) GetChannel(ctx context.Context, channelID string) (*model.ChannelResponse, error) {
getChannelURL := a.clientURL + "/channels/" + channelID
req, err := http.NewRequestWithContext(ctx, http.MethodGet, getChannelURL, nil)
Expand All @@ -123,6 +128,7 @@ func (a *API) GetChannel(ctx context.Context, channelID string) (*model.ChannelR
return &res, nil
}

// Insert inserts the given rows into the channel with the given ID.
func (a *API) Insert(ctx context.Context, channelID string, insertRequest *model.InsertRequest) (*model.InsertResponse, error) {
reqJSON, err := json.Marshal(insertRequest)
if err != nil {
Expand Down Expand Up @@ -153,6 +159,7 @@ func (a *API) Insert(ctx context.Context, channelID string, insertRequest *model
return &res, nil
}

// GetStatus retrieves the status of the channel with the given ID.
func (a *API) GetStatus(ctx context.Context, channelID string) (*model.StatusResponse, error) {
statusURL := a.clientURL + "/channels/" + channelID + "/status"
req, err := http.NewRequestWithContext(ctx, http.MethodGet, statusURL, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (m *Manager) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
}
if len(failedInfos) > 0 {
statusResponse.HasFailed = true
statusResponse.FailedJobURLs = stringify.Any(infos)
statusResponse.FailedJobParameters = stringify.Any(infos)
} else {
statusResponse.HasFailed = false
statusResponse.HasWarning = false
Expand Down Expand Up @@ -476,7 +476,7 @@ func (m *Manager) GetUploadStats(input common.GetUploadStatsInput) common.GetUpl
m.logger.Infon("Getting import stats for snowpipe streaming destination")

var infos []*importInfo
err := json.Unmarshal([]byte(input.FailedJobURLs), &infos)
err := json.Unmarshal([]byte(input.FailedJobParameters), &infos)
if err != nil {
return common.GetUploadStatsResponse{
StatusCode: http.StatusBadRequest,
Expand Down

0 comments on commit 8874345

Please sign in to comment.