Skip to content

Commit

Permalink
Add bucket filter for notification events (#1167)
Browse files Browse the repository at this point in the history
  • Loading branch information
mccalltd authored May 27, 2023
1 parent 6e130ca commit 96a918e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 0 deletions.
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Config struct {
type EventConfig struct {
pubsubProjectID string
pubsubTopic string
bucket string
prefix string
list []string
}
Expand All @@ -71,6 +72,7 @@ func Load(args []string) (Config, error) {
fs.UintVar(&cfg.Port, "port", 4443, "port to bind to")
fs.StringVar(&cfg.event.pubsubProjectID, "event.pubsub-project-id", "", "project ID containing the pubsub topic")
fs.StringVar(&cfg.event.pubsubTopic, "event.pubsub-topic", "", "pubsub topic name to publish events on")
fs.StringVar(&cfg.event.bucket, "event.bucket", "", "if not empty, only objects in this bucket will generate trigger events")
fs.StringVar(&cfg.event.prefix, "event.object-prefix", "", "if not empty, only objects having this prefix will generate trigger events")
fs.StringVar(&eventList, "event.list", eventFinalize, "comma separated list of events to publish on cloud function URl. Options are: finalize, delete, and metadataUpdate")
fs.StringVar(&cfg.bucketLocation, "location", "US-CENTRAL1", "location for buckets")
Expand Down Expand Up @@ -150,6 +152,7 @@ func (c *Config) ToFakeGcsOptions() fakestorage.Options {
eventOptions := notification.EventManagerOptions{
ProjectID: c.event.pubsubProjectID,
TopicName: c.event.pubsubTopic,
Bucket: c.event.bucket,
ObjectPrefix: c.event.prefix,
}
if c.event.pubsubProjectID != "" && c.event.pubsubTopic != "" {
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func TestToFakeGcsOptions(t *testing.T) {
event: EventConfig{
pubsubProjectID: "test-project",
pubsubTopic: "gcs-events",
bucket: "my-bucket",
prefix: "uploads/",
list: []string{"finalize", "delete"},
},
Expand All @@ -171,6 +172,7 @@ func TestToFakeGcsOptions(t *testing.T) {
EventOptions: notification.EventManagerOptions{
ProjectID: "test-project",
TopicName: "gcs-events",
Bucket: "my-bucket",
ObjectPrefix: "uploads/",
NotifyOn: notification.EventNotificationOptions{
Finalize: true,
Expand Down
8 changes: 8 additions & 0 deletions internal/notification/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type EventManagerOptions struct {
ProjectID string
// TopicName is the pubsub topic name to publish events on.
TopicName string
// Bucket is the name of the bucket to publish events from.
Bucket string
// ObjectPrefix, if not empty, only objects having this prefix will generate
// trigger events.
ObjectPrefix string
Expand All @@ -65,6 +67,8 @@ type PubsubEventManager struct {
notifyOn EventNotificationOptions
// writer is where logs are written to.
writer io.Writer
// bucket, if not empty, only objects from this bucker will generate trigger events.
bucket string
// objectPrefix, if not empty, only objects having this prefix will generate
// trigger events.
objectPrefix string
Expand All @@ -76,6 +80,7 @@ func NewPubsubEventManager(options EventManagerOptions, w io.Writer) (*PubsubEve
manager := &PubsubEventManager{
writer: w,
notifyOn: options.NotifyOn,
bucket: options.Bucket,
objectPrefix: options.ObjectPrefix,
}
if options.ProjectID != "" && options.TopicName != "" {
Expand All @@ -100,6 +105,9 @@ func (m *PubsubEventManager) Trigger(o *backend.StreamingObject, eventType Event
if m.publisher == nil {
return
}
if m.bucket != "" && o.BucketName != m.bucket {
return
}
if m.objectPrefix != "" && !strings.HasPrefix(o.Name, m.objectPrefix) {
return
}
Expand Down
38 changes: 38 additions & 0 deletions internal/notification/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
name string
notifyOn EventNotificationOptions
eventType string
bucket string
prefix string
metadata map[string]string
}{
Expand All @@ -42,6 +43,37 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
EventNotificationOptions{},
"",
"",
"",
nil,
},
{
"Finalize enabled, no bucket",
EventNotificationOptions{
Finalize: true,
},
string(EventFinalize),
"",
"",
nil,
},
{
"Finalize enabled, matching bucket",
EventNotificationOptions{
Finalize: true,
},
string(EventFinalize),
"some-bucket",
"",
nil,
},
{
"Finalize enabled, non-matching bucket",
EventNotificationOptions{
Finalize: true,
},
"",
"some-unmatched-bucket",
"",
nil,
},
{
Expand All @@ -51,6 +83,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
},
string(EventFinalize),
"",
"",
nil,
},
{
Expand All @@ -59,6 +92,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
Finalize: true,
},
string(EventFinalize),
"",
"files/",
nil,
},
Expand All @@ -68,6 +102,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
Finalize: true,
},
"",
"",
"uploads/",
nil,
},
Expand All @@ -78,6 +113,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
},
string(EventDelete),
"",
"",
nil,
},
{
Expand All @@ -87,6 +123,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
},
string(EventMetadata),
"",
"",
newMetadata,
},
}
Expand All @@ -107,6 +144,7 @@ func TestPubsubEventManager_Trigger(t *testing.T) {
obj := bufferedObj.StreamingObject()
eventManager := PubsubEventManager{
notifyOn: test.notifyOn,
bucket: test.bucket,
objectPrefix: test.prefix,
}
publisher := &mockPublisher{}
Expand Down

0 comments on commit 96a918e

Please sign in to comment.