Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconcilers: Add an initial concept of a dedicated reconcilers object in mediator #872

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (

"github.com/stacklok/mediator/internal/config"
"github.com/stacklok/mediator/internal/engine"
"github.com/stacklok/mediator/internal/events"
"github.com/stacklok/mediator/internal/logger"
"github.com/stacklok/mediator/internal/reconcilers"
"github.com/stacklok/mediator/pkg/controlplane"
"github.com/stacklok/mediator/pkg/db"
)
Expand Down Expand Up @@ -61,12 +63,19 @@ var serveCmd = &cobra.Command{

errg, ctx := errgroup.WithContext(ctx)

s, err := controlplane.NewServer(store, cfg)
evt, err := events.Setup()
if err != nil {
log.Printf("Failed to set up eventer: %v", err)
return err
}

s, err := controlplane.NewServer(store, evt, cfg)
if err != nil {
return fmt.Errorf("unable to create server: %w", err)
}

s.ConsumeEvents(engine.NewExecutor(store))
s.ConsumeEvents(reconcilers.NewRecociler(store, evt))

// Start the gRPC and HTTP server in separate goroutines
errg.Go(func() error {
Expand Down
209 changes: 163 additions & 46 deletions internal/engine/entity_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"

Expand All @@ -26,9 +27,12 @@ import (
pb "github.com/stacklok/mediator/pkg/generated/protobuf/go/mediator/v1"
)

// entityInfoWrapper is a helper struct to gather information
// EntityInfoWrapper is a helper struct to gather information
// about entities from events.
// It assumes that the message.Message contains a payload
// It's able to build message.Message structures from
// the information it gathers.
//
// It's also able to read the message.Message that contains a payload
// with a protobuf message that's specific to the entity type.
//
// It also assumes the following metadata keys are present:
Expand All @@ -43,7 +47,8 @@ import (
//
// - RepositoryEventEntityType - repository
// - VersionedArtifactEventEntityType - versioned_artifact
type entityInfoWrapper struct {
type EntityInfoWrapper struct {
Provider string
GroupID int32
Entity protoreflect.ProtoMessage
Type pb.Entity
Expand All @@ -60,6 +65,8 @@ const (
const (
// EntityTypeEventKey is the key for the entity type
EntityTypeEventKey = "entity_type"
// ProviderEventKey is the key for the provider
ProviderEventKey = "provider"
// GroupIDEventKey is the key for the group ID
GroupIDEventKey = "group_id"
// RepositoryIDEventKey is the key for the repository ID
Expand All @@ -68,66 +75,117 @@ const (
ArtifactIDEventKey = "artifact_id"
)

func parseEntityEvent(msg *message.Message) (*entityInfoWrapper, error) {
out := &entityInfoWrapper{
// NewEntityInfoWrapper creates a new EntityInfoWrapper
func NewEntityInfoWrapper() *EntityInfoWrapper {
return &EntityInfoWrapper{
OwnershipData: make(map[string]int32),
}
}

if err := out.withGroupIDFromMessage(msg); err != nil {
return nil, err
}
// WithProvider sets the provider
func (eiw *EntityInfoWrapper) WithProvider(provider string) *EntityInfoWrapper {
eiw.Provider = provider

// We always have the repository ID.
if err := out.withRepositoryIDFromMessage(msg); err != nil {
return nil, err
}
return eiw
}

typ := msg.Metadata.Get(EntityTypeEventKey)
switch typ {
case RepositoryEventEntityType:
out.asRepository()
case VersionedArtifactEventEntityType:
out.asVersionedArtifact()
if err := out.withArtifactIDFromMessage(msg); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown entity type: %s", typ)
}
// WithVersionedArtifact sets the entity to a versioned artifact
func (eiw *EntityInfoWrapper) WithVersionedArtifact(va *pb.VersionedArtifact) *EntityInfoWrapper {
eiw.Type = pb.Entity_ENTITY_ARTIFACTS
eiw.Entity = va

if err := out.unmarshalEntity(msg); err != nil {
return nil, fmt.Errorf("error unmarshalling payload: %w", err)
}
return eiw
}

return out, nil
// WithRepository sets the entity to a repository
func (eiw *EntityInfoWrapper) WithRepository(r *pb.RepositoryResult) *EntityInfoWrapper {
eiw.Type = pb.Entity_ENTITY_REPOSITORIES
eiw.Entity = r

return eiw
}

func newEntityInfoWrapper() *entityInfoWrapper {
return &entityInfoWrapper{
OwnershipData: make(map[string]int32),
}
// WithGroupID sets the group ID
func (eiw *EntityInfoWrapper) WithGroupID(id int32) *EntityInfoWrapper {
eiw.GroupID = id

return eiw
}

func (eiw *entityInfoWrapper) withVersionedArtifact(va *pb.VersionedArtifact) {
eiw.Type = pb.Entity_ENTITY_ARTIFACTS
eiw.Entity = va
// WithRepositoryID sets the repository ID
func (eiw *EntityInfoWrapper) WithRepositoryID(id int32) *EntityInfoWrapper {
eiw.withID(RepositoryIDEventKey, id)

return eiw
}

func (eiw *entityInfoWrapper) withGroupID(id int32) {
eiw.GroupID = id
// WithArtifactID sets the artifact ID
func (eiw *EntityInfoWrapper) WithArtifactID(id int32) *EntityInfoWrapper {
eiw.withID(ArtifactIDEventKey, id)

return eiw
}

func (eiw *entityInfoWrapper) asRepository() {
// AsRepository sets the entity type to a repository
func (eiw *EntityInfoWrapper) AsRepository() *EntityInfoWrapper {
eiw.Type = pb.Entity_ENTITY_REPOSITORIES
eiw.Entity = &pb.RepositoryResult{}

return eiw
}

func (eiw *entityInfoWrapper) asVersionedArtifact() {
// AsVersionedArtifact sets the entity type to a versioned artifact
func (eiw *EntityInfoWrapper) AsVersionedArtifact() *EntityInfoWrapper {
eiw.Type = pb.Entity_ENTITY_ARTIFACTS
eiw.Entity = &pb.VersionedArtifact{}

return eiw
}

// BuildMessage builds a message.Message from the information
func (eiw *EntityInfoWrapper) BuildMessage() (*message.Message, error) {
id, err := uuid.NewUUID()
if err != nil {
return nil, fmt.Errorf("error generating UUID: %w", err)
}

msg := message.NewMessage(id.String(), nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: should we return nil, err rather than empty_msg, err on error? (I know that the caller should just not touch the first return parameter on error..)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return msg, eiw.ToMessage(msg)
}

// ToMessage sets the information to a message.Message
func (eiw *EntityInfoWrapper) ToMessage(msg *message.Message) error {
typ, err := pbEntityTypeToString(eiw.Type)
if err != nil {
return err
}

if eiw.GroupID == 0 {
return fmt.Errorf("group ID is required")
}

if eiw.Provider == "" {
return fmt.Errorf("provider is required")
}

msg.Metadata.Set(ProviderEventKey, eiw.Provider)
msg.Metadata.Set(EntityTypeEventKey, typ)
msg.Metadata.Set(GroupIDEventKey, fmt.Sprintf("%d", eiw.GroupID))
for k, v := range eiw.OwnershipData {
if v == 0 {
return fmt.Errorf("%s is required", k)
}
msg.Metadata.Set(k, fmt.Sprintf("%d", v))
}
msg.Payload, err = protojson.Marshal(eiw.Entity)
if err != nil {
return fmt.Errorf("error marshalling repository: %w", err)
}

return nil
}

func (eiw *entityInfoWrapper) withGroupIDFromMessage(msg *message.Message) error {
func (eiw *EntityInfoWrapper) withGroupIDFromMessage(msg *message.Message) error {
id, err := getIDFromMessage(msg, GroupIDEventKey)
if err != nil {
return fmt.Errorf("error parsing %s: %w", GroupIDEventKey, err)
Expand All @@ -137,15 +195,25 @@ func (eiw *entityInfoWrapper) withGroupIDFromMessage(msg *message.Message) error
return nil
}

func (eiw *entityInfoWrapper) withRepositoryIDFromMessage(msg *message.Message) error {
func (eiw *EntityInfoWrapper) withProviderFromMessage(msg *message.Message) error {
provider := msg.Metadata.Get(ProviderEventKey)
if provider == "" {
return fmt.Errorf("%s not found in metadata", ProviderEventKey)
}

eiw.Provider = provider
return nil
}

func (eiw *EntityInfoWrapper) withRepositoryIDFromMessage(msg *message.Message) error {
return eiw.withIDFromMessage(msg, RepositoryIDEventKey)
}

func (eiw *entityInfoWrapper) withArtifactIDFromMessage(msg *message.Message) error {
func (eiw *EntityInfoWrapper) withArtifactIDFromMessage(msg *message.Message) error {
return eiw.withIDFromMessage(msg, ArtifactIDEventKey)
}

func (eiw *entityInfoWrapper) withIDFromMessage(msg *message.Message, key string) error {
func (eiw *EntityInfoWrapper) withIDFromMessage(msg *message.Message, key string) error {
id, err := getIDFromMessage(msg, key)
if err != nil {
return fmt.Errorf("error parsing %s: %w", key, err)
Expand All @@ -155,15 +223,15 @@ func (eiw *entityInfoWrapper) withIDFromMessage(msg *message.Message, key string
return nil
}

func (eiw *entityInfoWrapper) withID(key string, id int32) {
func (eiw *EntityInfoWrapper) withID(key string, id int32) {
eiw.OwnershipData[key] = id
}

func (eiw *entityInfoWrapper) unmarshalEntity(msg *message.Message) error {
func (eiw *EntityInfoWrapper) unmarshalEntity(msg *message.Message) error {
return protojson.Unmarshal(msg.Payload, eiw.Entity)
}

func (eiw *entityInfoWrapper) evalStatusParams(
func (eiw *EntityInfoWrapper) evalStatusParams(
policyID int32,
ruleTypeID int32,
evalErr error,
Expand All @@ -184,6 +252,17 @@ func (eiw *entityInfoWrapper) evalStatusParams(
return params
}

func pbEntityTypeToString(t pb.Entity) (string, error) {
switch t {
case pb.Entity_ENTITY_REPOSITORIES:
return RepositoryEventEntityType, nil
case pb.Entity_ENTITY_ARTIFACTS:
return VersionedArtifactEventEntityType, nil
default:
return "", fmt.Errorf("unknown entity type: %s", t.String())
}
}

func getIDFromMessage(msg *message.Message, key string) (int32, error) {
rawID := msg.Metadata.Get(key)
if rawID == "" {
Expand All @@ -192,3 +271,41 @@ func getIDFromMessage(msg *message.Message, key string) (int32, error) {

return util.Int32FromString(rawID)
}

func parseEntityEvent(msg *message.Message) (*EntityInfoWrapper, error) {
out := &EntityInfoWrapper{
OwnershipData: make(map[string]int32),
}

if err := out.withGroupIDFromMessage(msg); err != nil {
return nil, err
}

if err := out.withProviderFromMessage(msg); err != nil {
return nil, err
}

// We always have the repository ID.
if err := out.withRepositoryIDFromMessage(msg); err != nil {
return nil, err
}

typ := msg.Metadata.Get(EntityTypeEventKey)
switch typ {
case RepositoryEventEntityType:
out.AsRepository()
case VersionedArtifactEventEntityType:
out.AsVersionedArtifact()
if err := out.withArtifactIDFromMessage(msg); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown entity type: %s", typ)
}

if err := out.unmarshalEntity(msg); err != nil {
return nil, fmt.Errorf("error unmarshalling payload: %w", err)
}

return out, nil
}
Loading