Skip to content

Commit

Permalink
Merge pull request #31 from w-h-a/produce-interface
Browse files Browse the repository at this point in the history
refactor: pass raw bytes to produce
  • Loading branch information
w-h-a authored Jul 5, 2024
2 parents 98de594 + be5aef7 commit f68835d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
20 changes: 18 additions & 2 deletions streams/domain.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package streams

import (
"encoding/json"
"errors"
"fmt"
"time"

"google.golang.org/protobuf/proto"
)

type Ack func() error
Expand All @@ -21,8 +23,22 @@ type Event struct {
nack Nack
}

func (e *Event) Marshal(v interface{}) ([]byte, error) {
protoMessage, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to marshal: %v is not a proto message", v)
}

return proto.Marshal(protoMessage)
}

func (e *Event) Unmarshal(v interface{}) error {
return json.Unmarshal(e.Payload, v)
protoMessage, ok := v.(proto.Message)
if !ok {
return fmt.Errorf("failed to unmarshal: %v is not a proto message", v)
}

return proto.Unmarshal(e.Payload, protoMessage)
}

func (e *Event) SetAck(f Ack) {
Expand Down
2 changes: 1 addition & 1 deletion streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ type Streams interface {
Subscribe(id string, opts ...SubscribeOption) error
Unsubscribe(id string) error
Consume(id string) (Subscriber, error)
Produce(topic string, data interface{}, opts ...ProduceOption) error
Produce(topic string, data []byte, opts ...ProduceOption) error
String() string
}

0 comments on commit f68835d

Please sign in to comment.