Skip to content

Commit

Permalink
chore: some more addition of test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Dec 4, 2024
1 parent 27bc956 commit 01bd7f9
Show file tree
Hide file tree
Showing 5 changed files with 1,062 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"context"
stdjson "encoding/json"
"errors"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -38,11 +37,6 @@ import (

var json = jsoniter.ConfigCompatibleWithStandardLibrary

var (
errInvalidStatusResponse = errors.New("invalid status response")
errDestinationConfigNil = errors.New("destination config is nil")
)

func New(
conf *config.Config,
logger logger.Logger,
Expand Down Expand Up @@ -185,8 +179,9 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
})

var (
importingJobIDs, failedJobIDs []int64
importInfos, discardImportInfos []*importInfo
importingJobIDs, failedJobIDs []int64
importInfos []*importInfo
discardImportInfo *importInfo
)
for _, info := range uploadInfos {
imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
Expand All @@ -203,12 +198,15 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
importingJobIDs = append(importingJobIDs, info.jobIDs...)
importInfos = append(importInfos, imInfo)

if discardImInfo != nil {
discardImportInfos = append(discardImportInfos, discardImInfo)
if discardImInfo != nil && discardImportInfo == nil {
discardImportInfo = discardImInfo
} else if discardImInfo != nil {
discardImportInfo.Count += discardImInfo.Count
discardImportInfo.Offset = discardImInfo.Offset
}
}
if len(discardImportInfos) > 0 {
importInfos = append(importInfos, discardImportInfos[len(discardImportInfos)-1])
if discardImportInfo != nil {
importInfos = append(importInfos, discardImportInfo)
}

var importParameters stdjson.RawMessage
Expand All @@ -218,12 +216,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
return m.abortJobs(asyncDest, fmt.Errorf("failed to marshal import id: %w", err).Error())
}

importParameters, err = json.Marshal(common.ImportParameters{
ImportId: string(importIDBytes),
})
if err != nil {
return m.abortJobs(asyncDest, fmt.Errorf("failed to marshal import parameters: %w", err).Error())
}
importParameters = stdjson.RawMessage(`{"importId":` + string(importIDBytes) + `}`)
}
m.logger.Infon("Uploaded data to snowpipe streaming destination")

Expand Down Expand Up @@ -296,9 +289,10 @@ func (m *Manager) sendEventsToSnowpipe(
log.Infon("Prepared channel", logger.NewStringField("channelID", channelResponse.ChannelID))

formattedTS := m.now().Format(misc.RFC3339Milli)
discardInfos := lo.FlatMap(info.events, func(tableEvent *event, _ int) []discardInfo {
return getDiscardedRecordsFromEvent(tableEvent, channelResponse.SnowpipeSchema, info.tableName, formattedTS)
})
var discardInfos []discardInfo
for _, tableEvent := range info.events {
discardInfos = append(discardInfos, getDiscardedRecordsFromEvent(tableEvent, channelResponse.SnowpipeSchema, info.tableName, formattedTS)...)
}

insertReq := &model.InsertRequest{
Rows: lo.Map(info.events, func(event *event, _ int) model.Row {
Expand Down Expand Up @@ -467,7 +461,7 @@ func (m *Manager) pollForImportInfo(ctx context.Context, info *importInfo) (bool
logger.NewBoolField("completed", statusRes.Offset == info.Offset),
)
if !statusRes.Valid || !statusRes.Success {
return false, errInvalidStatusResponse
return false, fmt.Errorf("invalid status response with valid: %t, success: %t", statusRes.Valid, statusRes.Success)
}
return statusRes.Offset != info.Offset, nil
}
Expand Down
Loading

0 comments on commit 01bd7f9

Please sign in to comment.