diff --git a/vmware-event-router/internal/processor/aws/aws_event_bridge.go b/vmware-event-router/internal/processor/aws/aws_event_bridge.go index 652b23e3..47b59d1d 100644 --- a/vmware-event-router/internal/processor/aws/aws_event_bridge.go +++ b/vmware-event-router/internal/processor/aws/aws_event_bridge.go @@ -29,10 +29,40 @@ const ( defaultBatchSize = 10 // max 10 input events per batch sent to AWS ) +// rules pattern to event bus mapping +type patternMap struct { + sync.RWMutex + subjects map[string]string +} + +// matches checks whether the given subject is in the pattern map and returns +// the associated event bus +func (pm *patternMap) matches(subject string) (string, bool) { + pm.RLock() + defer pm.RUnlock() + bus, matched := pm.subjects[subject] + return bus, matched +} + +// addRule adds a subject from the specified event bus to the pattern map +func (pm *patternMap) addSubject(subject, bus string) { + pm.Lock() + defer pm.Unlock() + pm.subjects[subject] = bus +} + +// init initializes the pattern map +func (pm *patternMap) init() { + pm.Lock() + defer pm.Unlock() + pm.subjects = map[string]string{} +} + // EventBridgeProcessor implements the Processor interface type EventBridgeProcessor struct { session session.Session eventbridgeiface.EventBridgeAPI + patternMap *patternMap // options verbose bool @@ -40,9 +70,8 @@ type EventBridgeProcessor struct { batchSize int *log.Logger - mu sync.RWMutex - patternMap map[string]string // rules pattern to event bus mapping - stats metrics.EventStats + mu sync.RWMutex + stats metrics.EventStats } // assert we implement Processor interface @@ -62,7 +91,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve resyncInterval: defaultResyncInterval, batchSize: defaultBatchSize, Logger: logger, - patternMap: make(map[string]string), + patternMap: &patternMap{}, } // apply options @@ -119,6 +148,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve nextToken *string ) + eventBridge.patternMap.init() for !found { rules, err := eventBridge.ListRulesWithContext(ctx, &eventbridge.ListRulesInput{ EventBusName: aws.String(cfg.EventBus), // explicitly passing eventbus name because list assumes "default" otherwise @@ -148,7 +178,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve } for _, s := range e.Detail.Subject { eventBridge.Printf("adding rule event forwarding pattern %q to processor", s) - eventBridge.patternMap[s] = *rule.EventBusName + eventBridge.patternMap.addSubject(s, *rule.EventBusName) } found = true break arnLoop @@ -190,60 +220,62 @@ func (eb *EventBridgeProcessor) Process(ctx context.Context, ce cloudevents.Even eb.Printf("processing event (ID %s): %v", ce.ID(), ce) } + subject := ce.Subject() eb.mu.Lock() - defer eb.mu.Unlock() + // initialize invocation stats + if _, ok := eb.stats.Invocations[subject]; !ok { + eb.stats.Invocations[subject] = &metrics.InvocationDetails{} + } + eb.mu.Unlock() - if _, ok := eb.patternMap[ce.Subject()]; !ok { - // no event bridge rule pattern (subscription) for event, skip - if eb.verbose { - eb.Printf("pattern rule does not match, skipping event (ID %s): %v", ce.ID(), ce) + if bus, ok := eb.patternMap.matches(subject); ok { + jsonBytes, err := json.Marshal(ce) + if err != nil { + msg := fmt.Errorf("could not marshal event %v: %v", ce, err) + eb.Println(msg) + return processor.NewError(config.ProcessorEventBridge, msg) } - return nil - } - - jsonBytes, err := json.Marshal(ce) - if err != nil { - msg := fmt.Errorf("could not marshal event %v: %v", ce, err) - eb.Println(msg) - return processor.NewError(config.ProcessorEventBridge, msg) - } + jsonString := string(jsonBytes) + entry := eventbridge.PutEventsRequestEntry{ + Detail: aws.String(jsonString), + EventBusName: aws.String(bus), + Source: aws.String(ce.Source()), + DetailType: aws.String(subject), + } - jsonString := string(jsonBytes) - entry := eventbridge.PutEventsRequestEntry{ - Detail: aws.String(jsonString), - EventBusName: aws.String(eb.patternMap[ce.Subject()]), - Source: aws.String(ce.Source()), - DetailType: aws.String(ce.Subject()), - } + // TODO: add batching (metrics stats currently assume single item) + input := eventbridge.PutEventsInput{ + Entries: []*eventbridge.PutEventsRequestEntry{&entry}, + } - // check for existing topic entry in metrics - if _, ok := eb.stats.Invocations[ce.Subject()]; !ok { - eb.stats.Invocations[ce.Subject()] = &metrics.InvocationDetails{} - } + eb.Printf("sending event %s", ce.ID()) + resp, err := eb.PutEventsWithContext(ctx, &input) - // TODO: add batching (metrics stats currently assume single item) - input := eventbridge.PutEventsInput{ - Entries: []*eventbridge.PutEventsRequestEntry{&entry}, - } + eb.mu.Lock() + defer eb.mu.Unlock() + if err != nil { + msg := fmt.Errorf("could not send event %v: %v", ce, err) + eb.Println(msg) + eb.stats.Invocations[subject].Failure() + return processor.NewError(config.ProcessorEventBridge, msg) + } - eb.Printf("sending event %s", ce.ID()) - resp, err := eb.PutEventsWithContext(ctx, &input) + if eb.verbose { + eb.Printf("successfully sent event %v: %v", ce, resp) + } else { + eb.Printf("successfully sent event %s", ce.ID()) + } - if err != nil { - msg := fmt.Errorf("could not send event %v: %v", ce, err) - eb.Println(msg) - eb.stats.Invocations[ce.Subject()].Failure() - return processor.NewError(config.ProcessorEventBridge, msg) + eb.stats.Invocations[subject].Success() + return nil } + // no event bridge rule pattern (subscription) for event, skip if eb.verbose { - eb.Printf("successfully sent event %v: %v", ce, resp) - } else { - eb.Printf("successfully sent event %s", ce.ID()) + eb.Printf("pattern rule does not match, skipping event (ID %s): %v", ce.ID(), ce) } - eb.stats.Invocations[ce.Subject()].Success() return nil } @@ -267,10 +299,8 @@ func (eb *EventBridgeProcessor) syncPatternMap(ctx context.Context, eventbus, ru } func (eb *EventBridgeProcessor) syncRules(ctx context.Context, eventbus, ruleARN string) error { - eb.mu.Lock() - // clear pattern map - eb.patternMap = make(map[string]string) - eb.mu.Unlock() + // reset pattern map + eb.patternMap.init() var ( found bool @@ -305,12 +335,10 @@ func (eb *EventBridgeProcessor) syncRules(ctx context.Context, eventbus, ruleARN eb.Println("warning: rule event pattern does not contain any subjects") } - eb.mu.Lock() for _, s := range e.Detail.Subject { eb.Printf("adding rule event forwarding pattern %q to processor", s) - eb.patternMap[s] = *rule.EventBusName + eb.patternMap.addSubject(s, *rule.EventBusName) } - eb.mu.Unlock() found = true break arnLoop