Skip to content

Commit

Permalink
Runtime: load env on start and reload on change (#4738)
Browse files Browse the repository at this point in the history
* load env on start and reload on change

* also restart controller when rill.yaml is changed

* use resolved variables in execution hash

* Update runtime/reconcilers/model.go

Co-authored-by: Benjamin Egelund-Müller <b@egelund-muller.com>

* set env before starting controller

* set env before starting controller

* also set other field from rill.yaml

* fix fmt

* common function

* Update runtime/registry.go

---------

Co-authored-by: Benjamin Egelund-Müller <b@egelund-muller.com>
  • Loading branch information
k-anshul and begelundmuller authored Apr 30, 2024
1 parent 10cd5d0 commit e6f6227
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 74 deletions.
25 changes: 25 additions & 0 deletions runtime/compilers/rillv1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ func ParseRillYAML(ctx context.Context, repo drivers.RepoStore, instanceID strin
return p.RillYAML, nil
}

// ParseDotEnv parses only the .env file present in project's root.
func ParseDotEnv(ctx context.Context, repo drivers.RepoStore, instanceID string) (map[string]string, error) {
files, err := repo.ListRecursive(ctx, ".env", true)
if err != nil {
return nil, fmt.Errorf("could not list project files: %w", err)
}

if len(files) == 0 {
return nil, nil
}

paths := make([]string, len(files))
for i, file := range files {
paths[i] = file.Path
}

p := Parser{Repo: repo, InstanceID: instanceID}
err = p.parsePaths(ctx, paths)
if err != nil {
return nil, err
}

return p.DotEnv, nil
}

// Parse creates a new parser and parses the entire project.
func Parse(ctx context.Context, repo drivers.RepoStore, instanceID, environment, defaultOLAPConnector string) (*Parser, error) {
p := &Parser{
Expand Down
36 changes: 27 additions & 9 deletions runtime/reconcilers/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,14 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
materialize = true
}

// Resolve variables before computing the execution hash to ensure we re-trigger when a variable is updated
sql, err := r.resolveTemplateSQL(ctx, self)
if err != nil {
return runtime.ReconcileResult{Err: err}
}

// Use a hash of execution-related fields from the spec to determine if something has changed
hash, err := r.executionSpecHash(ctx, self.Meta.Refs, model.Spec, materialize)
hash, err := r.executionSpecHash(ctx, self.Meta.Refs, model.Spec, materialize, sql)
if err != nil {
return runtime.ReconcileResult{Err: fmt.Errorf("failed to compute hash: %w", err)}
}
Expand Down Expand Up @@ -223,7 +229,7 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
}

// Create the model
createErr := r.createModel(ctx, self, stagingTableName, !materialize)
createErr := r.createModel(ctx, self, sql, stagingTableName, !materialize)
if createErr != nil {
createErr = fmt.Errorf("failed to create model: %w", createErr)
}
Expand Down Expand Up @@ -325,7 +331,7 @@ func (r *ModelReconciler) delayedMaterializeTime(spec *runtimev1.ModelSpec, sinc
}

// executionSpecHash computes a hash of only those model properties that impact execution.
func (r *ModelReconciler) executionSpecHash(ctx context.Context, refs []*runtimev1.ResourceName, spec *runtimev1.ModelSpec, materialize bool) (string, error) {
func (r *ModelReconciler) executionSpecHash(ctx context.Context, refs []*runtimev1.ResourceName, spec *runtimev1.ModelSpec, materialize bool, sql string) (string, error) {
hash := md5.New()

for _, ref := range refs { // Refs are always sorted
Expand Down Expand Up @@ -367,7 +373,7 @@ func (r *ModelReconciler) executionSpecHash(ctx context.Context, refs []*runtime
return "", err
}

_, err = hash.Write([]byte(spec.Sql))
_, err = hash.Write([]byte(sql))
if err != nil {
return "", err
}
Expand Down Expand Up @@ -410,19 +416,18 @@ func (r *ModelReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Reso
return r.C.UpdateSpec(ctx, self.Meta.Name, self)
}

// createModel creates or updates the model in the OLAP connector.
func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resource, tableName string, view bool) error {
func (r *ModelReconciler) resolveTemplateSQL(ctx context.Context, self *runtimev1.Resource) (string, error) {
inst, err := r.C.Runtime.Instance(ctx, r.C.InstanceID)
if err != nil {
return err
return "", err
}

spec := self.Resource.(*runtimev1.Resource_Model).Model.Spec
state := self.Resource.(*runtimev1.Resource_Model).Model.State

olap, release, err := r.C.AcquireOLAP(ctx, spec.Connector)
if err != nil {
return err
return "", err
}
defer release()

Expand Down Expand Up @@ -456,12 +461,25 @@ func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resou
},
})
if err != nil {
return fmt.Errorf("failed to resolve template: %w", err)
return "", fmt.Errorf("failed to resolve template: %w", err)
}
} else {
sql = spec.Sql
}

return sql, nil
}

// createModel creates or updates the model in the OLAP connector.
func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resource, sql, tableName string, view bool) error {
spec := self.Resource.(*runtimev1.Resource_Model).Model.Spec

olap, release, err := r.C.AcquireOLAP(ctx, spec.Connector)
if err != nil {
return err
}
defer release()

// If materializing, set timeout on ctx
if !view {
timeout := _defaultMaterializeTimeout
Expand Down
59 changes: 6 additions & 53 deletions runtime/reconcilers/project_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,10 @@ func (r *ProjectParserReconciler) reconcileParser(ctx context.Context, inst *dri
return parseErrsErr
}

// Treat reloads the same as a fresh parse (where there's no diff)
if diff != nil && diff.Reloaded {
diff = nil
}

// Update state from rill.yaml and .env
if diff == nil || diff.ModifiedDotEnv {
err := r.reconcileProjectConfig(ctx, parser)
if err != nil {
return err
}
// not setting restartController=true when diff is actually nil prevents infinite restarts
restartController := diff != nil && (diff.ModifiedDotEnv || diff.Reloaded)
if restartController {
return r.reconcileProjectConfig(ctx, parser, true)
}

// Reconcile resources.
Expand All @@ -324,48 +317,8 @@ func (r *ProjectParserReconciler) reconcileParser(ctx context.Context, inst *dri
}

// reconcileProjectConfig updates instance config derived from rill.yaml and .env
func (r *ProjectParserReconciler) reconcileProjectConfig(ctx context.Context, parser *compilerv1.Parser) error {
inst, err := r.C.Runtime.Instance(ctx, r.C.InstanceID)
if err != nil {
return err
}

// Shallow clone for editing
tmp := *inst
inst = &tmp

inst.ProjectOLAPConnector = parser.RillYAML.OLAPConnector

conns := make([]*runtimev1.Connector, 0, len(parser.RillYAML.Connectors))
for _, c := range parser.RillYAML.Connectors {
conns = append(conns, &runtimev1.Connector{
Type: c.Type,
Name: c.Name,
Config: c.Defaults,
})
}
inst.ProjectConnectors = conns

vars := make(map[string]string)
for _, v := range parser.RillYAML.Variables {
vars[v.Name] = v.Default
}
for k, v := range parser.DotEnv {
vars[k] = v
}
inst.ProjectVariables = vars

inst.FeatureFlags = parser.RillYAML.FeatureFlags

// TODO: Passing "false" guards against infinite cancellations and restarts of the controller,
// but it also ignores potential consistency issues where we update connector config without evicting cached connctions,
// or where we update variables and don't re-evaluate all resources.
err = r.C.Runtime.EditInstance(ctx, inst, false)
if err != nil {
return err
}

return nil
func (r *ProjectParserReconciler) reconcileProjectConfig(ctx context.Context, parser *compilerv1.Parser, restartController bool) error {
return r.C.Runtime.UpdateInstanceWithRillYAML(ctx, r.C.InstanceID, parser.RillYAML, parser.DotEnv, restartController)
}

// reconcileResources creates, updates and deletes resources as necessary to match the parser's output with the current resources in the catalog.
Expand Down
26 changes: 14 additions & 12 deletions runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN
return runtime.ReconcileResult{Err: err}
}

srcConfig, err := r.driversSource(ctx, self, src.Spec.Properties)
if err != nil {
return runtime.ReconcileResult{Err: err}
}

// Use a hash of ingestion-related fields from the spec to determine if we need to re-ingest
hash, err := r.ingestionSpecHash(src.Spec)
hash, err := r.ingestionSpecHash(src.Spec, srcConfig)
if err != nil {
return runtime.ReconcileResult{Err: fmt.Errorf("failed to compute hash: %w", err)}
}
Expand Down Expand Up @@ -183,7 +188,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN

// Execute ingestion
r.C.Logger.Info("Ingesting source data", zap.String("name", n.Name), zap.String("connector", connector))
ingestErr := r.ingestSource(ctx, self, stagingTableName)
ingestErr := r.ingestSource(ctx, self, srcConfig, driversSink(stagingTableName))
if ingestErr != nil {
ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr)
}
Expand Down Expand Up @@ -261,7 +266,7 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN
}

// ingestionSpecHash computes a hash of only those source spec properties that impact ingestion.
func (r *SourceReconciler) ingestionSpecHash(spec *runtimev1.SourceSpec) (string, error) {
func (r *SourceReconciler) ingestionSpecHash(spec *runtimev1.SourceSpec, srcConfig map[string]any) (string, error) {
hash := md5.New()

_, err := hash.Write([]byte(spec.SourceConnector))
Expand All @@ -274,7 +279,11 @@ func (r *SourceReconciler) ingestionSpecHash(spec *runtimev1.SourceSpec) (string
return "", err
}

err = pbutil.WriteHash(structpb.NewStructValue(spec.Properties), hash)
st, err := structpb.NewStruct(srcConfig)
if err != nil {
return "", err
}
err = pbutil.WriteHash(structpb.NewStructValue(st), hash)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -316,7 +325,7 @@ func (r *SourceReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Res
// ingestSource ingests the source into a table with tableName.
// It does NOT drop the table if ingestion fails after the table has been created.
// It will return an error if the sink connector is not an OLAP.
func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Resource, tableName string) (outErr error) {
func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Resource, srcConfig, sinkConfig map[string]any) (outErr error) {
src := self.GetSource().Spec

// Get connections and transporter
Expand All @@ -338,13 +347,6 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Res
}
}

// Get source and sink configs
srcConfig, err := r.driversSource(ctx, self, src.Properties)
if err != nil {
return err
}
sinkConfig := driversSink(tableName)

// Set timeout on ctx
timeout := _defaultIngestTimeout
if src.TimeoutSeconds > 0 {
Expand Down
25 changes: 25 additions & 0 deletions runtime/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/logbuffer"
Expand Down Expand Up @@ -417,6 +418,9 @@ func (r *registryCache) restartController(iwc *instanceWithController) {
}

// Start controller
if err := r.updateProjectConfig(iwc); err != nil {
iwc.logger.Warn("failed to parse and update the project config before starting the controller", zap.Error(err))
}
iwc.logger.Debug("controller starting")
ctrl, err := NewController(iwc.ctx, r.rt, iwc.instanceID, iwc.logger, r.activity)
if err == nil {
Expand Down Expand Up @@ -540,6 +544,27 @@ func (r *registryCache) emitHeartbeatForInstance(inst *drivers.Instance) {
)
}

// updateProjectConfig updates the project config for the given instance.
// This does the same operation as ProjectParserReconciler's reconcileProjectConfig and is done before starting the controller
// to ensure that when controller first starts, it doesn’t immediately restart due to changed variables
func (r *registryCache) updateProjectConfig(iwc *instanceWithController) error {
repo, release, err := r.rt.Repo(iwc.ctx, iwc.instanceID)
if err != nil {
return err
}
defer release()

rillYAML, err := rillv1.ParseRillYAML(iwc.ctx, repo, iwc.instanceID)
if err != nil {
return err
}
dotEnv, err := rillv1.ParseDotEnv(iwc.ctx, repo, iwc.instanceID)
if err != nil {
return err
}
return r.rt.UpdateInstanceWithRillYAML(iwc.ctx, iwc.instanceID, rillYAML, dotEnv, false)
}

func sizeOfDir(path string) int64 {
var size int64
_ = fs.WalkDir(os.DirFS(path), ".", func(path string, d fs.DirEntry, err error) error {
Expand Down
34 changes: 34 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/compilers/rillv1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/conncache"
Expand Down Expand Up @@ -109,6 +110,39 @@ func (r *Runtime) GetInstanceAttributes(ctx context.Context, instanceID string)
return instanceAnnotationsToAttribs(instance)
}

func (r *Runtime) UpdateInstanceWithRillYAML(ctx context.Context, instanceID string, rillYAML *rillv1.RillYAML, dotEnv map[string]string, restartController bool) error {
inst, err := r.Instance(ctx, instanceID)
if err != nil {
return err
}

// Shallow clone for editing
tmp := *inst
inst = &tmp

inst.ProjectOLAPConnector = rillYAML.OLAPConnector

conns := make([]*runtimev1.Connector, 0, len(rillYAML.Connectors))
for _, c := range rillYAML.Connectors {
conns = append(conns, &runtimev1.Connector{
Type: c.Type,
Name: c.Name,
Config: c.Defaults,
})
}
inst.ProjectConnectors = conns
vars := make(map[string]string)
for _, v := range rillYAML.Variables {
vars[v.Name] = v.Default
}
for k, v := range dotEnv {
vars[k] = v
}
inst.ProjectVariables = vars
inst.FeatureFlags = rillYAML.FeatureFlags
return r.EditInstance(ctx, inst, restartController)
}

func instanceAnnotationsToAttribs(instance *drivers.Instance) []attribute.KeyValue {
attrs := make([]attribute.KeyValue, 0, len(instance.Annotations)+1)
attrs = append(attrs, attribute.String("instance_id", instance.ID))
Expand Down

0 comments on commit e6f6227

Please sign in to comment.