Skip to content

Commit

Permalink
Changes as suggested by @Darkren
Browse files Browse the repository at this point in the history
* Remove various empty lines and fix typos.
* Remove unnecessary chan close in appevent.Broadcaster.
* Make multiple calls to .OnTCPDial and .OnTCPClose safe for appevent.Subscriber
  • Loading branch information
evanlinjin committed May 26, 2020
1 parent 252e30c commit a3077bd
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 31 deletions.
6 changes: 2 additions & 4 deletions cmd/apps/helloworld/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ func main() {
func prepareSubscriptions() *appevent.Subscriber {
subs := appevent.NewSubscriber()

subs.TCPDial(func(data appevent.TCPDialData) {
subs.OnTCPDial(func(data appevent.TCPDialData) {
log.WithField("event_type", data.Type()).
WithField("event_data", data).
Info("Received event.")
})

subs.TCPClose(func(data appevent.TCPCloseData) {
subs.OnTCPClose(func(data appevent.TCPCloseData) {
log.WithField("event_type", data.Type()).
WithField("event_data", data).
Info("Received event.")
Expand Down Expand Up @@ -121,7 +121,6 @@ func handleServerConn(log logrus.FieldLogger, conn net.Conn) {
}

func runClient(appC *app.Client) {

var remotePK cipher.PubKey
if err := remotePK.UnmarshalText([]byte(*remote)); err != nil {
log.WithError(err).Fatal("Invalid remote public key.")
Expand All @@ -130,7 +129,6 @@ func runClient(appC *app.Client) {
var conn net.Conn

for i := 0; true; i++ {

time.Sleep(time.Second * 2)

if conn != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/appcommon/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ReadHello(r io.Reader) (Hello, error) {

var hello Hello
if err := json.Unmarshal(helloRaw, &hello); err != nil {
return Hello{}, fmt.Errorf("failed to unmarshal hellp data: %w", err)
return Hello{}, fmt.Errorf("failed to unmarshal hello data: %w", err)
}

return hello, nil
Expand Down
12 changes: 4 additions & 8 deletions pkg/app/appevent/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,18 @@ func (mc *Broadcaster) Broadcast(ctx context.Context, e *Event) error {
go notifyClient(ctx, e, client, errCh)
}

// Delete inactive clients and reset error channels.
// Delete inactive clients and associated error channels.
for client, errCh := range mc.clients {
err := <-errCh
close(errCh)

if err != nil {
if err := <-errCh; err != nil {
mc.log.
WithError(err).
WithField("close_error", client.Close()).
WithField("hello", client.Hello().String()).
Warn("Events RPC client closed due to error.")

delete(mc.clients, client)
continue
close(errCh)
}

mc.clients[client] = make(chan error, 1)
}

return nil
Expand Down
2 changes: 0 additions & 2 deletions pkg/app/appevent/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func TestBroadcaster_Broadcast(t *testing.T) {
// - Each of the n(C) RPCClients should receive n(E) event objects.
// - Received event objects should be in the order of sent.
t.Run("broadcast_events", func(t *testing.T) {

// Arrange: constants.
const nClients = 12
const nEvents = 52
Expand Down Expand Up @@ -99,7 +98,6 @@ func TestBroadcaster_Broadcast(t *testing.T) {
// Assert:
// - The RPCClient should have only received events that are of subscribed types.
t.Run("broadcast_only_subscribed_events", func(t *testing.T) {

// Arrange: constants/variables
const nEvents = 64
subs := map[string]bool{TCPDial: true}
Expand Down
38 changes: 22 additions & 16 deletions pkg/app/appevent/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@ func NewSubscriber() *Subscriber {
}
}

// TCPDial subscribes to the TCPDial event channel.
// This should only be called once.
func (s *Subscriber) TCPDial(action func(data TCPDialData)) {
s.mx.Lock()
ch := make(chan *Event, s.chanSize)
s.m[TCPDial] = ch
s.mx.Unlock()
// OnTCPDial subscribes to the OnTCPDial event channel (if not already).
// And triggers the contained action func on each subsequent event.
func (s *Subscriber) OnTCPDial(action func(data TCPDialData)) {
evCh := s.ensureEventChan(TCPDial)

go func() {
for ev := range ch {
for ev := range evCh {
var data TCPDialData
ev.Unmarshal(&data)
action(data)
Expand All @@ -49,16 +46,13 @@ func (s *Subscriber) TCPDial(action func(data TCPDialData)) {
}()
}

// TCPClose subscribes to the TCPClose event channel.
// This should only be called once.
func (s *Subscriber) TCPClose(action func(data TCPCloseData)) {
s.mx.Lock()
ch := make(chan *Event, s.chanSize)
s.m[TCPClose] = ch
s.mx.Unlock()
// OnTCPClose subscribes to the OnTCPClose event channel (if not already).
// And triggers the contained action func on each subsequent event.
func (s *Subscriber) OnTCPClose(action func(data TCPCloseData)) {
evCh := s.ensureEventChan(TCPClose)

go func() {
for ev := range ch {
for ev := range evCh {
var data TCPCloseData
ev.Unmarshal(&data)
action(data)
Expand All @@ -67,6 +61,18 @@ func (s *Subscriber) TCPClose(action func(data TCPCloseData)) {
}()
}

func (s *Subscriber) ensureEventChan(eventType string) chan *Event {
s.mx.Lock()
ch, ok := s.m[eventType]
if !ok {
ch = make(chan *Event, s.chanSize)
s.m[eventType] = ch
}
s.mx.Unlock()

return ch
}

// Subscriptions returns a map of all subscribed event types.
func (s *Subscriber) Subscriptions() map[string]bool {
s.mx.RLock()
Expand Down
1 change: 1 addition & 0 deletions pkg/snet/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func New(conf Config, eb *appevent.Broadcaster) *Network {
data := appevent.TCPDialData{RemoteNet: network, RemoteAddr: addr}
event := appevent.NewEvent(appevent.TCPDial, data)
_ = eb.Broadcast(context.Background(), event) //nolint:errcheck
// @evanlinjin: An error is not returned here as this will cancel the session dial.
return nil
},
OnSessionDisconnect: func(network, addr string, _ error) {
Expand Down

0 comments on commit a3077bd

Please sign in to comment.