diff --git a/Dockerfile.pubsub-emulator b/Dockerfile.pubsub-emulator new file mode 100644 index 000000000..f4aca0eb0 --- /dev/null +++ b/Dockerfile.pubsub-emulator @@ -0,0 +1,3 @@ +# gcloud sdk for pubsub emulator with netcat added for the startup health check +FROM google/cloud-sdk:443.0.0@sha256:a59335d227a98b41ecdf6ff3d69923b8aef0703694e58992ecb75c7147140d37 +RUN apt-get install -y netcat diff --git a/cmd/rekor-server/app/root.go b/cmd/rekor-server/app/root.go index 34ba087cd..04a47fcd9 100644 --- a/cmd/rekor-server/app/root.go +++ b/cmd/rekor-server/app/root.go @@ -93,6 +93,10 @@ func init() { Memory and file-based signers should only be used for testing.`) rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key") + rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set. Supported providers: [gcppubsub]") + rootCmd.PersistentFlags().Bool("rekor_server.publish_events_protobuf", false, "Whether to publish events in Protobuf wire format. Applies to all enabled event types.") + rootCmd.PersistentFlags().Bool("rekor_server.publish_events_json", false, "Whether to publish events in CloudEvents JSON format. Applies to all enabled event types.") + rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to") rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint") diff --git a/cmd/rekor-server/app/serve.go b/cmd/rekor-server/app/serve.go index a8cc161c3..fa396a065 100644 --- a/cmd/rekor-server/app/serve.go +++ b/cmd/rekor-server/app/serve.go @@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{ Short: "start http server with configured api", Long: `Starts a http server and serves the configured api`, Run: func(cmd *cobra.Command, args []string) { - // Setup the logger to dev/prod log.ConfigureLogger(viper.GetString("log_type")) @@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{ log.Logger.Error(err) } }() - //TODO: make this a config option for server to load via viper field //TODO: add command line option to print versions supported in binary @@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{ hashedrekord.KIND: {hashedrekord_v001.APIVERSION}, dsse.KIND: {dsse_v001.APIVERSION}, } - for k, v := range pluggableTypeMap { log.Logger.Infof("Loading support for pluggable type '%v'", k) log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 02ca48e92..f1b8d712d 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -20,7 +20,8 @@ services: context: . target: "test" environment: - - TMPDIR=/var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294 + TMPDIR: /var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294 + PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085 command: [ "rekor-server", "-test.coverprofile=rekor-server.cov", @@ -34,7 +35,29 @@ services: "--enable_attestation_storage", "--attestation_storage_bucket=file:///var/run/attestations", "--max_request_body_size=32792576", + "--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry", + "--rekor_server.publish_events_json=true", ] ports: - "3000:3000" - "2112:2112" + depends_on: + - gcp-pubsub-emulator + gcp-pubsub-emulator: + image: gcp-pubsub-emulator + ports: + - "8085:8085" + command: + - gcloud + - beta + - emulators + - pubsub + - start + - --host-port=0.0.0.0:8085 + - --project=test-project + healthcheck: + test: ["CMD", "nc", "-zv", "localhost", "8085"] + interval: 10s + timeout: 3s + retries: 3 + start_period: 10s diff --git a/docker-compose.yml b/docker-compose.yml index f75c3dea1..dfcd99611 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -114,4 +114,3 @@ services: timeout: 3s retries: 3 start_period: 5s - diff --git a/go.mod b/go.mod index 0be6ca9ad..908f7c150 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( ) require ( + cloud.google.com/go/pubsub v1.33.0 github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18 github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7 github.com/go-redis/redismock/v9 v9.0.3 @@ -183,7 +184,7 @@ require ( golang.org/x/term v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/api v0.135.0 // indirect + google.golang.org/api v0.135.0 google.golang.org/appengine v1.6.7 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 1d7863b08..e452c46ea 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g= +cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= diff --git a/pkg/api/api.go b/pkg/api/api.go index 12925b6bd..b033a022a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/sigstore/rekor/pkg/log" + "github.com/sigstore/rekor/pkg/pubsub" "github.com/sigstore/rekor/pkg/sharding" "github.com/sigstore/rekor/pkg/signer" "github.com/sigstore/rekor/pkg/storage" @@ -39,6 +40,8 @@ import ( "github.com/sigstore/sigstore/pkg/cryptoutils" "github.com/sigstore/sigstore/pkg/signature" "github.com/sigstore/sigstore/pkg/signature/options" + + _ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation ) func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) { @@ -63,6 +66,9 @@ type API struct { signer signature.Signer // stops checkpoint publishing checkpointPublishCancel context.CancelFunc + // Publishes notifications when new entries are added to the log. May be + // nil if no publisher is configured. + newEntryPublisher pubsub.Publisher } func NewAPI(treeID uint) (*API, error) { @@ -112,6 +118,18 @@ func NewAPI(treeID uint) (*API, error) { pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b) + var newEntryPublisher pubsub.Publisher + if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" { + if !viper.GetBool("rekor_server.publish_events_protobuf") && !viper.GetBool("rekor_server.publish_events_json") { + return nil, fmt.Errorf("%q is configured but neither %q or %q are enabled", "new_entry_publisher", "publish_events_protobuf", "publish_events_json") + } + newEntryPublisher, err = pubsub.Get(ctx, p) + if err != nil { + return nil, fmt.Errorf("init event publisher: %w", err) + } + log.ContextLogger(ctx).Infof("Initialized new entry event publisher: %s", p) + } + return &API{ // Transparency Log Stuff logClient: logClient, @@ -121,6 +139,8 @@ func NewAPI(treeID uint) (*API, error) { pubkey: string(pubkey), pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]), signer: rekorSigner, + // Utility functionality not required for operation of the core service + newEntryPublisher: newEntryPublisher, }, nil } @@ -166,4 +186,8 @@ func ConfigureAPI(treeID uint) { func StopAPI() { api.checkpointPublishCancel() + + if api.newEntryPublisher != nil { + api.newEntryPublisher.Close() + } } diff --git a/pkg/api/entries.go b/pkg/api/entries.go index 94401e1c7..daa8d8423 100644 --- a/pkg/api/entries.go +++ b/pkg/api/entries.go @@ -36,10 +36,14 @@ import ( "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc/codes" + "github.com/sigstore/rekor/pkg/events" + "github.com/sigstore/rekor/pkg/events/newentry" "github.com/sigstore/rekor/pkg/generated/models" "github.com/sigstore/rekor/pkg/generated/restapi/operations/entries" "github.com/sigstore/rekor/pkg/log" + "github.com/sigstore/rekor/pkg/pubsub" "github.com/sigstore/rekor/pkg/sharding" + "github.com/sigstore/rekor/pkg/tle" "github.com/sigstore/rekor/pkg/trillianclient" "github.com/sigstore/rekor/pkg/types" "github.com/sigstore/rekor/pkg/util" @@ -290,7 +294,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl RootHash: swag.String(hex.EncodeToString(root.RootHash)), LogIndex: swag.Int64(queuedLeaf.LeafIndex), Hashes: hashes, - Checkpoint: stringPointer(string(scBytes)), + Checkpoint: swag.String(string(scBytes)), } logEntryAnon.Verification = &models.LogEntryAnonVerification{ @@ -301,9 +305,66 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl logEntry := models.LogEntry{ entryID: logEntryAnon, } + + if api.newEntryPublisher != nil { + // Publishing notifications should not block the API response. + go func() { + verifiers, err := entry.Verifiers() + if err != nil { + incPublishEvent(newentry.Name, "", false) + log.ContextLogger(ctx).Errorf("Could not get verifiers for log entry %s: %v", entryID, err) + return + } + var subjects []string + for _, v := range verifiers { + subjects = append(subjects, v.Subjects()...) + } + + pbEntry, err := tle.GenerateTransparencyLogEntry(logEntryAnon) + if err != nil { + incPublishEvent(newentry.Name, "", false) + log.ContextLogger(ctx).Error(err) + return + } + event, err := newentry.New(entryID, pbEntry, subjects) + if err != nil { + incPublishEvent(newentry.Name, "", false) + log.ContextLogger(ctx).Error(err) + return + } + if viper.GetBool("rekor_server.publish_events_protobuf") { + go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeProtobuf) + } + if viper.GetBool("rekor_server.publish_events_json") { + go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeJSON) + } + }() + } + return logEntry, nil } +func publishEvent(ctx context.Context, publisher pubsub.Publisher, event *events.Event, contentType events.EventContentType) { + err := publisher.Publish(context.WithoutCancel(ctx), event, contentType) + incPublishEvent(event.Type().Name(), contentType, err == nil) + if err != nil { + log.ContextLogger(ctx).Error(err) + } +} + +func incPublishEvent(event string, contentType events.EventContentType, ok bool) { + status := "SUCCESS" + if !ok { + status = "ERROR" + } + labels := map[string]string{ + "event": event, + "status": status, + "content_type": string(contentType), + } + metricPublishEvents.With(labels).Inc() +} + // CreateLogEntryHandler creates new entry into log func CreateLogEntryHandler(params entries.CreateLogEntryParams) middleware.Responder { httpReq := params.HTTPRequest diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 8f0efb9d6..64c91b493 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -29,6 +29,11 @@ var ( Help: "The total number of new log entries", }) + metricPublishEvents = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "rekor_publish_events", + Help: "The status of publishing events to Pub/Sub", + }, []string{"event", "content_type", "status"}) + MetricLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "rekor_api_latency", Help: "Api Latency on calls", diff --git a/pkg/events/doc.go b/pkg/events/doc.go new file mode 100644 index 000000000..42ce55fd9 --- /dev/null +++ b/pkg/events/doc.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events provides methods for working with CloudEvents. +package events + +// The version of the CloudEvents specification the package adheres to. +const CloudEventsSpecVersion = "1.0" diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 000000000..0f5bff74e --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,198 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package events +package events + +import ( + "encoding/json" + "fmt" + "time" + + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" + + pb "github.com/sigstore/protobuf-specs/gen/pb-go/events/v1" +) + +// The content type of an event. +type EventContentType string + +const ( + ContentTypeProtobuf EventContentType = "application/protobuf" + ContentTypeJSON EventContentType = "application/json" +) + +// Keys that cannot be used in the optional event attributes map due to +// collisions with mandatory fields in the CloudEvents spec. +var reservedFields = map[string]struct{}{ + "datacontenttype": {}, + "data": {}, + "id": {}, + "source": {}, + "specversion": {}, + "type": {}, +} + +// Event is an instance of a certain event type. +type Event struct { + ty *EventType + id string + msg protoreflect.ProtoMessage + attrs map[string]any +} + +// Type returns the underlying event type. +func (e Event) Type() *EventType { + return e.ty +} + +// ID returns the identifier for the event. +func (e Event) ID() string { + return e.id +} + +// Attributes returns the attributes attached to the event. These attributes +// are optional and this function may return an empty map. +func (e Event) Attributes() map[string]any { + return e.attrs +} + +// Message returns the underlying message (aka data) of the event. +func (e Event) Message() protoreflect.ProtoMessage { + return e.msg +} + +// New creates a new event of a given type. +// +// The "id" and "msg" parameters are required. The "attributes" parameter +// supports the following value types: +// - string +// - int +// - bool +// - []byte +// - time.Time +func (t *EventType) New(id string, msg protoreflect.ProtoMessage, attributes map[string]any) (*Event, error) { + if id == "" { + return nil, fmt.Errorf("id must be set") + } + if msg == nil { + return nil, fmt.Errorf("msg must be set") + } + ty := msg.ProtoReflect().Descriptor().FullName() + if tty := t.Descriptor().FullName(); ty != tty { + return nil, fmt.Errorf("msg type %q does not match expected type %q", ty, tty) + } + + for name, value := range attributes { + if _, ok := reservedFields[name]; ok { + return nil, fmt.Errorf("attribute name %q is one of the reserved CloudEvents names %v", name, reservedFields) + } + switch v := value.(type) { + case string, bool, int, []byte, time.Time: + // Allowed types are a no-op. + default: + return nil, fmt.Errorf("unsupported attribute type for %q: %T", name, v) + } + } + + return &Event{ty: t, id: id, msg: msg, attrs: attributes}, nil +} + +// MarshalJSON serializes the event to JSON, following the CloudEvents +// specification. +func (e *Event) MarshalJSON() ([]byte, error) { + data, err := protojson.Marshal(e.msg) + if err != nil { + return nil, fmt.Errorf("marshal data to JSON: %w", err) + } + + event := map[string]any{ + "specversion": CloudEventsSpecVersion, + "id": e.ID(), + "source": e.Type().Source(), + "type": e.Type().Name(), + "datacontenttype": ContentTypeJSON, + "data": string(data), + } + for k, v := range e.attrs { + event[k] = v + } + + return json.Marshal(event) +} + +// MarshalProto serializes the event to the CloudEvents Protobuf wire format. +func (e *Event) MarshalProto() ([]byte, error) { + msg, err := e.Proto() + if err != nil { + return nil, fmt.Errorf("build proto: %w", err) + } + return proto.Marshal(msg) +} + +// Proto returns the CloudEvents protobuf for the event. +func (e *Event) Proto() (*pb.CloudEvent, error) { + data, err := proto.Marshal(e.msg) + if err != nil { + return nil, fmt.Errorf("marshal data: %w", err) + } + + attrs := make(map[string]*pb.CloudEvent_CloudEventAttributeValue) + for name, value := range e.attrs { + switch v := value.(type) { + case string: + attrs[name] = &pb.CloudEvent_CloudEventAttributeValue{ + Attr: &pb.CloudEvent_CloudEventAttributeValue_CeString{CeString: v}, + } + case bool: + attrs[name] = &pb.CloudEvent_CloudEventAttributeValue{ + Attr: &pb.CloudEvent_CloudEventAttributeValue_CeBoolean{CeBoolean: v}, + } + case int: + attrs[name] = &pb.CloudEvent_CloudEventAttributeValue{ + Attr: &pb.CloudEvent_CloudEventAttributeValue_CeInteger{CeInteger: int32(v)}, + } + case time.Time: + attrs[name] = &pb.CloudEvent_CloudEventAttributeValue{ + Attr: &pb.CloudEvent_CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: timestamppb.New(v), + }, + } + case []byte: + attrs[name] = &pb.CloudEvent_CloudEventAttributeValue{ + Attr: &pb.CloudEvent_CloudEventAttributeValue_CeBytes{CeBytes: v}, + } + } + } + + event := &pb.CloudEvent{ + SpecVersion: CloudEventsSpecVersion, + Id: e.ID(), + Source: e.Type().Source(), + Type: e.Type().Name(), + Attributes: attrs, + Data: &pb.CloudEvent_ProtoData{ + ProtoData: &anypb.Any{ + Value: data, + TypeUrl: string(e.Type().Descriptor().FullName()), + }, + }, + } + + return event, nil +} diff --git a/pkg/events/newentry/new_entry.go b/pkg/events/newentry/new_entry.go new file mode 100644 index 000000000..a1d19eda1 --- /dev/null +++ b/pkg/events/newentry/new_entry.go @@ -0,0 +1,47 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package newentry + +import ( + "strings" + "time" + + "github.com/sigstore/rekor/pkg/events" + "golang.org/x/exp/slices" + + rekor_pb "github.com/sigstore/protobuf-specs/gen/pb-go/rekor/v1" +) + +const ( + Name = "dev.sigstore.rekor.events.v1.NewEntry" + Source = "/createLogEntry" +) + +var ty *events.EventType + +func init() { + empty := &rekor_pb.TransparencyLogEntry{} + ty = events.RegisterType(Name, Source, empty.ProtoReflect().Descriptor()) +} + +func New(id string, entry *rekor_pb.TransparencyLogEntry, subjects []string) (*events.Event, error) { + slices.Sort(subjects) // Must be sorted for consistency. + attrs := map[string]any{ + "time": time.Unix(entry.GetIntegratedTime(), 0), + "rekor_entry_kind": entry.GetKindVersion().GetKind(), + "rekor_signing_subjects": strings.Join(subjects, ","), + } + return ty.New(id, entry, attrs) +} diff --git a/pkg/events/newentry/new_entry_test.go b/pkg/events/newentry/new_entry_test.go new file mode 100644 index 000000000..0f528a6b7 --- /dev/null +++ b/pkg/events/newentry/new_entry_test.go @@ -0,0 +1,114 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package newentry + +import ( + "math" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/sigstore/rekor/pkg/events" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" + + events_pb "github.com/sigstore/protobuf-specs/gen/pb-go/events/v1" + rekor_pb "github.com/sigstore/protobuf-specs/gen/pb-go/rekor/v1" +) + +func TestBuildNewEntryEvent(t *testing.T) { + t.Parallel() + + decamillennium := time.Date(9999, 12, 31, 24, 59, 59, math.MaxInt, time.UTC).Unix() + testEntry := &rekor_pb.TransparencyLogEntry{ + IntegratedTime: decamillennium, + KindVersion: &rekor_pb.KindVersion{ + Kind: "test_kind", + }, + } + marshalledEntry, err := proto.Marshal(testEntry) + if err != nil { + t.Fatal(err) + } + + testCases := []struct { + desc string + entryID string + subjects []string + entry *rekor_pb.TransparencyLogEntry + + wantError bool + want *events_pb.CloudEvent + }{ + { + desc: "missing ID", + subjects: []string{"test@rekor.dev"}, + entry: testEntry, + wantError: true, + }, + { + desc: "valid", + entryID: "test-id", + subjects: []string{"test@rekor.dev", "foo@bar.baz"}, // Output should be sorted. + entry: testEntry, + want: &events_pb.CloudEvent{ + SpecVersion: events.CloudEventsSpecVersion, + Id: "test-id", + Source: Source, + Type: Name, + Attributes: map[string]*events_pb.CloudEvent_CloudEventAttributeValue{ + "time": {Attr: &events_pb.CloudEvent_CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: ×tamppb.Timestamp{Seconds: decamillennium}, + }}, + "rekor_entry_kind": {Attr: &events_pb.CloudEvent_CloudEventAttributeValue_CeString{ + CeString: "test_kind", + }}, + "rekor_signing_subjects": {Attr: &events_pb.CloudEvent_CloudEventAttributeValue_CeString{ + CeString: "foo@bar.baz,test@rekor.dev", + }}, + }, + Data: &events_pb.CloudEvent_ProtoData{ + ProtoData: &anypb.Any{ + Value: marshalledEntry, + TypeUrl: string(testEntry.ProtoReflect().Descriptor().FullName()), + }, + }, + }, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + event, err := New(tc.entryID, tc.entry, tc.subjects) + gotErr := err != nil + if gotErr != tc.wantError { + t.Fatalf("New() err = %v, want %v", gotErr, tc.wantError) + } + if err != nil { + return + } + msg, err := event.Proto() + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(msg, tc.want, protocmp.Transform()); diff != "" { + t.Errorf("New() unexpected diff:\n%s", diff) + } + }) + } +} diff --git a/pkg/events/types.go b/pkg/events/types.go new file mode 100644 index 000000000..2dd13f458 --- /dev/null +++ b/pkg/events/types.go @@ -0,0 +1,100 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "fmt" + "sync" + + "google.golang.org/protobuf/reflect/protoreflect" +) + +var ( + registeredEventTypes = map[string]*EventType{} + mu sync.Mutex +) + +// EventType describes the type of an event. +type EventType struct { + name string + source string + desc protoreflect.MessageDescriptor +} + +// Name returns the unique name for the event type. +func (t EventType) Name() string { + return t.name +} + +// Source returns the source for the event type. +func (t EventType) Source() string { + return t.source +} + +// Descriptor returns the message descriptor for messages of the event type. +func (t EventType) Descriptor() protoreflect.MessageDescriptor { + return t.desc +} + +// RegisterType registers a new event type. It is intended for use in init() +// functions for each event type. It will panic if errors are encountered. +func RegisterType(name, source string, desc protoreflect.MessageDescriptor) *EventType { + mu.Lock() + defer mu.Unlock() + + if name == "" { + panic("event name must be set") + } + if source == "" { + panic("event source must be set") + } + if desc == nil { + panic("event descriptor must be set") + } + if _, ok := registeredEventTypes[name]; ok { + panic("event has already been registered: " + name) + } + ty := &EventType{ + name: name, + source: source, + desc: desc, + } + registeredEventTypes[name] = ty + return ty +} + +// EventNotFoundError indicates that no matching PubSub provider was found. +type EventNotFoundError struct { + name string +} + +func (e *EventNotFoundError) Error() string { + return fmt.Sprintf("event type not found: %s", e.name) +} + +// Get returns an event type by name. +func Get(name string) (*EventType, error) { + v, ok := registeredEventTypes[name] + if !ok { + return nil, &EventNotFoundError{name: name} + } + return v, nil +} + +// RegisteredTypes returns a map of all registered event types. The key is the +// event type name. +func RegisteredTypes() map[string]*EventType { + return registeredEventTypes +} diff --git a/pkg/pubsub/doc.go b/pkg/pubsub/doc.go new file mode 100644 index 000000000..7263e044e --- /dev/null +++ b/pkg/pubsub/doc.go @@ -0,0 +1,17 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package pubsub provides an interface and implementations for publishing +// notifications for Rekor updates to a Pub/Sub system. +package pubsub diff --git a/pkg/pubsub/gcp/publisher.go b/pkg/pubsub/gcp/publisher.go new file mode 100644 index 000000000..873a9f9a9 --- /dev/null +++ b/pkg/pubsub/gcp/publisher.go @@ -0,0 +1,159 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package gcp implements the pubsub.Publisher with Google Cloud Pub/Sub. +package gcp + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "regexp" + "sync" + "time" + + "github.com/sigstore/rekor/pkg/events" + sigpubsub "github.com/sigstore/rekor/pkg/pubsub" + + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" +) + +func init() { + sigpubsub.AddProvider(URIIdentifier, func(ctx context.Context, topicResourceID string) (sigpubsub.Publisher, error) { + return New(ctx, topicResourceID) + }) +} + +const URIIdentifier = "gcppubsub://" + +var ( + // Copied from https://github.com/google/go-cloud/blob/master/pubsub/gcppubsub/gcppubsub.go + re = regexp.MustCompile(`^gcppubsub://projects/([^/]+)/topics/([^/]+)$`) + // Minimal set of permissions needed to check if the server can publish to the configured topic. + // https://cloud.google.com/pubsub/docs/access-control#required_permissions + requiredIAMPermissions = []string{ + "pubsub.topics.get", + "pubsub.topics.publish", + } +) + +type Publisher struct { + client *pubsub.Client + topic string + wg *sync.WaitGroup +} + +func New(ctx context.Context, topicResourceID string, opts ...option.ClientOption) (*Publisher, error) { + projectID, topic, err := parseRef(topicResourceID) + if err != nil { + return nil, fmt.Errorf("parse ref: %w", err) + } + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, fmt.Errorf("create pubsub client for project %q: %w", projectID, err) + } + + // The PubSub emulator does not support IAM methods, and will block the + // server start up if they are called. If the environment variable is set, + // skip this check. + if os.Getenv("PUBSUB_EMULATOR_HOST") == "" { + if _, err := client.Topic(topic).IAM().TestPermissions(ctx, requiredIAMPermissions); err != nil { + return nil, fmt.Errorf("insufficient permissions for topic %q: %w", topic, err) + } + } + + return &Publisher{ + client: client, + topic: topic, + wg: new(sync.WaitGroup), + }, nil +} + +func (p *Publisher) Publish(ctx context.Context, event *events.Event, encoding events.EventContentType) error { + p.wg.Add(1) + defer p.wg.Done() + + var data []byte + var err error + switch encoding { + case events.ContentTypeProtobuf: + data, err = event.MarshalProto() + case events.ContentTypeJSON: + data, err = event.MarshalJSON() + default: + err = fmt.Errorf("unsupported encoding: %s", encoding) + } + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + + msg := &pubsub.Message{ + Data: data, + Attributes: gcpAttrs(event, encoding), + } + + // The Publish call does not block. + res := p.client.Topic(p.topic).Publish(ctx, msg) + + // TODO: Consider making the timeout configurable. + cctx, cancel := context.WithTimeout(ctx, pubsub.DefaultPublishSettings.Timeout) + defer cancel() + + // This Get call blocks until a response occurs, or the deadline is reached. + if _, err := res.Get(cctx); err != nil { + return fmt.Errorf("publish event %s to topic %q: %w", event.ID(), p.topic, err) + } + return nil +} + +func (p *Publisher) Close() error { + p.wg.Wait() + return p.client.Close() +} + +func parseRef(ref string) (projectID, topic string, err error) { + v := re.FindStringSubmatch(ref) + if len(v) != 3 { + err = fmt.Errorf("invalid gcppubsub format %q", ref) + return + } + projectID, topic = v[1], v[2] + return +} + +// GCP Pub/Sub attributes can be used to filter events server-side, reducing +// the processing for the client and reducing GCP costs for egress fees. +func gcpAttrs(event *events.Event, dataType events.EventContentType) map[string]string { + attrs := map[string]string{ + "source": event.Type().Source(), + "type": event.Type().Name(), + "datacontenttype": string(dataType), + } + for name, value := range event.Attributes() { + switch v := value.(type) { + case string: + attrs[name] = v + case time.Time: + attrs[name] = v.Format(time.RFC3339) + case []byte: + attrs[name] = base64.StdEncoding.EncodeToString(v) + default: + attrs[name] = fmt.Sprint(v) + } + } + + return attrs +} diff --git a/pkg/pubsub/gcp/publisher_test.go b/pkg/pubsub/gcp/publisher_test.go new file mode 100644 index 000000000..9735f3001 --- /dev/null +++ b/pkg/pubsub/gcp/publisher_test.go @@ -0,0 +1,141 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcp + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/sigstore/rekor/pkg/events" + "google.golang.org/protobuf/types/known/emptypb" +) + +func TestParseRef(t *testing.T) { + t.Parallel() + testCases := []struct { + desc string + ref string + + wantProject string + wantTopic string + wantErr bool + }{ + + { + desc: "Valid example", + ref: "gcppubsub://projects/project-foo/topics/topic-bar", + wantProject: "project-foo", + wantTopic: "topic-bar", + }, + { + desc: "Empty ref", + wantErr: true, + }, + { + desc: "Missing topic", + ref: "gcppubsub://projects/project-foo/topics/", + wantErr: true, + }, + { + desc: "Wrong scheme", + ref: "foo://projects/project-foo/topics/topic-bar", + wantErr: true, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + project, topic, err := parseRef(tc.ref) + gotErr := err != nil + if gotErr != tc.wantErr { + t.Errorf("parseRef(%s) error = %v, wantErr %v", tc.ref, gotErr, tc.wantErr) + return + } + if project != tc.wantProject { + t.Errorf("parseRef(%s) project = %s, want %s", tc.ref, project, tc.wantProject) + } + if topic != tc.wantTopic { + t.Errorf("parseRef(%s) topic = %s, want %s", tc.ref, topic, tc.wantTopic) + } + }) + } +} + +func TestGCPAttrs(t *testing.T) { + t.Parallel() + + empty := &emptypb.Empty{} + ty := events.RegisterType("gcpAttrsTestEvent", "/source", empty.ProtoReflect().Descriptor()) + + coreEvent, err := ty.New("A123-456", &emptypb.Empty{}, nil) + if err != nil { + t.Fatal(err) + } + attrs := map[string]any{ + "attr_string": "string", + "attr_bool": true, + "attr_int": 123, + "attr_bytes": []byte("hello"), + "attr_timestamp": time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Second), + } + attrsEvent, err := ty.New("A456-789", &emptypb.Empty{}, attrs) + if err != nil { + t.Fatal(err) + } + + testCases := []struct { + desc string + event *events.Event + want map[string]string + }{ + { + desc: "Core attrs only", + event: coreEvent, + want: map[string]string{ + "datacontenttype": "application/fake-test-mime", + "source": "/source", + "type": "gcpAttrsTestEvent", + }, + }, + { + desc: "With optional attrs", + event: attrsEvent, + want: map[string]string{ + "datacontenttype": "application/fake-test-mime", + "source": "/source", + "type": "gcpAttrsTestEvent", + "attr_string": "string", + "attr_int": "123", + "attr_bool": "true", + "attr_bytes": "aGVsbG8=", + "attr_timestamp": time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Second).Format(time.RFC3339), + }, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + got := gcpAttrs(tc.event, "application/fake-test-mime") + if diff := cmp.Diff(got, tc.want); diff != "" { + t.Errorf("unexpected diff:\n%s", diff) + } + }) + } +} diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go new file mode 100644 index 000000000..475c11b1b --- /dev/null +++ b/pkg/pubsub/publisher.go @@ -0,0 +1,74 @@ +// Copyright 2023 The Sigstore Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "context" + "fmt" + "strings" + + "github.com/sigstore/rekor/pkg/events" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" +) + +// Publisher provides methods for publishing events to a Pub/Sub topic. +type Publisher interface { + // Publish publishes a CloudEvent to the configured Pub/Sub topic serialized + // using the specified encoding type. + Publish(ctx context.Context, event *events.Event, encoding events.EventContentType) error + // Close safely closes any active connections. + Close() error +} + +// ProviderNotFoundError indicates that no matching PubSub provider was found. +type ProviderNotFoundError struct { + ref string +} + +func (e *ProviderNotFoundError) Error() string { + return fmt.Sprintf("no pubsub provider found for key reference: %s", e.ref) +} + +// ProviderInit is a function that initializes provider-specific Publisher. +type ProviderInit func(ctx context.Context, topicResourceID string) (Publisher, error) + +// AddProvider adds the provider implementation into the local cache +func AddProvider(uri string, init ProviderInit) { + providersMap[uri] = init +} + +var providersMap = map[string]ProviderInit{} + +// Get returns a Publisher for the given resource string and hash function. +// If no matching provider is found, Get returns a ProviderNotFoundError. It +// also returns an error if initializing the Publisher fails. If no resource +// is supplied, it returns a nil Publisher and no error. +func Get(ctx context.Context, topicResourceID string) (Publisher, error) { + for ref, pi := range providersMap { + if strings.HasPrefix(topicResourceID, ref) { + return pi(ctx, topicResourceID) + } + } + return nil, &ProviderNotFoundError{ref: topicResourceID} +} + +// SupportedProviders returns list of initialized providers +func SupportedProviders() []string { + names := maps.Keys(providersMap) + slices.Sort(names) + return names +} diff --git a/pkg/tle/tle.go b/pkg/tle/tle.go index 92676d0e9..2e51f5bcc 100644 --- a/pkg/tle/tle.go +++ b/pkg/tle/tle.go @@ -50,12 +50,22 @@ func GenerateTransparencyLogEntry(anon models.LogEntryAnon) (*rekor_pb.Transpare inclusionProofHashes[i] = hashBytes } - b, err := base64.StdEncoding.DecodeString(anon.Body.(string)) - if err != nil { - return nil, fmt.Errorf("base64 decoding body: %w", err) + // Different call paths may supply string or []byte. If string, it is base64 encoded. + var body []byte + switch v := anon.Body.(type) { + case string: + b, err := base64.StdEncoding.DecodeString(v) + if err != nil { + return nil, fmt.Errorf("base64 decoding body: %w", err) + } + body = b + case []byte: + body = v + default: + return nil, fmt.Errorf("body is not string or []byte: (%T)%v", v, v) } - pe, err := models.UnmarshalProposedEntry(bytes.NewReader(b), runtime.JSONConsumer()) + pe, err := models.UnmarshalProposedEntry(bytes.NewReader(body), runtime.JSONConsumer()) if err != nil { return nil, err } @@ -86,7 +96,7 @@ func GenerateTransparencyLogEntry(anon models.LogEntryAnon) (*rekor_pb.Transpare Envelope: *anon.Verification.InclusionProof.Checkpoint, }, }, - CanonicalizedBody: b, // we don't call eimpl.Canonicalize in the case that the logic is different in this caller vs when it was persisted in the log + CanonicalizedBody: body, // we don't call eimpl.Canonicalize in the case that the logic is different in this caller vs when it was persisted in the log }, nil } diff --git a/tests/e2e-test.sh b/tests/e2e-test.sh index a8472b9fb..f57550621 100755 --- a/tests/e2e-test.sh +++ b/tests/e2e-test.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/usr/bin/env bash # # Copyright 2021 The Sigstore Authors. # @@ -22,8 +22,11 @@ rm -f /tmp/rekor-*.cov echo "installing gocovmerge" make gocovmerge +echo "building test-only containers" +docker build -t gcp-pubsub-emulator -f Dockerfile.pubsub-emulator . + echo "starting services" -docker-compose -f docker-compose.yml -f docker-compose.test.yml up -d --build +docker compose -f docker-compose.yml -f docker-compose.test.yml up -d --build echo "building CLI and server" go test -c ./cmd/rekor-cli -o rekor-cli -cover -covermode=count -coverpkg=./... @@ -31,15 +34,17 @@ go test -c ./cmd/rekor-server -o rekor-server -covermode=count -coverpkg=./... count=0 -echo -n "waiting up to 60 sec for system to start" -until [ $(docker-compose ps | grep -c "(healthy)") == 3 ]; +echo "waiting up to 2 min for system to start" +until [ $(docker compose ps | \ + grep -E "(rekor-mysql|rekor-redis|rekor-server|gcp-pubsub-emulator)" | \ + grep -c "(healthy)" ) == 4 ]; do - if [ $count -eq 6 ]; then + if [ $count -eq 24 ]; then echo "! timeout reached" exit 1 else echo -n "." - sleep 10 + sleep 5 let 'count+=1' fi done @@ -50,27 +55,33 @@ REKORTMPDIR="$(mktemp -d -t rekor_test.XXXXXX)" touch $REKORTMPDIR.rekor.yaml trap "rm -rf $REKORTMPDIR" EXIT if ! REKORTMPDIR=$REKORTMPDIR go test -tags=e2e ./tests/ -run TestIssue1308; then - docker-compose logs --no-color > /tmp/docker-compose.log + docker compose logs --no-color > /tmp/docker compose.log exit 1 fi -if ! REKORTMPDIR=$REKORTMPDIR go test -tags=e2e ./tests/; then - docker-compose logs --no-color > /tmp/docker-compose.log +if ! REKORTMPDIR=$REKORTMPDIR PUBSUB_EMULATOR_HOST=localhost:8085 go test -tags=e2e ./tests/; then + docker compose logs --no-color > /tmp/docker compose.log exit 1 fi -if docker-compose logs --no-color | grep -q "panic: runtime error:" ; then +if docker compose logs --no-color | grep -q "panic: runtime error:" ; then # if we're here, we found a panic echo "Failing due to panics detected in logs" - docker-compose logs --no-color > /tmp/docker-compose.log + docker compose logs --no-color > /tmp/docker compose.log exit 1 fi echo "generating code coverage" -docker-compose restart rekor-server +docker compose restart rekor-server + +# docker compose appears to name the containers slightly differently in GHA CI vs locally on macOS +container_name="rekor_rekor-server" +if [[ "$(uname -s)" -eq "Darwin" ]]; then + container_name="rekor-rekor-server" +fi -if ! docker cp $(docker ps -aqf "name=rekor_rekor-server"):go/rekor-server.cov /tmp/rekor-server.cov ; then +if ! docker cp $(docker ps -aqf "name=${container_name}"):/go/rekor-server.cov /tmp/rekor-server.cov ; then # failed to copy code coverage report from server echo "Failed to retrieve server code coverage report" - docker-compose logs --no-color > /tmp/docker-compose.log + docker compose logs --no-color > /tmp/docker compose.log exit 1 fi diff --git a/tests/e2e_test.go b/tests/e2e_test.go index 7561c315b..425c1b329 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -34,9 +34,11 @@ import ( "strconv" "strings" "testing" + "time" "golang.org/x/sync/errgroup" + "cloud.google.com/go/pubsub" "github.com/cyberphone/json-canonicalization/go/src/webpki.org/jsoncanonicalizer" "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" @@ -284,13 +286,18 @@ func TestEntryUpload(t *testing.T) { artifactPath := filepath.Join(t.TempDir(), "artifact") sigPath := filepath.Join(t.TempDir(), "signature.asc") + // Create the entry file createdPGPSignedArtifact(t, artifactPath, sigPath) - payload, _ := ioutil.ReadFile(artifactPath) - sig, _ := ioutil.ReadFile(sigPath) + payload, err := ioutil.ReadFile(artifactPath) + if err != nil { + t.Fatal(err) + } + sig, err := ioutil.ReadFile(sigPath) + if err != nil { + t.Fatal(err) + } - // Create the entry file entryPath := filepath.Join(t.TempDir(), "entry.json") - pubKeyBytes := []byte(publicKey) re := rekord.V001Entry{ @@ -314,16 +321,66 @@ func TestEntryUpload(t *testing.T) { } entryBytes, err := json.Marshal(returnVal) if err != nil { - t.Error(err) + t.Fatal(err) } - if err := ioutil.WriteFile(entryPath, entryBytes, 0644); err != nil { - t.Error(err) + t.Fatal(err) } + // Start pubsub client to capture notifications. Values match those in + // docker-compose.test.yml. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + psc, err := pubsub.NewClient(ctx, "test-project") + if err != nil { + t.Fatalf("Create pubsub client: %v", err) + } + topic, err := psc.CreateTopic(ctx, "new-entry") + if err != nil { + // Assume error is AlreadyExists if one occurrs unless it is context timeout. + // If the error was not AlreadyExists, it will be caught in later error + // checks in this test. + if errors.Is(err, os.ErrDeadlineExceeded) { + t.Fatalf("Create pubsub topic: %v", err) + } + topic = psc.Topic("new-entry") + } + filters := []string{ + `attributes:rekor_entry_kind`, // Ignore any messages that do not have this attribute + `attributes.rekor_signing_subjects = "test@rekor.dev"`, // This is the email in the hard-coded PGP test key + `attributes.datacontenttype = "application/json"`, // Only fetch the JSON formatted events + } + cfg := pubsub.SubscriptionConfig{ + Topic: topic, + Filter: strings.Join(filters, " AND "), + } + sub, err := psc.CreateSubscription(ctx, "new-entry-sub", cfg) + if err != nil { + if errors.Is(err, os.ErrDeadlineExceeded) { + t.Fatalf("Create pubsub subscription: %v", err) + } + sub = psc.Subscription("new-entry-sub") + } + ch := make(chan []byte, 1) + go func() { + if err := sub.Receive(ctx, func(_ context.Context, m *pubsub.Message) { + ch <- m.Data + }); err != nil { + t.Errorf("Receive pubusub msg: %v", err) + } + }() + // Now upload to rekor! out := runCli(t, "upload", "--entry", entryPath) outputContains(t, out, "Created entry at") + + // Await pubsub + select { + case msg := <-ch: + t.Logf("Got pubsub message!\n%s", string(msg)) + case <-ctx.Done(): + t.Errorf("Did not receive pubsub message: %v", ctx.Err()) + } } // Regression test for https://github.com/sigstore/rekor/pull/956