Skip to content

Commit

Permalink
Update instance connectors on a connector reconcile (#4992)
Browse files Browse the repository at this point in the history
* UpdateInstanceWithRillYAML on connector reconcile

* Simplified connector reconcile

* Merged main

* Fixed test

* fmt fix

* ResetState
  • Loading branch information
esevastyanov authored and ericpgreen2 committed Jun 3, 2024
1 parent 7c0d2da commit 1c40dd9
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 62 deletions.
2 changes: 1 addition & 1 deletion proto/gen/rill/admin/v1/admin.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ definitions:
`NullValue` is a singleton enumeration to represent the null value for the
`Value` type union.
The JSON representation for `NullValue` is JSON `null`.
The JSON representation for `NullValue` is JSON `null`.
- NULL_VALUE: Null value.
rpcStatus:
Expand Down
85 changes: 48 additions & 37 deletions proto/gen/rill/runtime/v1/resources.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions proto/gen/rill/runtime/v1/resources.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2813,7 +2813,7 @@ definitions:
`NullValue` is a singleton enumeration to represent the null value for the
`Value` type union.
The JSON representation for `NullValue` is JSON `null`.
The JSON representation for `NullValue` is JSON `null`.
- NULL_VALUE: Null value.
rpcStatus:
Expand Down Expand Up @@ -3519,6 +3519,9 @@ definitions:
NOTE : properties_from_variables and properties both should be used to get all properties.
v1ConnectorState:
type: object
properties:
specHash:
type: string
v1ConnectorV2:
type: object
properties:
Expand Down
1 change: 1 addition & 0 deletions proto/rill/runtime/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ message ConnectorSpec {
}

message ConnectorState {
string spec_hash = 1;
}

message ConnectorV2 {
Expand Down
45 changes: 28 additions & 17 deletions runtime/compilers/rillv1/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,15 @@ func (a *connectorAnalyzer) analyzeSource(ctx context.Context, r *Resource) {
spec := r.SourceSpec
srcProps := spec.Properties.AsMap()
_, sourceDriver, driverErr := a.parser.driverForConnector(spec.SourceConnector)
if driverErr != nil {
// Track the errored source connector and return
a.trackConnector(spec.SourceConnector, r, false)
return
}

// Check if we have anonymous access (unless we already know that we don't)
var anonAccess bool
if res, ok := a.result[spec.SourceConnector]; (!ok || res.AnonymousAccess) && driverErr != nil {
if res, ok := a.result[spec.SourceConnector]; !ok || res.AnonymousAccess {
anonAccess, _ = sourceDriver.HasAnonymousSourceAccess(ctx, srcProps, zap.NewNop())
}

Expand All @@ -133,10 +138,15 @@ func (a *connectorAnalyzer) analyzeModel(ctx context.Context, r *Resource) {
spec := r.ModelSpec
inputProps := spec.InputProperties.AsMap()
_, inputDriver, driverErr := a.parser.driverForConnector(spec.InputConnector)
if driverErr != nil {
// Track the errored input connector and return
a.trackConnector(spec.InputConnector, r, false)
return
}

// Check if we have anonymous access (unless we already know that we don't)
var anonAccess bool
if res, ok := a.result[spec.InputConnector]; (!ok || res.AnonymousAccess) && driverErr != nil {
if res, ok := a.result[spec.InputConnector]; !ok || res.AnonymousAccess {
anonAccess, _ = inputDriver.HasAnonymousSourceAccess(ctx, inputProps, zap.NewNop())
}

Expand Down Expand Up @@ -194,8 +204,6 @@ func (a *connectorAnalyzer) analyzeResourceNotifiers(r *Resource, notifiers []*r
func (a *connectorAnalyzer) trackConnector(connector string, r *Resource, anonAccess bool) {
res, ok := a.result[connector]
if !ok {
driver, driverConnector, driverErr := a.parser.driverForConnector(connector)

// Search rill.yaml for default config properties for this connector
var defaultConfig map[string]string
if a.parser.RillYAML != nil {
Expand All @@ -215,19 +223,22 @@ func (a *connectorAnalyzer) trackConnector(connector string, r *Resource, anonAc
}
}

var driverSpec *drivers.Spec
if driverConnector != nil {
spec := driverConnector.Spec()
driverSpec = &spec
}

res = &Connector{
Name: connector,
Driver: driver,
Spec: driverSpec,
DefaultConfig: defaultConfig,
AnonymousAccess: true,
Err: driverErr,
driver, driverConnector, driverErr := a.parser.driverForConnector(connector)
if driverErr != nil {
res = &Connector{
Name: connector,
Err: driverErr,
}
} else {
driverSpec := driverConnector.Spec()
res = &Connector{
Name: connector,
Driver: driver,
Spec: &driverSpec,
DefaultConfig: defaultConfig,
AnonymousAccess: true,
Err: driverErr,
}
}

a.result[connector] = res
Expand Down
6 changes: 4 additions & 2 deletions runtime/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,8 @@ name: my-gcs
},
Resource: &runtimev1.Resource_Connector{
Connector: &runtimev1.ConnectorV2{
Spec: &runtimev1.ConnectorSpec{Driver: "s3", Properties: map[string]string{"region": "us-west-2"}},
Spec: &runtimev1.ConnectorSpec{Driver: "s3", Properties: map[string]string{"region": "us-west-2"}},
State: &runtimev1.ConnectorState{},
},
},
})
Expand All @@ -1400,7 +1401,8 @@ name: my-gcs
},
Resource: &runtimev1.Resource_Connector{
Connector: &runtimev1.ConnectorV2{
Spec: &runtimev1.ConnectorSpec{Driver: "gcs"},
Spec: &runtimev1.ConnectorSpec{Driver: "gcs"},
State: &runtimev1.ConnectorState{},
},
},
})
Expand Down
63 changes: 62 additions & 1 deletion runtime/reconcilers/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package reconcilers

import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"sort"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
)

Expand Down Expand Up @@ -46,6 +49,7 @@ func (r *ConnectorReconciler) AssignState(from, to *runtimev1.Resource) error {
}

func (r *ConnectorReconciler) ResetState(res *runtimev1.Resource) error {
res.GetConnector().State = &runtimev1.ConnectorState{}
return nil
}

Expand All @@ -61,8 +65,65 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, n *runtimev1.Resour

// Exit early for deletion
if self.Meta.DeletedOn != nil {
err = r.C.Runtime.UpdateInstanceConnector(ctx, r.C.InstanceID, self.Meta.Name.Name, nil)
if err != nil {
return runtime.ReconcileResult{Err: err}
}
return runtime.ReconcileResult{}
}

// Check if the spec has changed
specHash, err := r.executionSpecHash(t.Spec)
if err != nil {
return runtime.ReconcileResult{Err: err}
}

if specHash == t.State.SpecHash {
return runtime.ReconcileResult{}
}

// Update instance connectors
err = r.C.Runtime.UpdateInstanceConnector(ctx, r.C.InstanceID, self.Meta.Name.Name, t.Spec)
if err != nil {
return runtime.ReconcileResult{Err: err}
}

t.State.SpecHash = specHash

err = r.C.UpdateState(ctx, self.Meta.Name, self)
if err != nil {
return runtime.ReconcileResult{Err: err}
}

return runtime.ReconcileResult{}
}

func (r *ConnectorReconciler) executionSpecHash(spec *runtimev1.ConnectorSpec) (string, error) {
hash := md5.New()

_, err := hash.Write([]byte(spec.Driver))
if err != nil {
return "", err
}

// sort properties by key
keys := make([]string, 0, len(spec.Properties))
for k := range spec.Properties {
keys = append(keys, k)
}
sort.Strings(keys)

// write properties to hash
for _, k := range keys {
_, err = hash.Write([]byte(k))
if err != nil {
return "", err
}
_, err = hash.Write([]byte(spec.Properties[k]))
if err != nil {
return "", err
}
}

return hex.EncodeToString(hash.Sum(nil)), nil
}
31 changes: 31 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ func (r *Runtime) UpdateInstanceWithRillYAML(ctx context.Context, instanceID str
return r.EditInstance(ctx, inst, restartController)
}

// UpdateInstanceConnector upserts or removes a connector from an instance
// If connector is nil, the connector is removed; otherwise, it is upserted
func (r *Runtime) UpdateInstanceConnector(ctx context.Context, instanceID, name string, connector *runtimev1.ConnectorSpec) error {
inst, err := r.Instance(ctx, instanceID)
if err != nil {
return err
}

// remove the connector if it exists
for i, c := range inst.ProjectConnectors {
if c.Name == name {
inst.ProjectConnectors = append(inst.ProjectConnectors[:i], inst.ProjectConnectors[i+1:]...)
break
}
}

if connector == nil {
// connector should be removed
return r.EditInstance(ctx, inst, false)
}

// append the new/updated connector
inst.ProjectConnectors = append(inst.ProjectConnectors, &runtimev1.Connector{
Name: name,
Type: connector.Driver,
Config: connector.Properties,
})

return r.EditInstance(ctx, inst, false)
}

func instanceAnnotationsToAttribs(instance *drivers.Instance) []attribute.KeyValue {
attrs := make([]attribute.KeyValue, 0, len(instance.Annotations)+1)
attrs = append(attrs, attribute.String("instance_id", instance.ID))
Expand Down
3 changes: 3 additions & 0 deletions runtime/testruntime/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ func RequireResource(t testing.TB, rt *runtime.Runtime, id string, a *runtimev1.
e.StartedOn = nil
e.FinishedOn = nil
}
case runtime.ResourceKindConnector:
state := b.GetConnector().State
state.SpecHash = ""
}

// Hack to only compare the Resource field (not Meta)
Expand Down
2 changes: 1 addition & 1 deletion web-admin/src/client/gen/index.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ export interface RpcStatus {
* `NullValue` is a singleton enumeration to represent the null value for the
`Value` type union.
The JSON representation for `NullValue` is JSON `null`.
The JSON representation for `NullValue` is JSON `null`.
- NULL_VALUE: Null value.
*/
Expand Down
Loading

1 comment on commit 1c40dd9

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.