Skip to content

Commit

Permalink
Merge pull request #26 from databendcloud/fix/lastoffset-and-retry
Browse files Browse the repository at this point in the history
fix: update lastOffset and add retry for ingestData
  • Loading branch information
hantmac committed Jun 18, 2024
2 parents ddffe11 + fb7417a commit c95b642
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 11 deletions.
2 changes: 1 addition & 1 deletion batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ _loop:
if firstMessageOffset == 0 {
firstMessageOffset = m.Offset
}
lastMessageOffset = m.Offset

data := string(m.Value)
data = strings.ReplaceAll(data, "\t", "")
Expand Down Expand Up @@ -167,7 +168,6 @@ _loop:
l.Errorf("Failed to commit message at partition %d, offset %d: %v", partition, ms.Offset, err)
return err
}
lastMessageOffset = ms.Offset
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion config/conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
"minBytes": 1024,
"maxBytes": 1048576,
"maxWait": 10,
"useReplaceMode": false
"useReplaceMode": false,
"userStage": "~"
}
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type Config struct {

// UseReplaceMode determines whether to use the REPLACE INTO statement to insert data.
// replace into will upsert data
UseReplaceMode bool `json:"useReplaceMode" default:"false"`
UseReplaceMode bool `json:"useReplaceMode" default:"false"`
UserStage string `json:"userStage" default:"~"`
}

func LoadConfig() (*Config, error) {
Expand Down
28 changes: 26 additions & 2 deletions consume_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime/debug"
"time"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"

"github.com/databendcloud/bend-ingest-kafka/config"
Expand Down Expand Up @@ -56,9 +57,13 @@ func (c *ConsumeWorker) stepBatch() error {
return err
}
} else {
if err := c.ig.IngestData(batch); err != nil {
err := DoRetry(
func() error {
return c.ig.IngestData(batch)
})
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to ingest data between %d-%d into Databend: %v\n", batch.FirstMessageOffset, batch.LastMessageOffset, err)
l.Errorf("Failed to ingest data between %d-%d into Databend: %v", batch.FirstMessageOffset, batch.LastMessageOffset, err)
l.Errorf("Failed to ingest data between %d-%d into Databend: %v after retry 5 attempts\n", batch.FirstMessageOffset, batch.LastMessageOffset, err)
return err
}
}
Expand Down Expand Up @@ -98,3 +103,22 @@ func (c *ConsumeWorker) Run(ctx context.Context) {
}
}
}

// DoRetry invokes f until it succeeds or the attempt budget is exhausted.
// It makes up to 5 attempts with exponential back-off starting at one
// second, retrying whenever f returns a non-nil error. On success it
// returns nil; otherwise it returns the error produced by retry.Do after
// the final attempt.
func DoRetry(f retry.RetryableFunc) error {
	return retry.Do(
		// f already satisfies retry.RetryableFunc, so pass it directly
		// instead of wrapping it in an extra closure.
		f,
		// Retry on any non-nil error (this mirrors retry-go's default
		// predicate, but states the policy explicitly).
		retry.RetryIf(func(err error) bool { return err != nil }),
		retry.Delay(time.Second),
		retry.Attempts(5),
		retry.DelayType(retry.BackOffDelay),
	)
}
2 changes: 2 additions & 0 deletions consume_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func TestConsumeKafka(t *testing.T) {
DataFormat: "json",
BatchMaxInterval: 10,
DisableVariantCheck: false,
UserStage: "~",
}
ig := NewDatabendIngester(cfg)
w := NewConsumeWorker(cfg, "worker1", ig)
Expand Down Expand Up @@ -182,6 +183,7 @@ func TestConsumerWithoutTransform(t *testing.T) {
DataFormat: "json",
BatchMaxInterval: 10,
DisableVariantCheck: true,
UserStage: "~",
}
ig := NewDatabendIngester(cfg)
if !cfg.IsJsonTransform {
Expand Down
12 changes: 6 additions & 6 deletions ingest_databend.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (ig *databendIngester) IngestParquetData(messageBatch *message.MessagesBatc
}

func (ig *databendIngester) IngestData(messageBatch *message.MessagesBatch) error {
l := logrus.WithFields(logrus.Fields{"ingest_databend": "IngestData"})
l := logrus.WithFields(logrus.Fields{"ingest_databend": "IngestData", "lastOffset": messageBatch.LastMessageOffset})
startTime := time.Now()
if messageBatch == nil {
return nil
Expand All @@ -162,26 +162,26 @@ func (ig *databendIngester) IngestData(messageBatch *message.MessagesBatch) erro
var err error
batchJsonData, err = ig.reWriteTheJsonData(messageBatch)
if err != nil {
l.Errorf("re-write the json data failed: %v", err)
l.Errorf("re-write the json data failed: %v, lastOffset is: %d\n", err, messageBatch.LastMessageOffset)
return err
}
}

fileName, bytesSize, err := ig.generateNDJsonFile(batchJsonData)
if err != nil {
l.Errorf("generate NDJson file failed: %v", err)
l.Errorf("generate NDJson file failed: %v,lastOffset is %d\n", err, messageBatch.LastMessageOffset)
return err
}

stage, err := ig.uploadToStage(fileName)
if err != nil {
l.Errorf("upload to stage failed: %v", err)
l.Errorf("upload to stage failed: %v, lastOffset is: %d\n", err, messageBatch.LastMessageOffset)
return err
}

err = ig.copyInto(stage)
if err != nil {
l.Errorf("copy into failed: %v", err)
l.Errorf("copy into failed: %v, lastOffset is: %d\n", err, messageBatch.LastMessageOffset)
return err
}
ig.statsRecorder.RecordMetric(bytesSize, len(batchJsonData))
Expand Down Expand Up @@ -297,7 +297,7 @@ func (ig *databendIngester) uploadToStage(fileName string) (*godatabend.StageLoc
defer f.Close()
input := bufio.NewReader(f)
stage := &godatabend.StageLocation{
Name: "~",
Name: ig.databendIngesterCfg.UserStage,
Path: fmt.Sprintf("batch/%d-%s", time.Now().Unix(), filepath.Base(fileName)),
}

Expand Down
3 changes: 3 additions & 0 deletions ingest_databend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestIngestDataIsJsonTransform(t *testing.T) {
DatabendTable: "test_ingest",
BatchSize: 10,
BatchMaxInterval: 10,
UserStage: "~",
}
db, err := sql.Open("databend", cfg.DatabendDSN)
assert.NoError(t, err)
Expand Down Expand Up @@ -102,6 +103,7 @@ func TestIngestDataWithoutJsonTransform(t *testing.T) {
DatabendTable: "default.test_ingest_without",
BatchSize: 10,
BatchMaxInterval: 10,
UserStage: "~",
}
db, err := sql.Open("databend", cfg.DatabendDSN)
assert.NoError(t, err)
Expand Down Expand Up @@ -157,6 +159,7 @@ func TestIngestWithReplaceMode(t *testing.T) {
BatchSize: 10,
BatchMaxInterval: 10,
UseReplaceMode: true,
UserStage: "~",
}
db, err := sql.Open("databend", cfg.DatabendDSN)
assert.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func parseConfig() *config.Config {
flag.IntVar(&cfg.MaxBytes, "max-bytes", 20*1024*1024, "max bytes")
flag.IntVar(&cfg.MaxWait, "max-wait", 10, "max wait")
flag.BoolVar(&cfg.UseReplaceMode, "use-replace-mode", false, "use replace into mode")
flag.StringVar(&cfg.UserStage, "user-stage", "~", "user stage")

flag.Parse()
validateConfig(&cfg)
Expand Down

0 comments on commit c95b642

Please sign in to comment.