Skip to content

Commit

Permalink
Optimize pattern map locking
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Gasch <mgasch@vmware.com>
  • Loading branch information
Michael Gasch committed Nov 4, 2020
1 parent 642404d commit 0dd4cdc
Showing 1 changed file with 80 additions and 52 deletions.
132 changes: 80 additions & 52 deletions vmware-event-router/internal/processor/aws/aws_event_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,49 @@ const (
defaultBatchSize = 10 // max 10 input events per batch sent to AWS
)

// rules pattern to event bus mapping
type patternMap struct {
sync.RWMutex
subjects map[string]string
}

// matches checks whether the given subject is in the pattern map and returns
// the associated event bus
func (pm *patternMap) matches(subject string) (string, bool) {
pm.RLock()
defer pm.RUnlock()
bus, matched := pm.subjects[subject]
return bus, matched
}

// addRule adds a subject from the specified event bus to the pattern map
func (pm *patternMap) addSubject(subject, bus string) {
pm.Lock()
defer pm.Unlock()
pm.subjects[subject] = bus
}

// init initializes the pattern map
func (pm *patternMap) init() {
pm.Lock()
defer pm.Unlock()
pm.subjects = map[string]string{}
}

// EventBridgeProcessor implements the Processor interface
type EventBridgeProcessor struct {
session session.Session
eventbridgeiface.EventBridgeAPI
patternMap *patternMap

// options
verbose bool
resyncInterval time.Duration
batchSize int
*log.Logger

mu sync.RWMutex
patternMap map[string]string // rules pattern to event bus mapping
stats metrics.EventStats
mu sync.RWMutex
stats metrics.EventStats
}

// assert we implement Processor interface
Expand All @@ -62,7 +91,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve
resyncInterval: defaultResyncInterval,
batchSize: defaultBatchSize,
Logger: logger,
patternMap: make(map[string]string),
patternMap: &patternMap{},
}

// apply options
Expand Down Expand Up @@ -119,6 +148,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve
nextToken *string
)

eventBridge.patternMap.init()
for !found {
rules, err := eventBridge.ListRulesWithContext(ctx, &eventbridge.ListRulesInput{
EventBusName: aws.String(cfg.EventBus), // explicitly passing eventbus name because list assumes "default" otherwise
Expand Down Expand Up @@ -148,7 +178,7 @@ func NewEventBridgeProcessor(ctx context.Context, cfg *config.ProcessorConfigEve
}
for _, s := range e.Detail.Subject {
eventBridge.Printf("adding rule event forwarding pattern %q to processor", s)
eventBridge.patternMap[s] = *rule.EventBusName
eventBridge.patternMap.addSubject(s, *rule.EventBusName)
}
found = true
break arnLoop
Expand Down Expand Up @@ -190,60 +220,62 @@ func (eb *EventBridgeProcessor) Process(ctx context.Context, ce cloudevents.Even
eb.Printf("processing event (ID %s): %v", ce.ID(), ce)
}

subject := ce.Subject()
eb.mu.Lock()
defer eb.mu.Unlock()
// initialize invocation stats
if _, ok := eb.stats.Invocations[subject]; !ok {
eb.stats.Invocations[subject] = &metrics.InvocationDetails{}
}
eb.mu.Unlock()

if _, ok := eb.patternMap[ce.Subject()]; !ok {
// no event bridge rule pattern (subscription) for event, skip
if eb.verbose {
eb.Printf("pattern rule does not match, skipping event (ID %s): %v", ce.ID(), ce)
if bus, ok := eb.patternMap.matches(subject); ok {
jsonBytes, err := json.Marshal(ce)
if err != nil {
msg := fmt.Errorf("could not marshal event %v: %v", ce, err)
eb.Println(msg)
return processor.NewError(config.ProcessorEventBridge, msg)
}

return nil
}

jsonBytes, err := json.Marshal(ce)
if err != nil {
msg := fmt.Errorf("could not marshal event %v: %v", ce, err)
eb.Println(msg)
return processor.NewError(config.ProcessorEventBridge, msg)
}
jsonString := string(jsonBytes)
entry := eventbridge.PutEventsRequestEntry{
Detail: aws.String(jsonString),
EventBusName: aws.String(bus),
Source: aws.String(ce.Source()),
DetailType: aws.String(subject),
}

jsonString := string(jsonBytes)
entry := eventbridge.PutEventsRequestEntry{
Detail: aws.String(jsonString),
EventBusName: aws.String(eb.patternMap[ce.Subject()]),
Source: aws.String(ce.Source()),
DetailType: aws.String(ce.Subject()),
}
// TODO: add batching (metrics stats currently assume single item)
input := eventbridge.PutEventsInput{
Entries: []*eventbridge.PutEventsRequestEntry{&entry},
}

// check for existing topic entry in metrics
if _, ok := eb.stats.Invocations[ce.Subject()]; !ok {
eb.stats.Invocations[ce.Subject()] = &metrics.InvocationDetails{}
}
eb.Printf("sending event %s", ce.ID())
resp, err := eb.PutEventsWithContext(ctx, &input)

// TODO: add batching (metrics stats currently assume single item)
input := eventbridge.PutEventsInput{
Entries: []*eventbridge.PutEventsRequestEntry{&entry},
}
eb.mu.Lock()
defer eb.mu.Unlock()
if err != nil {
msg := fmt.Errorf("could not send event %v: %v", ce, err)
eb.Println(msg)
eb.stats.Invocations[subject].Failure()
return processor.NewError(config.ProcessorEventBridge, msg)
}

eb.Printf("sending event %s", ce.ID())
resp, err := eb.PutEventsWithContext(ctx, &input)
if eb.verbose {
eb.Printf("successfully sent event %v: %v", ce, resp)
} else {
eb.Printf("successfully sent event %s", ce.ID())
}

if err != nil {
msg := fmt.Errorf("could not send event %v: %v", ce, err)
eb.Println(msg)
eb.stats.Invocations[ce.Subject()].Failure()
return processor.NewError(config.ProcessorEventBridge, msg)
eb.stats.Invocations[subject].Success()
return nil
}

// no event bridge rule pattern (subscription) for event, skip
if eb.verbose {
eb.Printf("successfully sent event %v: %v", ce, resp)
} else {
eb.Printf("successfully sent event %s", ce.ID())
eb.Printf("pattern rule does not match, skipping event (ID %s): %v", ce.ID(), ce)
}

eb.stats.Invocations[ce.Subject()].Success()
return nil
}

Expand All @@ -267,10 +299,8 @@ func (eb *EventBridgeProcessor) syncPatternMap(ctx context.Context, eventbus, ru
}

func (eb *EventBridgeProcessor) syncRules(ctx context.Context, eventbus, ruleARN string) error {
eb.mu.Lock()
// clear pattern map
eb.patternMap = make(map[string]string)
eb.mu.Unlock()
// reset pattern map
eb.patternMap.init()

var (
found bool
Expand Down Expand Up @@ -305,12 +335,10 @@ func (eb *EventBridgeProcessor) syncRules(ctx context.Context, eventbus, ruleARN
eb.Println("warning: rule event pattern does not contain any subjects")
}

eb.mu.Lock()
for _, s := range e.Detail.Subject {
eb.Printf("adding rule event forwarding pattern %q to processor", s)
eb.patternMap[s] = *rule.EventBusName
eb.patternMap.addSubject(s, *rule.EventBusName)
}
eb.mu.Unlock()

found = true
break arnLoop
Expand Down

0 comments on commit 0dd4cdc

Please sign in to comment.