-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(async): dispatcher #1910
feat(async): dispatcher #1910
Changes from 102 commits
fdd36ff
9ddb355
6d6c6f3
1e6ad38
c61d794
03cf52b
9df8769
b176c5d
bf8787b
c999d17
cc7567d
52f4458
9066944
6eb7206
66d66ee
cc96b11
70c0f36
c6a8a43
748d712
e3db47b
7f2dbfa
1a4d885
a686a03
c27ada8
bc43819
a6172ed
9213b65
b6cb638
a0fcd08
da942ef
82d08cd
1792026
9b4a337
125ad29
eb9c207
ac6d1cb
6db6b49
0a0735a
c84a80d
a57efa2
6806275
687bf5f
ec0bf81
64212d2
82e0a8e
fd10d25
3bd43d6
3f42ab8
90e5f12
ae045ff
85b9c7b
d0289d1
6301814
4e4440e
08fff98
cd0b7be
c77f234
a66708b
584a23d
8ba8d51
6abc5a8
c231dd4
b2ca9be
6235fb4
f287b88
00d6466
1f319a3
52bea8f
8073631
55241fe
2488a15
7c4a05d
5583791
6989f61
9b951e0
d00a0c6
4bd83fa
e940239
b254bf2
2a30b9f
ce03370
0775651
77a8de3
d9877d6
c5022ae
fb16e2d
5c7c518
7a69ea1
6916834
00b4f20
5bb13eb
aaf9bd6
a173b27
e3a7cef
af595bc
9cf5fad
d481cbd
373e7fa
bccddda
db73444
d814ad3
fa13257
f720b0a
f962cb8
e67ef72
117a69a
3cb0bfb
596b6ef
db0bd2f
9494830
e5bad5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,23 @@ | ||
module github.com/berachain/beacon-kit/mod/async | ||
|
||
go 1.23.0 | ||
|
||
require ( | ||
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0 | ||
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd | ||
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573 | ||
) | ||
|
||
require ( | ||
github.com/cockroachdb/errors v1.11.3 // indirect | ||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect | ||
github.com/cockroachdb/redact v1.1.5 // indirect | ||
github.com/getsentry/sentry-go v0.28.1 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/kr/pretty v0.3.1 // indirect | ||
github.com/kr/text v0.2.0 // indirect | ||
github.com/pkg/errors v0.9.1 // indirect | ||
github.com/rogpeppe/go-internal v1.12.0 // indirect | ||
golang.org/x/sys v0.23.0 // indirect | ||
golang.org/x/text v0.17.0 // indirect | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0 h1:kCSrkb/uVXfMKJPKjf0c7nlJkwn5cNwMxtzRW4zNq2A= | ||
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0/go.mod h1:og0jtHZosPDTyhge9tMBlRItoZ4Iv3aZFM9n4QDpcdo= | ||
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd h1:DYSjsq80Omqqlt+z2VcYsSxjZpLqCDRz7CvUDBrLDJE= | ||
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd/go.mod h1:BilVBmqKhC4GXYCaIs8QnKaR14kpn3YmF5uYBdayF9I= | ||
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573 h1:2yo+tgSE9kwk/yaapVFwa87mweovWt4HOrXr3SH3zhk= | ||
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573/go.mod h1:Cv49avty5oqeCGw0zP8aLpTsVlhL9pfmWEuyoGzUuQA= | ||
github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= | ||
github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= | ||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= | ||
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= | ||
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= | ||
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= | ||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= | ||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= | ||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/getsentry/sentry-go v0.28.1 h1:zzaSm/vHmGllRM6Tpx1492r0YDzauArdBfkJRtY6P5k= | ||
github.com/getsentry/sentry-go v0.28.1/go.mod h1:1fQZ+7l7eeJ3wYi82q5Hg8GqAPgefRq+FP/QhafYVgg= | ||
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= | ||
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= | ||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= | ||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= | ||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= | ||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= | ||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= | ||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= | ||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= | ||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= | ||
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= | ||
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= | ||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= | ||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= | ||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= | ||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= | ||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= | ||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= | ||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= | ||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= | ||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | ||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | ||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= | ||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= | ||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | ||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | ||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | ||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= | ||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | ||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | ||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= | ||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | ||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= | ||
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= | ||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | ||
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= | ||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= | ||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | ||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= | ||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= | ||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= | ||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
// SPDX-License-Identifier: BUSL-1.1 | ||
// | ||
// Copyright (C) 2024, Berachain Foundation. All rights reserved. | ||
// Use of this software is governed by the Business Source License included | ||
// in the LICENSE file of this repository and at www.mariadb.com/bsl11. | ||
// | ||
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY | ||
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER | ||
// VERSIONS OF THE LICENSED WORK. | ||
// | ||
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF | ||
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF | ||
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE). | ||
// | ||
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON | ||
// AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, | ||
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF | ||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND | ||
// TITLE. | ||
|
||
package broker | ||
|
||
// ensureType ensures that the provided entity is of type T. | ||
// It returns a typed entity or an error if the type is not correct. | ||
func ensureType[T any](e any) (T, error) { | ||
typedE, ok := e.(T) | ||
if !ok { | ||
return *new(T), errIncompatibleAssignee(*new(T), e) | ||
} | ||
return typedE, nil | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,89 +22,124 @@ package broker | |
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/berachain/beacon-kit/mod/primitives/pkg/async" | ||
) | ||
|
||
// Broker broadcasts msgs to registered clients. | ||
type Broker[T any] struct { | ||
// name of the message broker. | ||
name string | ||
// clients is a map of registered clients. | ||
clients map[chan T]struct{} | ||
// Broker is responsible for broadcasting all events corresponding to the | ||
// <eventID> to all registered client channels. | ||
type Broker[T async.BaseEvent] struct { | ||
eventID async.EventID | ||
// subscriptions is a map of subscribed subscriptions. | ||
subscriptions map[chan T]struct{} | ||
// msgs is the channel for publishing new messages. | ||
msgs chan T | ||
// timeout is the timeout for sending a msg to a client. | ||
timeout time.Duration | ||
// mu is the mutex for the clients map. | ||
mu sync.Mutex | ||
} | ||
|
||
// New creates a new b. | ||
func New[T any](name string) *Broker[T] { | ||
// New creates a new broker publishing events of type T for the | ||
// provided eventID. | ||
func New[T async.BaseEvent](eventID string) *Broker[T] { | ||
return &Broker[T]{ | ||
clients: make(map[chan T]struct{}), | ||
msgs: make(chan T, defaultBufferSize), | ||
timeout: defaultTimeout, | ||
name: name, | ||
eventID: async.EventID(eventID), | ||
subscriptions: make(map[chan T]struct{}), | ||
msgs: make(chan T, defaultBufferSize), | ||
timeout: defaultBrokerTimeout, | ||
mu: sync.Mutex{}, | ||
ocnc2 marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+49
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider Adding Comments for Clarity in Adding comments to explain the purpose of each field in the // New creates a new broker publishing events of type T for the provided eventID.
func New[T async.BaseEvent](eventID string) *Broker[T] {
return &Broker[T]{
eventID: async.EventID(eventID), // Unique identifier for the event type
subscriptions: make(map[chan T]struct{}), // Map to manage client subscriptions
msgs: make(chan T, defaultBufferSize), // Channel for message publishing
timeout: defaultBrokerTimeout, // Timeout for message delivery
mu: sync.Mutex{}, // Mutex for concurrency safety
}
} |
||
} | ||
} | ||
|
||
// Name returns the name of the msg broker. | ||
func (b *Broker[T]) Name() string { | ||
return b.name | ||
// EventID returns the event ID that the broker is responsible for. | ||
func (p *Broker[T]) EventID() async.EventID { | ||
return p.eventID | ||
} | ||
|
||
// Start starts the broker loop. | ||
func (b *Broker[T]) Start(ctx context.Context) error { | ||
go b.start(ctx) | ||
return nil | ||
func (p *Broker[T]) Start(ctx context.Context) { | ||
go p.start(ctx) | ||
} | ||
|
||
// start starts the broker loop. | ||
func (b *Broker[T]) start(ctx context.Context) { | ||
func (p *Broker[T]) start(ctx context.Context) { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
// close all leftover clients and break the broker loop | ||
for client := range b.clients { | ||
b.Unsubscribe(client) | ||
} | ||
p.shutdown() | ||
return | ||
case msg := <-b.msgs: | ||
case msg := <-p.msgs: | ||
// broadcast published msg to all clients | ||
for client := range b.clients { | ||
// send msg to client (or discard msg after timeout) | ||
select { | ||
case client <- msg: | ||
case <-time.After(b.timeout): | ||
} | ||
} | ||
p.broadcast(msg) | ||
} | ||
} | ||
} | ||
|
||
// Publish publishes a msg to the b. | ||
// Publish publishes a msg to all subscribers. | ||
// Returns ErrTimeout on timeout. | ||
func (b *Broker[T]) Publish(ctx context.Context, msg T) error { | ||
func (p *Broker[T]) Publish(msg async.BaseEvent) error { | ||
typedMsg, err := ensureType[T](msg) | ||
if err != nil { | ||
return err | ||
} | ||
ctx := msg.Context() | ||
select { | ||
case b.msgs <- msg: | ||
case p.msgs <- typedMsg: | ||
return nil | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
} | ||
|
||
// Subscribe registers a new client to the broker and returns it to the caller. | ||
// Subscribe registers the provided channel to the broker, | ||
// Returns ErrTimeout on timeout. | ||
func (b *Broker[T]) Subscribe() (chan T, error) { | ||
client := make(chan T) | ||
b.clients[client] = struct{}{} | ||
return client, nil | ||
// Contract: the channel must be a Subscription[T], where T is the expected | ||
// type of the event data. | ||
func (p *Broker[T]) Subscribe(ch any) error { | ||
client, err := ensureType[chan T](ch) | ||
if err != nil { | ||
return err | ||
} | ||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
p.subscriptions[client] = struct{}{} | ||
return nil | ||
} | ||
|
||
// Unsubscribe removes a client from the b. | ||
// Returns ErrTimeout on timeout. | ||
func (b *Broker[T]) Unsubscribe(client chan T) { | ||
// Remove the client from the broker | ||
delete(b.clients, client) | ||
// close the client channel | ||
// Unsubscribe removes a client from the broker. | ||
// Returns an error if the provided channel is not of type chan T. | ||
func (p *Broker[T]) Unsubscribe(ch any) error { | ||
client, err := ensureType[chan T](ch) | ||
if err != nil { | ||
return err | ||
} | ||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
delete(p.subscriptions, client) | ||
close(client) | ||
return nil | ||
} | ||
|
||
// broadcast broadcasts a msg to all clients. | ||
func (p *Broker[T]) broadcast(msg T) { | ||
for client := range p.subscriptions { | ||
// send msg to client (or discard msg after timeout) | ||
select { | ||
case client <- msg: | ||
case <-time.After(p.timeout): | ||
} | ||
} | ||
} | ||
|
||
// shutdown closes all leftover clients. | ||
func (p *Broker[T]) shutdown() { | ||
for client := range p.subscriptions { | ||
if err := p.Unsubscribe(client); err != nil { | ||
panic(err) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,11 +22,13 @@ package broker | |
|
||
import "time" | ||
|
||
// TODO: make timeout configurable thorugh config/context | ||
const ( | ||
// defaultTimeout specifies the default timeout when the broker | ||
// defaultBrokerTimeout specifies the default timeout when the publisher | ||
// tries to send a message to a client, a message is published to the | ||
// broker, or a client subscribes or unsubscribes. | ||
defaultTimeout = time.Second | ||
// publisher, or a client subscribes or unsubscribes. | ||
defaultBrokerTimeout = time.Second | ||
Comment on lines
+25
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renaming approved; consider addressing the TODO comment. The renaming of Would you like me to open a GitHub issue to track the configurability enhancement for the timeout? |
||
|
||
// defaultBufferSize specifies the default size of the message buffer. | ||
defaultBufferSize = 10 | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider enhancing error handling in
ensureType
.The
ensureType
function correctly performs a type assertion. Consider providing more context in the error message returned byerrIncompatibleAssignee
to improve debuggability.