Skip to content

Commit

Permalink
fix: match endpoints and intent/interest by route prefix.
Browse files Browse the repository at this point in the history
  • Loading branch information
foxis committed May 2, 2024
1 parent 56a09df commit ae0347a
Showing 1 changed file with 53 additions and 4 deletions.
57 changes: 53 additions & 4 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ var (
_ Endpoint = (*Container)(nil)
)

// Container stores endpoints, collects all intents and interests and acts as an aggregate Endpoint.
// Container does not link intents and interests.
// Container implements an aggregate Endpoint that stores and manages multiple endpoints,
// and it coordinates the linking of intents and interests across these endpoints.
//
// Actions:
// - Add/Remove endpoints look for existing intents and interests and registers them to respective routers.
Expand All @@ -30,6 +30,7 @@ type Container struct {
interestRouters map[string]*InterestRouter
}

// NewContainer creates a new Container with a given name and size.
func NewContainer(name string, size int) *Container {
return &Container{
BaseEndpoint: NewEndpointBase(name, size),
Expand Down Expand Up @@ -109,8 +110,38 @@ func (t *Container) initializeEndpoint(ep Endpoint) error {

// registerEndpointWithRouters links the endpoint with all existing routers.
func (t *Container) registerEndpointWithRouters(ep Endpoint) {
canPublish := func(Intent) bool { return true }
canSubscribe := func(Interest) bool { return true }
if rep, ok := ep.(RemoteEndpoint); ok {
// Local Intent (created via API)
// - Only if route prefix matches LocalPeer path
// Remote Intent (created via remote)
// - always false, because remote intent is local to the endpoint that created it
canPublish = func(i Intent) bool {
if _, ok := i.(RemoteIntent); ok {
return false
}
return rep.Local().HasPrefix(i.Route())
}

// Local Interest (created via API)
// - Only if route prefix matches RemotePeer path
// Remote Interest (created via remote)
// - always false, because remote interest is local to the endpoint that created it
canSubscribe = func(i Interest) bool {
if _, ok := i.(RemoteInterest); ok {
return false
}
return rep.Remote().HasPrefix(i.Route())
}
}

// Add endpoint to intents
for _, ir := range t.intentRouters {
if !canPublish(ir) {
continue
}

intent, err := ep.Publish(ir.Route())
if err != nil {
t.Log.Warn("AddIntent.Publish", "err", err, "route", ir.Route())
Expand All @@ -124,6 +155,10 @@ func (t *Container) registerEndpointWithRouters(ep Endpoint) {

// Add enpoint to interests
for _, ir := range t.interestRouters {
if !canSubscribe(ir) {
continue
}

interest, err := ep.Subscribe(ir.Route())
if err != nil {
t.Log.Warn("AddIntent.Subscribe", "err", err, "route", ir.Route())
Expand Down Expand Up @@ -203,8 +238,15 @@ func (t *Container) Publish(route Route, opt ...PubOpt) (Intent, error) {
func (t *Container) publish(route Route, opt ...PubOpt) (Intent, error) {
intents := make([]Intent, 0, len(t.endpoints))
// Advertise intents even if we are already publishing
for _, t := range t.endpoints {
intent, err := t.Publish(route)
for _, ep := range t.endpoints {
if rep, ok := ep.(RemoteEndpoint); ok {
// We can only publish as part of local peer prefix
if !rep.Local().HasPrefix(route) {
continue
}
}

intent, err := ep.Publish(route)
if err != nil {
closeAll(intents...)
return nil, err
Expand Down Expand Up @@ -245,6 +287,13 @@ func (t *Container) subscribe(route Route, opt ...SubOpt) (Interest, error) {
// Advertise interests anyway (even if we are already subscribed)
interests := make([]Interest, 0, len(t.endpoints))
for _, ep := range t.endpoints {
if rep, ok := ep.(RemoteEndpoint); ok {
// We can only subscribe to remote peers as part of remote peer prefix
if !rep.Remote().HasPrefix(route) {
continue
}
}

interest, err := ep.Subscribe(route)
if err != nil {
closeAll(interests...)
Expand Down

0 comments on commit ae0347a

Please sign in to comment.