Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added long-running materialization support to wren #387

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions wren/compute/compute_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/kaskada-ai/kaskada/wren/client"
"github.com/kaskada-ai/kaskada/wren/customerrors"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/internal"
"github.com/kaskada-ai/kaskada/wren/utils"
)
Expand All @@ -39,7 +40,9 @@ type ComputeManager interface {
InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error)
SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot)

// materialization related
// Runs all existing file-based materializations for the given owner
// Note: this exists in the ComputeManager interface instead of the MaterializationManager interface because
// it runs materializations in a similar way to InitiateQuery
RunMaterializations(ctx context.Context, owner *ent.Owner)
}

Expand Down Expand Up @@ -201,6 +204,8 @@ func (m *computeManager) SaveComputeSnapshots(queryContext *QueryContext, comput
}

// Runs all saved materializations on current data inside a go-routine that attempts to finish before shutdown
// TODO: After sparrow supports long-running materializations from file-based sources
// remove all the code related to this method
func (m *computeManager) RunMaterializations(requestCtx context.Context, owner *ent.Owner) {
m.errGroup.Go(func() error { return m.processMaterializations(requestCtx, owner) })
}
Expand Down Expand Up @@ -229,7 +234,7 @@ func (m *computeManager) processMaterializations(requestCtx context.Context, own
subLogger.Error().Err(err).Msg("issue getting current prepare cache buster")
}

materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner)
materializations, err := m.materializationClient.GetMaterializationsBySourceType(ctx, owner, materialization.SourceTypeFiles)
if err != nil {
subLogger.Error().Err(err).Msg("error listing materializations")
return nil
Expand Down
26 changes: 24 additions & 2 deletions wren/compute/file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (
)

type FileManager interface {
// metadata related

// GetFileSchema returns the schema of the file at the given URI
GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error)

// GetPulsarSchema returns the schema of the pulsar topic
GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha.PulsarConfig) (*v1alpha.Schema, error)
}

type fileManager struct {
Expand All @@ -27,7 +31,7 @@ func NewFileManager(computeClients *client.ComputeClients) FileManager {
}

func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) {
subLogger := log.Ctx(ctx).With().Str("method", "manager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger()
subLogger := log.Ctx(ctx).With().Str("method", "fileManager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger()
// Send the metadata request to the FileService

var sourceData *v1alpha.SourceData
Expand All @@ -42,6 +46,24 @@ func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.File
sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}}
}

return m.getSchema(ctx, sourceData)
}

func (m *fileManager) GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha.PulsarConfig) (*v1alpha.Schema, error) {
sourceData := &v1alpha.SourceData{
Source: &v1alpha.SourceData_PulsarSubscription{
PulsarSubscription: &v1alpha.PulsarSubscription{
Config: pulsarConfig,
},
},
}

return m.getSchema(ctx, sourceData)
}

func (m *fileManager) getSchema(ctx context.Context, sourceData *v1alpha.SourceData) (*v1alpha.Schema, error) {
subLogger := log.Ctx(ctx).With().Str("method", "fileManager.getSchema").Logger()

fileClient := m.computeClients.NewFileServiceClient(ctx)
defer fileClient.Close()

Expand Down
88 changes: 84 additions & 4 deletions wren/compute/materialization_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/kaskada-ai/kaskada/wren/client"
"github.com/kaskada-ai/kaskada/wren/customerrors"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/internal"
"github.com/rs/zerolog/log"
)
Expand All @@ -22,6 +23,9 @@ type MaterializationManager interface {

// GetMaterializationStatus gets the status of a materialization on the compute backend
GetMaterializationStatus(ctx context.Context, materializationID string) (*v1alpha.ProgressInformation, error)

// ReconcileMaterializations reconciles the materializations in the database with the materializations on the compute backend
ReconcileMaterializations(ctx context.Context) error
}

type materializationManager struct {
Expand All @@ -30,14 +34,19 @@ type materializationManager struct {
computeClients client.ComputeClients
kaskadaTableClient internal.KaskadaTableClient
materializationClient internal.MaterializationClient

// this is used to keep track of which materializations are currently running on the compute backend
// so that if a materialization is deleted from the database, we can stop it the next time we reconcile
runningMaterializations map[string]interface{}
}

func NewMaterializationManager(compileManager *CompileManager, computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, materializationClient *internal.MaterializationClient) MaterializationManager {
return &materializationManager{
CompileManager: *compileManager,
computeClients: *computeClients,
kaskadaTableClient: *kaskadaTableClient,
materializationClient: *materializationClient,
CompileManager: *compileManager,
computeClients: *computeClients,
kaskadaTableClient: *kaskadaTableClient,
materializationClient: *materializationClient,
runningMaterializations: map[string]interface{}{},
}
}

Expand Down Expand Up @@ -110,6 +119,77 @@ func (m *materializationManager) GetMaterializationStatus(ctx context.Context, m
return statusResponse.Progress, nil
}

// ReconcileMaterializations reconciles the materializations in the database with the materializations on the compute backend
// After running this function, all materializations in the database will be running on the compute backend
// and all deleted materializations will be stopped
func (m *materializationManager) ReconcileMaterializations(ctx context.Context) error {
subLogger := log.Ctx(ctx).With().Str("method", "manager.ReconcileMaterializations").Logger()

allStreamMaterializations, err := m.materializationClient.GetAllMaterializationsBySourceType(ctx, materialization.SourceTypeStreams)
if err != nil {
subLogger.Error().Err(err).Msg("failed to get all stream materializations")
return err
}

// find all materializations in the database and start any that are not running
// we keep a map of materialization_name=>nil to keep track of which materializations are running
newRunningMaterializations := make(map[string]interface{})
for _, streamMaterialization := range allStreamMaterializations {
materializationID := streamMaterialization.ID.String()
owner := streamMaterialization.Edges.Owner

isRunning := false
// check to see if the materialization was running in the previous iteration
if _, found := m.runningMaterializations[materializationID]; found {
//verify that the materialization is still running
progressInfo, err := m.GetMaterializationStatus(ctx, materializationID)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to get materialization status")
}
isRunning = progressInfo != nil
epinzur marked this conversation as resolved.
Show resolved Hide resolved
}

if isRunning {
newRunningMaterializations[materializationID] = nil
} else {
log.Debug().Str("id", materializationID).Msg("found materialization that is not running, attempting to start it")

compileResp, _, err := m.CompileEntMaterialization(ctx, owner, streamMaterialization)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("issue compiling materialization")
} else {
err = m.StartMaterialization(ctx, owner, materializationID, compileResp, streamMaterialization.Destination)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to start materialization")
} else {
log.Debug().Str("id", materializationID).Msg("started materialization")
newRunningMaterializations[materializationID] = nil
}
}
}
}

// find all materializations that were running the previous time this method was called
// but no longer exist in the database. stop any that are found. this can happen due to a race
// condition where a materialization is deleted from the database after this method has started
// but before it has finished. this method is called periodically so it will eventually stop
// the materialization.
for materializationID := range m.runningMaterializations {
if _, found := newRunningMaterializations[materializationID]; !found {
log.Debug().Str("id", materializationID).Msg("found materialization that no longer exists, attempting to stop it")
err := m.StopMaterialization(ctx, materializationID)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to stop materialization")
newRunningMaterializations[materializationID] = nil
} else {
log.Debug().Str("id", materializationID).Msg("stopped materialization")
}
}
}
m.runningMaterializations = newRunningMaterializations
return nil
}

func (m *materializationManager) getMaterializationTables(ctx context.Context, owner *ent.Owner, compileResp *v1alpha.CompileResponse) ([]*v1alpha.ComputeTable, error) {
subLogger := log.Ctx(ctx).With().Str("method", "materializationManager.getMaterializationTables").Logger()

Expand Down
1 change: 1 addition & 0 deletions wren/ent/schema/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (Materialization) Fields() []ent.Field {
field.Bytes("slice_request").GoType(&v1alpha.SliceRequest{}).Immutable(),
field.Bytes("analysis").GoType(&v1alpha.Analysis{}).Immutable(),
field.Int64("data_version_id"),
field.Enum("source_type").Values("unspecified", "files", "streams").Default("unspecified"),
}
}

Expand Down
4 changes: 3 additions & 1 deletion wren/internal/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/kaskadafile"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/ent/predicate"
"github.com/kaskada-ai/kaskada/wren/ent/schema"
"github.com/kaskada-ai/kaskada/wren/property"
Expand Down Expand Up @@ -80,10 +81,11 @@ type MaterializationClient interface {
CreateMaterialization(ctx context.Context, owner *ent.Owner, newMaterialization *ent.Materialization, dependencies []*ent.MaterializationDependency) (*ent.Materialization, error)
DeleteMaterialization(ctx context.Context, owner *ent.Owner, view *ent.Materialization) error
GetAllMaterializations(ctx context.Context, owner *ent.Owner) ([]*ent.Materialization, error)
GetAllMaterializationsBySourceType(ctx context.Context, sourceType materialization.SourceType) ([]*ent.Materialization, error)
GetMaterialization(ctx context.Context, owner *ent.Owner, id uuid.UUID) (*ent.Materialization, error)
GetMaterializationByName(ctx context.Context, owner *ent.Owner, name string) (*ent.Materialization, error)
GetMaterializationsFromNames(ctx context.Context, owner *ent.Owner, names []string) (map[string]*ent.Materialization, error)
GetMaterializationsWithDependency(ctx context.Context, owner *ent.Owner, name string, dependencyType schema.DependencyType) ([]*ent.Materialization, error)
GetMaterializationsBySourceType(ctx context.Context, owner *ent.Owner, sourceType materialization.SourceType) ([]*ent.Materialization, error)
ListMaterializations(ctx context.Context, owner *ent.Owner, searchTerm string, pageSize int, offset int) ([]*ent.Materialization, error)
UpdateDataVersion(ctx context.Context, materialization *ent.Materialization, newDataVersion int64) (*ent.Materialization, error)
IncrementVersion(ctx context.Context, materialization *ent.Materialization) (*ent.Materialization, error)
Expand Down
55 changes: 29 additions & 26 deletions wren/internal/materialization_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/ent/materializationdependency"
"github.com/kaskada-ai/kaskada/wren/ent/predicate"
"github.com/kaskada-ai/kaskada/wren/ent/schema"
)

Expand Down Expand Up @@ -57,6 +56,7 @@ func (c *materializationClient) CreateMaterialization(ctx context.Context, owner
SetAnalysis(newMaterialization.Analysis).
SetDataVersionID(newMaterialization.DataVersionID).
SetVersion(newMaterialization.Version).
SetSourceType(newMaterialization.SourceType).
Save(ctx)

if err != nil {
Expand Down Expand Up @@ -155,31 +155,6 @@ func (c *materializationClient) GetMaterializationByName(ctx context.Context, ow
return materialization, nil
}

func (c *materializationClient) GetMaterializationsFromNames(ctx context.Context, owner *ent.Owner, names []string) (map[string]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetMaterializationsFromNames").
Logger()

predicates := make([]predicate.Materialization, 0, len(names))

for _, name := range names {
predicates = append(predicates, materialization.Name(name))
}

materializations, err := owner.QueryMaterializations().Where(materialization.Or(predicates...)).All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue getting materializations")
return nil, err
}

materializationMap := map[string]*ent.Materialization{}

for _, materialization := range materializations {
materializationMap[materialization.Name] = materialization
}

return materializationMap, nil
}

func (c *materializationClient) GetMaterializationsWithDependency(ctx context.Context, owner *ent.Owner, name string, dependencyType schema.DependencyType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Expand All @@ -206,6 +181,34 @@ func (c *materializationClient) GetMaterializationsWithDependency(ctx context.Co
return materializations, nil
}

func (c *materializationClient) GetMaterializationsBySourceType(ctx context.Context, owner *ent.Owner, sourceType materialization.SourceType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetMaterializationsBySourceType").
Str("source_type", string(sourceType)).
Logger()

materializations, err := owner.QueryMaterializations().Where(materialization.SourceTypeEQ(sourceType)).WithOwner().All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue listing materializations")
return nil, err
}
return materializations, nil
}

func (c *materializationClient) GetAllMaterializationsBySourceType(ctx context.Context, sourceType materialization.SourceType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetAllMaterializationsBySourceType").
Str("source_type", string(sourceType)).
Logger()

materializations, err := c.entClient.Materialization.Query().Where(materialization.SourceTypeEQ(sourceType)).All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue listing materializations")
return nil, err
}
return materializations, nil
}

func (c *materializationClient) ListMaterializations(ctx context.Context, owner *ent.Owner, searchTerm string, pageSize int, offset int) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.ListMaterializations").
Expand Down
12 changes: 12 additions & 0 deletions wren/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ func main() {
return nil
})

// peridocally reconcile materializations to ensure the ones that are supposed to be running are running
g.Go(func() error {
for {
time.Sleep(5 * time.Second)
epinzur marked this conversation as resolved.
Show resolved Hide resolved

err := materializationManager.ReconcileMaterializations(ctx)
if err != nil {
return err
}
}
})

// wait until shutdown signal occurs
select {
case <-interrupt:
Expand Down
Loading