diff --git a/streams/options.go b/streams/options.go index d6dda5c..0fe6060 100644 --- a/streams/options.go +++ b/streams/options.go @@ -94,6 +94,7 @@ type ProduceOptions struct { Timestamp time.Time Metadata map[string]string Processed map[string]bool + Context context.Context } func ProduceWithTimestamp(t time.Time) ProduceOption { @@ -108,11 +109,24 @@ func ProduceWithMetadata(md map[string]string) ProduceOption { } } +func ProduceWithProcessed(p map[string]bool) ProduceOption { + return func(o *ProduceOptions) { + o.Processed = p + } +} + +func ProduceWithContext(ctx context.Context) ProduceOption { + return func(o *ProduceOptions) { + o.Context = ctx + } +} + func NewProduceOptions(opts ...ProduceOption) ProduceOptions { options := ProduceOptions{ Timestamp: time.Now(), Metadata: map[string]string{}, Processed: map[string]bool{}, + Context: context.Background(), } for _, fn := range opts {