Skip to content

Commit

Permalink
solved confilcts
Browse files Browse the repository at this point in the history
  • Loading branch information
stanipetrosyan committed Nov 10, 2024
2 parents 96e2527 + 2adf48b commit f3bea60
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 84 deletions.
18 changes: 9 additions & 9 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSubscriberHandler(t *testing.T) {
wg.Done()
})

message := CreateMessage().SetBody("Hi There")
message := NewMessageBuilder().SetPayload("Hi There").Build()
eventBus.Channel("my-channel").Publisher().Publish(message)
wg.Wait()
})
Expand All @@ -37,7 +37,7 @@ func TestSubscriberHandler(t *testing.T) {
wg.Done()
})

message := CreateMessage().SetBody("Hi There")
message := NewMessageBuilder().SetPayload("Hi There").Build()
eventBus.Channel("newaddress").Publisher().Publish(message)
wg.Wait()
})
Expand All @@ -52,10 +52,10 @@ func TestRequestHandler(t *testing.T) {

eventBus.Channel("my-channel").Subscriber().Listen(func(context Context) {
assert.Equal(t, "Hi There", context.Result().Extract())
context.Reply(CreateMessage().SetBody("Hello there!"))
context.Reply(NewMessageBuilder().SetPayload("Hello there!").Build())
})

message := CreateMessage().SetBody("Hi There")
message := NewMessageBuilder().SetPayload("Hi There").Build()
eventBus.Channel("my-channel").Publisher().Request(message, func(context Context) {
assert.Equal(t, "Hello there!", context.Result().Extract())
wg.Done()
Expand All @@ -81,7 +81,7 @@ func TestProcessorHandler(t *testing.T) {
return message.Extract() == "Hi There"
})

message := CreateMessage().SetBody("Hi There")
message := NewMessageBuilder().SetPayload("Hi There").Build()
eventBus.Channel("my-channel").Publisher().Publish(message)
wg.Wait()
})
Expand All @@ -93,13 +93,13 @@ func TestMessageOptions(t *testing.T) {

wg.Add(1)
eventBus.Channel("address").Subscriber().Listen(func(context Context) {
assert.True(t, context.Result().Options().Headers().Contains("key"))
assert.Equal(t, "value", context.Result().Options().Headers().Header("key"))
assert.True(t, context.Result().ExtractHeaders().Contains("key"))
assert.Equal(t, "value", context.Result().ExtractHeaders().Get("key"))
wg.Done()
})

options := NewMessageOptions().SetHeaders(NewHeaders().Add("key", "value"))
message := CreateMessage().SetBody("Hi There").SetOptions(options)
options := NewMessageHeadersBuilder().SetHeader("key", "value").Build()
message := NewMessageBuilder().SetPayload("Hi There").SetHeaders(options).Build()
eventBus.Channel("address").Publisher().Publish(message)
wg.Wait()
}
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func TestClient(t *testing.T) {
conn, err := listener.Accept()
assert.Nil(t, err)

msg := request{Channel: "channel", Message: CreateMessage().SetBody("Hello there")}
message := NewMessageBuilder().SetPayload("Hello there").Build()
msg := request{Channel: "channel", Message: message}
json.NewEncoder(conn).Encode(msg)
}
}()
Expand Down
8 changes: 4 additions & 4 deletions examples/eventbus/publish-subscribe/publish-subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
})

eventbus.Channel("topic1").Processor(func(message goeventbus.Message) bool {
return message.Options().Headers().Contains("header")
return message.ExtractHeaders().Contains("header")
})

eventbus.Channel("topic2").Subscriber().Listen(func(dc goeventbus.Context) {
Expand All @@ -39,8 +39,8 @@ func main() {
}

func publishTo(address string, data string) {
options := goeventbus.NewMessageOptions().SetHeaders(goeventbus.NewHeaders().Add("header", "value"))
message := goeventbus.CreateMessage().SetBody(data).SetOptions(options)
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
message := goeventbus.NewMessageBuilder().SetPayload(data).SetHeaders(options).Build()
publisher := eventbus.Channel(address).Publisher()

for {
Expand All @@ -50,5 +50,5 @@ func publishTo(address string, data string) {
}

func printMessage(data goeventbus.Message) {
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.Options().Headers())
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.ExtractHeaders())
}
8 changes: 4 additions & 4 deletions examples/eventbus/request-response/request-response.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func main() {

eventbus.Channel("topic1").Subscriber().Listen(func(context goeventbus.Context) {
printMessage(context.Result())
message := goeventbus.CreateMessage().SetBody("Hello from subscriber")
message := goeventbus.NewMessageBuilder().SetPayload("Hello from subscriber").Build()
context.Reply(message)
})

Expand All @@ -26,8 +26,8 @@ func main() {
}

func requestTo(address, data string) {
options := goeventbus.NewMessageOptions().SetHeaders(goeventbus.NewHeaders().Add("header", "value"))
message := goeventbus.CreateMessage().SetBody(data).SetOptions(options)
options := goeventbus.NewMessageHeadersBuilder().SetHeader("header", "value").Build()
message := goeventbus.NewMessageBuilder().SetPayload(data).SetHeaders(options).Build()
publisher := eventbus.Channel(address).Publisher()

for {
Expand All @@ -39,5 +39,5 @@ func requestTo(address, data string) {
}

func printMessage(data goeventbus.Message) {
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.Options().Headers())
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.ExtractHeaders())
}
2 changes: 1 addition & 1 deletion examples/networkbus/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ func main() {
}

func printMessage(data goeventbus.Message) {
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.Options().Headers())
fmt.Printf("Message %s, Headers %s\n", data.Extract(), data.ExtractHeaders())
}
2 changes: 1 addition & 1 deletion examples/networkbus/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
go server.Listen()

for {
message := goeventbus.CreateMessage().SetBody("Hello World!")
message := goeventbus.NewMessageBuilder().SetPayload("Hello World!").Build()
server.Publish("hello", message)
time.Sleep(time.Second * 2)
}
Expand Down
44 changes: 27 additions & 17 deletions message.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
package goeventbus

type Message struct {
Data interface{}
MessageOptions MessageOptions
Payload interface{}
Headers MessageHeaders
}

// Returns an empty message
func CreateMessage() Message {
return Message{}
type MessageBuilder interface {
SetPayload(payload any) MessageBuilder
SetHeaders(headers MessageHeaders) MessageBuilder
Build() Message
}

// Set the body of the message. Paramater can be any type
func (m Message) SetBody(data any) Message {
m.Data = data
return m
type defaultMessageBuilder struct {
message Message
}

// Set the options of the message. For create a new options see: NewMessageOptions()
func (m Message) SetOptions(options MessageOptions) Message {
m.MessageOptions = options
return m
func NewMessageBuilder() MessageBuilder {
return &defaultMessageBuilder{message: Message{}}
}

func (mb *defaultMessageBuilder) SetPayload(payload any) MessageBuilder {
mb.message.Payload = payload
return mb
}

func (mb *defaultMessageBuilder) SetHeaders(headers MessageHeaders) MessageBuilder {
mb.message.Headers = headers
return mb
}

func (mb *defaultMessageBuilder) Build() Message {
return mb.message
}

// Returns data of the message
func (m Message) Extract() any {
return m.Data
return m.Payload
}

// Returns options of the message
func (m Message) Options() MessageOptions {
return m.MessageOptions
func (m Message) ExtractHeaders() MessageHeaders {
return m.Headers
}
46 changes: 46 additions & 0 deletions message_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package goeventbus

type Headers struct {
headers map[string]string
}

type MessageHeaders struct {
headers Headers
}

type MessageHeadersBuilder interface {
SetHeader(key string, value string) MessageHeadersBuilder
Build() MessageHeaders
}

type defaultMessageHeadersBuilder struct {
messageHeaders MessageHeaders
}

func NewMessageHeadersBuilder() MessageHeadersBuilder {
return &defaultMessageHeadersBuilder{messageHeaders: MessageHeaders{Headers{headers: map[string]string{}}}}
}

func (hb *defaultMessageHeadersBuilder) SetHeader(key string, value string) MessageHeadersBuilder {
hb.messageHeaders.headers.headers[key] = value
return hb
}

func (hb *defaultMessageHeadersBuilder) Build() MessageHeaders {
return hb.messageHeaders
}

func NewMessageHeaders() MessageHeaders {
return MessageHeaders{
headers: Headers{},
}
}

func (h MessageHeaders) Get(key string) string {
return h.headers.headers[key]
}

func (h MessageHeaders) Contains(key string) bool {
_, exist := h.headers.headers[key]
return exist
}
46 changes: 0 additions & 46 deletions message_options.go

This file was deleted.

2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestServer(t *testing.T) {

time.Sleep(time.Millisecond)

msg := CreateMessage().SetBody("Hello there")
msg := NewMessageBuilder().SetPayload("Hello there").Build()
server.Publish("my-channel", msg)
wg.Wait()
}

0 comments on commit f3bea60

Please sign in to comment.