-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
to-close: added long-running materialization support to wren #374
Conversation
preparedFile.Path = preparedFile.Path | ||
subLogger.Debug().Interface("prepared_file", preparedFile).Msg("these paths should be URIs") | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code does nothing, removing it
wren/compute/interface.go
Outdated
ReconcileMaterialzations(ctx context.Context) error | ||
RunMaterializations(ctx context.Context, owner *ent.Owner) | ||
StartMaterialization(ctx context.Context, owner *ent.Owner, materializationID string, compileResp *v1alpha.CompileResponse, destination *v1alpha.Destination) error | ||
StopMaterialization(ctx context.Context, materializationID string) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re-org and added a few methods in the materialization
section
@@ -511,7 +515,7 @@ func (m *Manager) processMaterializations(requestCtx context.Context, owner *ent | |||
return nil | |||
} | |||
|
|||
materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner) | |||
materializations, err := m.materializationClient.GetMaterializationsBySourceType(ctx, owner, materialization.SourceTypeFiles) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this kicks off the old-style materializations managed by wren running compute at data-load time. Now only do this on materializations that are based on file sources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could rip this code out at some point soon. Maybe we make a note somewhere to do so
wren/compute/materialization.go
Outdated
"github.com/kaskada-ai/kaskada/wren/ent" | ||
"github.com/kaskada-ai/kaskada/wren/ent/materialization" | ||
"github.com/rs/zerolog/log" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrappers around calling the new methods in the compute engine for managing long-lived materializations
wren/compute/materialization.go
Outdated
return statusResponse.Progress, nil | ||
} | ||
|
||
func (m *Manager) ReconcileMaterialzations(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
running this should make sure that all materializations in the DB are running in the compute engine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add this as a comment - or can we choose a more descriptive name?
I see it's being run periodically, can you clarify why?
wren/compute/query_context.go
Outdated
@@ -104,6 +104,7 @@ func convertKaskadaTableToComputeTable(kaskadaTable *ent.KaskadaTable) *v1alpha. | |||
TimeColumnName: kaskadaTable.TimeColumnName, | |||
GroupColumnName: kaskadaTable.EntityKeyColumnName, | |||
Grouping: kaskadaTable.GroupingID, | |||
Source: kaskadaTable.Source, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass table source along to compute
@@ -31,6 +31,7 @@ func (Materialization) Fields() []ent.Field { | |||
field.Bytes("slice_request").GoType(&v1alpha.SliceRequest{}).Immutable(), | |||
field.Bytes("analysis").GoType(&v1alpha.Analysis{}).Immutable(), | |||
field.Int64("data_version_id"), | |||
field.Enum("source_type").Values("unspecified", "files", "streams").Default("unspecified"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a materialization source_type to the db
return materializations, nil | ||
} | ||
|
||
func (c *materializationClient) GetAllMaterializationsBySourceType(ctx context.Context, sourceType materialization.SourceType) ([]*ent.Materialization, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above, but for all owners/clients instead of just one
return nil, customerrors.NewInvalidArgumentErrorWithCustomText("cannot materialize tables from different source types") | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protection around creating materializations with mixed source types
switch sourceType { | ||
case materialization.SourceTypeFiles: | ||
subLogger.Debug().Msg("running materializations") | ||
s.computeManager.RunMaterializations(ctx, owner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if is file-backed materialization, run as before
wren/service/materialization.go
Outdated
s.computeManager.RunMaterializations(ctx, owner) | ||
case materialization.SourceTypeStreams: | ||
subLogger.Debug().Msg("adding materialization to compute") | ||
err := s.computeManager.StartMaterialization(ctx, owner, createdMaterialization.ID.String(), compileResp, createdMaterialization.Destination) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if is stream backed materialization, attempt to start it on compute
}) | ||
Expect(err).Should(HaveOccurred()) | ||
Expect(err.Error()).Should(Equal("cannot materialize tables from different source types")) | ||
Expect(response).Should(BeNil()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test of protection around materializing from multiple source types
wren/compute/manager.go
Outdated
store: *objectStoreClient, | ||
tableStore: tableStore, | ||
tr: otel.Tracer("ComputeManager"), | ||
runningMaterailizations: map[string]struct{}{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spelling
wren/compute/manager.go
Outdated
@@ -52,22 +53,25 @@ type Manager struct { | |||
store client.ObjectStoreClient | |||
tableStore store.TableStore | |||
tr trace.Tracer | |||
|
|||
runningMaterailizations map[string]struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spelling
@@ -511,7 +515,7 @@ func (m *Manager) processMaterializations(requestCtx context.Context, owner *ent | |||
return nil | |||
} | |||
|
|||
materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner) | |||
materializations, err := m.materializationClient.GetMaterializationsBySourceType(ctx, owner, materialization.SourceTypeFiles) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could rip this code out at some point soon. Maybe we make a note somewhere to do so
wren/compute/materialization.go
Outdated
func (m *Manager) StartMaterialization(ctx context.Context, owner *ent.Owner, materializationID string, compileResp *v1alpha.CompileResponse, destination *v1alpha.Destination) error { | ||
subLogger := log.Ctx(ctx).With().Str("method", "manager.StartMaterialization").Str("materialization_id", materializationID).Logger() | ||
|
||
tables, err := m.getMaterializationTables(ctx, owner, compileResp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we start moving away from Tables
and preferring Input
or Source
?
wren/compute/materialization.go
Outdated
return statusResponse.Progress, nil | ||
} | ||
|
||
func (m *Manager) ReconcileMaterialzations(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add this as a comment - or can we choose a more descriptive name?
I see it's being run periodically, can you clarify why?
wren/compute/materialization.go
Outdated
return err | ||
} | ||
|
||
newRunningMaterailizations := make(map[string]struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spelling
wren/compute/materialization.go
Outdated
} | ||
|
||
if isRunning { | ||
newRunningMaterailizations[materializationID] = struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, what is the struct{}{}
holding?
wren/compute/materialization.go
Outdated
} | ||
} | ||
|
||
// find all materializations that were running in the previous iteration but don't exist anymore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on this logic? What is a previous iteration in this context
if err != nil { | ||
return nil, err | ||
} | ||
|
||
err = s.materializationClient.DeleteMaterialization(ctx, owner, materialization) | ||
if foundMaterialization.SourceType == materialization.SourceTypeStreams { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume in the future we'll also have a way to have a long-running file backed materialization, so maybe a todo
here
@jordanrfrazier this is a new branch than the previous PR: #374 which I'm going to close.
No description provided.