Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(async): dispatcher #1910

Merged
merged 111 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 104 commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
fdd36ff
dispatcher
ocnc2 Aug 6, 2024
9ddb355
reorg
ocnc2 Aug 6, 2024
6d6c6f3
sleep
ocnc2 Aug 7, 2024
1e6ad38
dispatch middleware
ocnc2 Aug 7, 2024
c61d794
wip
ocnc2 Aug 7, 2024
03cf52b
Merge branch 'main' into dispatcherv2
nidhi-singh02 Aug 7, 2024
9df8769
dispatch validator
ocnc2 Aug 7, 2024
b176c5d
Merge branch 'main' into dispatcherv2
ocnc2 Aug 7, 2024
bf8787b
types
ocnc2 Aug 7, 2024
c999d17
dispatcher pruner deposit
ocnc2 Aug 7, 2024
cc7567d
dispatch da service and add more routes
ocnc2 Aug 7, 2024
52f4458
Merge branch 'main' into dispatcherv2
ocnc2 Aug 7, 2024
9066944
mock
ocnc2 Aug 7, 2024
6eb7206
give chainservice a dispatcher
ocnc2 Aug 7, 2024
66d66ee
why was this even here
ocnc2 Aug 7, 2024
cc96b11
goodbye broker
ocnc2 Aug 7, 2024
70c0f36
tidy
ocnc2 Aug 7, 2024
c6a8a43
tmr
ocnc2 Aug 8, 2024
748d712
logger
ocnc2 Aug 8, 2024
e3db47b
Merge branch 'main' into dispatcherv2
ocnc2 Aug 8, 2024
7f2dbfa
checkpoint
ocnc2 Aug 8, 2024
1a4d885
merge main fr this time
ocnc2 Aug 8, 2024
a686a03
Merge remote-tracking branch 'origin/main' into dispatcherv2
ocnc2 Aug 8, 2024
c27ada8
bointer]
ocnc2 Aug 8, 2024
bc43819
cleanup with arch
ocnc2 Aug 8, 2024
a6172ed
Merge branch 'main' into dispatcherv2
ocnc2 Aug 8, 2024
9213b65
rename
ocnc2 Aug 8, 2024
b6cb638
fixes & cleanup
ocnc2 Aug 8, 2024
a0fcd08
checkpoint
ocnc2 Aug 9, 2024
da942ef
bet
ocnc2 Aug 9, 2024
82d08cd
lint
ocnc2 Aug 9, 2024
1792026
comments
ocnc2 Aug 9, 2024
9b4a337
pruner
ocnc2 Aug 9, 2024
125ad29
fixes
ocnc2 Aug 9, 2024
eb9c207
Merge branch 'main' into dispatcherv2
ocnc2 Aug 9, 2024
ac6d1cb
revert pruner change
ocnc2 Aug 9, 2024
6db6b49
Merge branch 'main' into dispatcherv2
ocnc2 Aug 9, 2024
0a0735a
pass dispatcher in deposit service
ocnc2 Aug 9, 2024
c84a80d
registry
ocnc2 Aug 9, 2024
a57efa2
provide
ocnc2 Aug 9, 2024
6806275
nil
ocnc2 Aug 9, 2024
687bf5f
bet
ocnc2 Aug 9, 2024
ec0bf81
merge main
ocnc2 Aug 9, 2024
64212d2
tidy
ocnc2 Aug 9, 2024
82e0a8e
merge main
ocnc2 Aug 9, 2024
fd10d25
tidy
ocnc2 Aug 9, 2024
3bd43d6
future + modular refactor
ocnc2 Aug 12, 2024
3f42ab8
bet
ocnc2 Aug 12, 2024
90e5f12
Merge branch 'main' into dispatcherv2
ocnc2 Aug 12, 2024
ae045ff
nit fixes
ocnc2 Aug 12, 2024
85b9c7b
interface
ocnc2 Aug 12, 2024
d0289d1
Merge remote-tracking branch 'origin/main' into dispatcherv2
ocnc2 Aug 12, 2024
6301814
bet
ocnc2 Aug 12, 2024
4e4440e
Merge branch 'main' into dispatcherv2
ocnc2 Aug 12, 2024
08fff98
nilaway
ocnc2 Aug 12, 2024
cd0b7be
interface
ocnc2 Aug 12, 2024
c77f234
Merge branch 'main' into dispatcherv2
ocnc2 Aug 14, 2024
a66708b
bet
ocnc2 Aug 14, 2024
584a23d
events
ocnc2 Aug 14, 2024
8ba8d51
bet
ocnc2 Aug 14, 2024
6abc5a8
sub
ocnc2 Aug 15, 2024
c231dd4
merge main
ocnc2 Aug 15, 2024
b2ca9be
validator
ocnc2 Aug 15, 2024
6235fb4
merge main again
ocnc2 Aug 15, 2024
f287b88
Merge branch 'main' into dispatcherv3
ocnc2 Aug 15, 2024
00d6466
da service
ocnc2 Aug 15, 2024
1f319a3
betbetbet
ocnc2 Aug 15, 2024
52bea8f
Merge branch 'main' into dispatcherv3
ocnc2 Aug 15, 2024
8073631
tidy
ocnc2 Aug 15, 2024
55241fe
builds
ocnc2 Aug 16, 2024
2488a15
bet
ocnc2 Aug 16, 2024
7c4a05d
omg im so dumb
ocnc2 Aug 16, 2024
5583791
temp
ocnc2 Aug 16, 2024
6989f61
Merge remote-tracking branch 'origin/main' into dispatcherv3
ocnc2 Aug 16, 2024
9b951e0
bet
ocnc2 Aug 16, 2024
d00a0c6
bet
ocnc2 Aug 16, 2024
4bd83fa
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 16, 2024
e940239
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 16, 2024
b254bf2
delete mock
ocnc2 Aug 16, 2024
2a30b9f
remove bundle
ocnc2 Aug 16, 2024
ce03370
bet
ocnc2 Aug 16, 2024
0775651
oops
ocnc2 Aug 16, 2024
77a8de3
mergegege
ocnc2 Aug 16, 2024
d9877d6
mergbemr again
ocnc2 Aug 16, 2024
c5022ae
name
ocnc2 Aug 16, 2024
fb16e2d
merge
ocnc2 Aug 16, 2024
5c7c518
tidy sync
ocnc2 Aug 16, 2024
7a69ea1
names
ocnc2 Aug 17, 2024
6916834
bet
ocnc2 Aug 18, 2024
00b4f20
oops
ocnc2 Aug 18, 2024
5bb13eb
tidy
ocnc2 Aug 18, 2024
aaf9bd6
delete async/event.go
ocnc2 Aug 18, 2024
a173b27
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 18, 2024
e3a7cef
rename events -> async
ocnc2 Aug 18, 2024
af595bc
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 19, 2024
9cf5fad
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 19, 2024
d481cbd
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 19, 2024
373e7fa
merge
ocnc2 Aug 19, 2024
bccddda
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 19, 2024
db73444
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 19, 2024
d814ad3
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 19, 2024
fa13257
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 19, 2024
f720b0a
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 20, 2024
f962cb8
allow concurrent notification attempts
ocnc2 Aug 20, 2024
e67ef72
bet
ocnc2 Aug 20, 2024
117a69a
errors
ocnc2 Aug 20, 2024
3cb0bfb
lint
ocnc2 Aug 20, 2024
596b6ef
lint
ocnc2 Aug 20, 2024
db0bd2f
Merge branch 'main' into dispatcher-fr
ocnc2 Aug 20, 2024
9494830
Merge remote-tracking branch 'origin/main' into dispatcher-fr
ocnc2 Aug 20, 2024
e5bad5b
Merge branch 'main' into dispatcher-fr
itsdevbear Aug 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions mod/async/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
module github.com/berachain/beacon-kit/mod/async

go 1.23.0

require (
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573
)

require (
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/getsentry/sentry-go v0.28.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
)
74 changes: 74 additions & 0 deletions mod/async/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0 h1:kCSrkb/uVXfMKJPKjf0c7nlJkwn5cNwMxtzRW4zNq2A=
github.com/berachain/beacon-kit/mod/errors v0.0.0-20240806211103-d1105603bfc0/go.mod h1:og0jtHZosPDTyhge9tMBlRItoZ4Iv3aZFM9n4QDpcdo=
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd h1:DYSjsq80Omqqlt+z2VcYsSxjZpLqCDRz7CvUDBrLDJE=
github.com/berachain/beacon-kit/mod/log v0.0.0-20240807213340-5779c7a563cd/go.mod h1:BilVBmqKhC4GXYCaIs8QnKaR14kpn3YmF5uYBdayF9I=
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573 h1:2yo+tgSE9kwk/yaapVFwa87mweovWt4HOrXr3SH3zhk=
github.com/berachain/beacon-kit/mod/primitives v0.0.0-20240816231924-221a061d3573/go.mod h1:Cv49avty5oqeCGw0zP8aLpTsVlhL9pfmWEuyoGzUuQA=
github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I=
github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30=
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/getsentry/sentry-go v0.28.1 h1:zzaSm/vHmGllRM6Tpx1492r0YDzauArdBfkJRtY6P5k=
github.com/getsentry/sentry-go v0.28.1/go.mod h1:1fQZ+7l7eeJ3wYi82q5Hg8GqAPgefRq+FP/QhafYVgg=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
31 changes: 31 additions & 0 deletions mod/async/pkg/broker/assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2024, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

package broker

// ensureType ensures that the provided entity is of type T.
// It returns a typed entity or an error if the type is not correct.
func ensureType[T any](e any) (T, error) {
typedE, ok := e.(T)
if !ok {
return *new(T), errIncompatibleAssignee(*new(T), e)
}
return typedE, nil
}
Comment on lines +23 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider enhancing error handling in ensureType.

The ensureType function correctly performs a type assertion. Consider providing more context in the error message returned by errIncompatibleAssignee to improve debuggability.

// Example enhancement: Include the expected and actual types in the error message.
func ensureType[T any](e any) (T, error) {
	typedE, ok := e.(T)
	if !ok {
		expectedType := reflect.TypeOf(*new(T)).String()
		actualType := reflect.TypeOf(e).String()
		return *new(T), fmt.Errorf("expected type %s, but got %s", expectedType, actualType)
	}
	return typedE, nil
}

132 changes: 91 additions & 41 deletions mod/async/pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,40 +22,48 @@ package broker

import (
"context"
"sync"
"time"

"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// Broker broadcasts msgs to registered clients.
type Broker[T any] struct {
// name of the message broker.
name string
// clients is a map of registered clients.
clients map[chan T]struct{}
// Broker is responsible for broadcasting all events corresponding to the
// <eventID> to all registered client channels.
type Broker[T async.BaseEvent] struct {
// eventID is a unique identifier for the event that this broker is
// responsible for.
eventID async.EventID
// subscriptions is a map of subscribed subscriptions.
subscriptions map[chan T]struct{}
// msgs is the channel for publishing new messages.
msgs chan T
// timeout is the timeout for sending a msg to a client.
timeout time.Duration
// mu is the mutex for the clients map.
mu sync.Mutex
}

// New creates a new b.
func New[T any](name string) *Broker[T] {
// New creates a new broker publishing events of type T for the
// provided eventID.
func New[T async.BaseEvent](eventID string) *Broker[T] {
return &Broker[T]{
clients: make(map[chan T]struct{}),
msgs: make(chan T, defaultBufferSize),
timeout: defaultTimeout,
name: name,
eventID: async.EventID(eventID),
subscriptions: make(map[chan T]struct{}),
msgs: make(chan T, defaultBufferSize),
timeout: defaultBrokerTimeout,
mu: sync.Mutex{},
ocnc2 marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +49 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider Adding Comments for Clarity in New Function.

Adding comments to explain the purpose of each field in the New function could enhance readability, especially for new developers.

// New creates a new broker publishing events of type T for the provided eventID.
func New[T async.BaseEvent](eventID string) *Broker[T] {
	return &Broker[T]{
		eventID:       async.EventID(eventID), // Unique identifier for the event type
		subscriptions: make(map[chan T]struct{}), // Map to manage client subscriptions
		msgs:          make(chan T, defaultBufferSize), // Channel for message publishing
		timeout:       defaultBrokerTimeout, // Timeout for message delivery
		mu:            sync.Mutex{}, // Mutex for concurrency safety
	}
}

}
}

// Name returns the name of the msg broker.
func (b *Broker[T]) Name() string {
return b.name
// EventID returns the event ID that the broker is responsible for.
func (b *Broker[T]) EventID() async.EventID {
return b.eventID
}

// Start starts the broker loop.
func (b *Broker[T]) Start(ctx context.Context) error {
func (b *Broker[T]) Start(ctx context.Context) {
go b.start(ctx)
return nil
}

// start starts the broker loop.
Expand All @@ -64,47 +72,89 @@ func (b *Broker[T]) start(ctx context.Context) {
select {
case <-ctx.Done():
// close all leftover clients and break the broker loop
for client := range b.clients {
b.Unsubscribe(client)
}
b.shutdown()
return
case msg := <-b.msgs:
// broadcast published msg to all clients
for client := range b.clients {
// send msg to client (or discard msg after timeout)
select {
case client <- msg:
case <-time.After(b.timeout):
}
}
b.broadcast(msg)
}
}
}

// Publish publishes a msg to the b.
// Publish publishes a msg to all subscribers.
// Returns ErrTimeout on timeout.
func (b *Broker[T]) Publish(ctx context.Context, msg T) error {
func (b *Broker[T]) Publish(msg async.BaseEvent) error {
typedMsg, err := ensureType[T](msg)
if err != nil {
return err
}
ctx := msg.Context()
select {
case b.msgs <- msg:
case b.msgs <- typedMsg:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// Subscribe registers a new client to the broker and returns it to the caller.
// Subscribe registers the provided channel to the broker,
// Returns ErrTimeout on timeout.
func (b *Broker[T]) Subscribe() (chan T, error) {
client := make(chan T)
b.clients[client] = struct{}{}
return client, nil
// Contract: the channel must be a Subscription[T], where T is the expected
// type of the event data.
func (b *Broker[T]) Subscribe(ch any) error {
client, err := ensureType[chan T](ch)
if err != nil {
return err
}
b.mu.Lock()
defer b.mu.Unlock()
b.subscriptions[client] = struct{}{}
return nil
}

// Unsubscribe removes a client from the b.
// Returns ErrTimeout on timeout.
func (b *Broker[T]) Unsubscribe(client chan T) {
// Remove the client from the broker
delete(b.clients, client)
// close the client channel
// Unsubscribe removes a client from the broker.
// Returns an error if the provided channel is not of type chan T.
func (b *Broker[T]) Unsubscribe(ch any) error {
client, err := ensureType[chan T](ch)
if err != nil {
return err
}
b.mu.Lock()
defer b.mu.Unlock()
delete(b.subscriptions, client)
close(client)
return nil
}

// broadcast broadcasts a msg to all clients.
func (b *Broker[T]) broadcast(msg T) {
// create separate slice to avoid holding the lock during the entire
// broadcast
b.mu.Lock()
clients := make([]chan T, 0, len(b.subscriptions))
for client := range b.subscriptions {
clients = append(clients, client)
}
b.mu.Unlock()

// launch a goroutine for each client to allow for concurrent notification
// attempts, while respecting the timeout for each client individually
for _, client := range clients {
go func(c chan T) {
select {
case c <- msg:
case <-time.After(b.timeout):
// discard msg after timeout
}
}(client)
}
}

// shutdown closes all leftover clients.
func (b *Broker[T]) shutdown() {
for client := range b.subscriptions {
if err := b.Unsubscribe(client); err != nil {
panic(err)
}
}
}
8 changes: 5 additions & 3 deletions mod/async/pkg/broker/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ package broker

import "time"

// TODO: make timeout configurable thorugh config/context
const (
// defaultTimeout specifies the default timeout when the broker
// defaultBrokerTimeout specifies the default timeout when the publisher
// tries to send a message to a client, a message is published to the
// broker, or a client subscribes or unsubscribes.
defaultTimeout = time.Second
// publisher, or a client subscribes or unsubscribes.
defaultBrokerTimeout = time.Second
Comment on lines +25 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming approved; consider addressing the TODO comment.

The renaming of defaultTimeout to defaultBrokerTimeout improves clarity. Ensure that the TODO comment regarding configurability is tracked and addressed in future updates.

Would you like me to open a GitHub issue to track the configurability enhancement for the timeout?


// defaultBufferSize specifies the default size of the message buffer.
defaultBufferSize = 10
)
22 changes: 19 additions & 3 deletions mod/async/pkg/broker/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,24 @@
package broker

import (
"errors"
"github.com/berachain/beacon-kit/mod/errors"
)

// ErrTimeout is the error returned when a broker operation timed out.
var ErrTimeout = errors.New("timeout")
// errTimeout is the error returned when a dispatch operation timed out.
//
//nolint:gochecknoglobals // errors
var (
ErrIncompatible = errors.New("incompatible assignee")
// errIncompatibleAssignee is the error returned when the assignee is not
// compatible with the assigner.
errIncompatibleAssignee = func(
assigner interface{}, assignee interface{},
) error {
return errors.Wrapf(
ErrIncompatible,
"expected: %T, received: %T",
assigner,
assignee,
)
}
)
Loading
Loading