Skip to content

Commit

Permalink
specgen: re-enable unit tests (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Dec 5, 2024
1 parent b698375 commit fc5ffee
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 700 deletions.
14 changes: 0 additions & 14 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

package sdk

/*
import (
"context"
"fmt"
Expand Down Expand Up @@ -61,7 +59,6 @@ type benchmarkSource struct {
config map[string]string

// measures
configure time.Duration
open time.Duration
firstRead time.Duration
allReads time.Duration
Expand All @@ -75,13 +72,6 @@ func (bm *benchmarkSource) Run(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bm.configure = bm.measure(func() {
err := bm.source.Configure(ctx, bm.config)
if err != nil {
b.Fatal(err)
}
})
bm.open = bm.measure(func() {
err := bm.source.Open(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -174,7 +164,6 @@ func (*benchmarkSource) measure(f func()) time.Duration {
func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(0, "ns/op") // suppress ns/op metric, it is misleading in this benchmarkSource

b.ReportMetric(bm.configure.Seconds(), "configure")
b.ReportMetric(bm.open.Seconds(), "open")
b.ReportMetric(bm.stop.Seconds(), "stop")
b.ReportMetric(bm.teardown.Seconds(), "teardown")
Expand All @@ -185,6 +174,3 @@ func (bm *benchmarkSource) reportMetrics(b *testing.B) {
b.ReportMetric(bm.firstAck.Seconds(), "firstAck")
b.ReportMetric(float64(b.N-1)/bm.allAcks.Seconds(), "acks/s")
}
*/
20 changes: 8 additions & 12 deletions destination_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type DestinationWithBatch struct {
// Maximum size of batch before it gets written to the destination.
BatchSize int `json:"sdk.batch.size" default:"0" validate:"gt=-1"`
// Maximum delay before an incomplete batch is written to the destination.
BatchDelay time.Duration `json:"sdk.batch.delay" default:"0" validate:"gt=-1"`
BatchDelay time.Duration `json:"sdk.batch.delay" default:"0"`
}

// Wrap a Destination into the middleware.
Expand Down Expand Up @@ -164,8 +164,8 @@ func (d *destinationWithBatch) Open(ctx context.Context) error {
return nil
}

// setBatchEnabled stores the boolean in the context. If the context already
// contains the key it will update the boolean under that key and return the
// setBatchConfig stores a DestinationWithBatch instance in the context. If the context already
// contains the key it will update the DestinationWithBatch under that key and return the
// same context, otherwise it will return a new context with the stored value.
// This is used to signal to destinationPluginAdapter if the Destination is
// wrapped into DestinationWithBatchConfig middleware.
Expand All @@ -174,7 +174,7 @@ func (*destinationWithBatch) setBatchConfig(ctx context.Context, cfg Destination
if ok {
*ctxCfg = cfg
} else {
ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, cfg)
ctx = context.WithValue(ctx, ctxKeyBatchConfig{}, &cfg)
}
return ctx
}
Expand Down Expand Up @@ -425,26 +425,22 @@ type destinationWithSchemaExtraction struct {
Destination
config *DestinationWithSchemaExtraction

payloadEnabled bool
keyEnabled bool

payloadWarnOnce sync.Once
keyWarnOnce sync.Once
}

func (d *destinationWithSchemaExtraction) Write(ctx context.Context, records []opencdc.Record) (int, error) {
if d.keyEnabled {
for i := range records {
for i := range records {
if *d.config.KeyEnabled {
if err := d.decodeKey(ctx, &records[i]); err != nil {
if len(records) > 0 {
err = fmt.Errorf("record %d: %w", i, err)
}
return 0, err
}
}
}
if d.payloadEnabled {
for i := range records {

if *d.config.PayloadEnabled {
if err := d.decodePayload(ctx, &records[i]); err != nil {
if len(records) > 0 {
err = fmt.Errorf("record %d: %w", i, err)
Expand Down
Loading

0 comments on commit fc5ffee

Please sign in to comment.