Commit d638b79

added long-running materialization support to wren

epinzur committed May 25, 2023
1 parent e0525e9

Showing 10 changed files with 570 additions and 61 deletions.
9 changes: 7 additions & 2 deletions wren/compute/compute_manager.go
@@ -22,6 +22,7 @@ import (
"github.com/kaskada-ai/kaskada/wren/client"
"github.com/kaskada-ai/kaskada/wren/customerrors"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/internal"
"github.com/kaskada-ai/kaskada/wren/utils"
)
@@ -39,7 +40,9 @@ type ComputeManager interface {
InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error)
SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot)

// materialization related
// Runs all existing file-based materializations for the given owner
// Note: this exists in the ComputeManager interface instead of the MaterializationManager interface because
// it runs materializations in a similar way to InitiateQuery
RunMaterializations(ctx context.Context, owner *ent.Owner)
}

@@ -201,6 +204,8 @@ func (m *computeManager) SaveComputeSnapshots(queryContext *QueryContext, comput
}

// Runs all saved materializations on current data inside a go-routine that attempts to finish before shutdown
// TODO: After sparrow supports long-running materializations from file-based sources,
// remove all the code related to this method
func (m *computeManager) RunMaterializations(requestCtx context.Context, owner *ent.Owner) {
m.errGroup.Go(func() error { return m.processMaterializations(requestCtx, owner) })
}
@@ -229,7 +234,7 @@ func (m *computeManager) processMaterializations(requestCtx context.Context, own
subLogger.Error().Err(err).Msg("issue getting current prepare cache buster")
}

materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner)
materializations, err := m.materializationClient.GetMaterializationsBySourceType(ctx, owner, materialization.SourceTypeFiles)
if err != nil {
subLogger.Error().Err(err).Msg("error listing materializations")
return nil
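Note on the hunk above: RunMaterializations now only loads file-backed materializations (SourceTypeFiles); stream-backed ones are driven by the new ReconcileMaterializations loop added later in this commit. A rough sketch of that split, written as if it lived in the wren/compute package; driveMaterializations is a hypothetical helper, not part of the commit:

package compute

import (
	"context"

	"github.com/kaskada-ai/kaskada/wren/ent"
)

// driveMaterializations is a hypothetical helper (not part of this commit) showing
// how the two kinds of materializations are driven after this change.
func driveMaterializations(ctx context.Context, owner *ent.Owner,
	compute ComputeManager, manager MaterializationManager) error {
	// File-backed materializations (SourceTypeFiles): RunMaterializations now only
	// picks these up and re-runs them against current data in a fire-and-forget
	// goroutine, the same way InitiateQuery executes ad-hoc queries.
	compute.RunMaterializations(ctx, owner)

	// Stream-backed materializations (SourceTypeStreams): long-running on the
	// compute backend; the periodic reconciler starts any that are missing and
	// stops any that were deleted from the database.
	return manager.ReconcileMaterializations(ctx)
}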
26 changes: 24 additions & 2 deletions wren/compute/file_manager.go
@@ -12,8 +12,12 @@ import (
)

type FileManager interface {
// metadata related

// GetFileSchema returns the schema of the file at the given URI.
GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error)

// GetPulsarSchema returns the schema of the pulsar topic
GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha.PulsarConfig) (*v1alpha.Schema, error)
}

type fileManager struct {
@@ -27,7 +31,7 @@ func NewFileManager(computeClients *client.ComputeClients) FileManager {
}

func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) {
subLogger := log.Ctx(ctx).With().Str("method", "manager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger()
subLogger := log.Ctx(ctx).With().Str("method", "fileManager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger()
// Send the metadata request to the FileService

var sourceData *v1alpha.SourceData
@@ -42,6 +46,24 @@ func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.File
sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}}
}

return m.getSchema(ctx, sourceData)
}

func (m *fileManager) GetPulsarSchema(ctx context.Context, pulsarConfig *v1alpha.PulsarConfig) (*v1alpha.Schema, error) {
sourceData := &v1alpha.SourceData{
Source: &v1alpha.SourceData_PulsarSubscription{
PulsarSubscription: &v1alpha.PulsarSubscription{
Config: pulsarConfig,
},
},
}

return m.getSchema(ctx, sourceData)
}

func (m *fileManager) getSchema(ctx context.Context, sourceData *v1alpha.SourceData) (*v1alpha.Schema, error) {
subLogger := log.Ctx(ctx).With().Str("method", "fileManager.getSchema").Logger()

fileClient := m.computeClients.NewFileServiceClient(ctx)
defer fileClient.Close()

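GetPulsarSchema reuses the same FileService metadata request as GetFileSchema, only with a PulsarSubscription source. A hedged usage sketch of how a caller might pick between the two when resolving a table's schema; resolveSchema is hypothetical, only the FileManager methods and v1alpha types come from this commit:

package compute

import (
	"context"

	v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
	"github.com/kaskada-ai/kaskada/wren/internal"
)

// resolveSchema is a hypothetical helper, not part of this commit.
func resolveSchema(ctx context.Context, fm FileManager,
	file internal.FileInput, pulsar *v1alpha.PulsarConfig) (*v1alpha.Schema, error) {
	if pulsar != nil {
		// Stream-backed source: ask the FileService to describe the Pulsar topic.
		return fm.GetPulsarSchema(ctx, pulsar)
	}
	// File-backed source: infer the schema from the file at the given URI.
	return fm.GetFileSchema(ctx, file)
}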
88 changes: 84 additions & 4 deletions wren/compute/materialization_manager.go
@@ -7,6 +7,7 @@ import (
"github.com/kaskada-ai/kaskada/wren/client"
"github.com/kaskada-ai/kaskada/wren/customerrors"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/internal"
"github.com/rs/zerolog/log"
)
@@ -22,6 +23,9 @@ type MaterializationManager interface {

// GetMaterializationStatus gets the status of a materialization on the compute backend
GetMaterializationStatus(ctx context.Context, materializationID string) (*v1alpha.ProgressInformation, error)

// ReconcileMaterializations reconciles the materializations in the database with the materializations on the compute backend
ReconcileMaterializations(ctx context.Context) error
}

type materializationManager struct {
@@ -30,14 +34,19 @@ type materializationManager struct {
computeClients client.ComputeClients
kaskadaTableClient internal.KaskadaTableClient
materializationClient internal.MaterializationClient

// this is used to keep track of which materializations are currently running on the compute backend
// so that if a materialization is deleted from the database, we can stop it the next time we reconcile
runningMaterializations map[string]interface{}
}

func NewMaterializationManager(compileManager *CompileManager, computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, materializationClient *internal.MaterializationClient) MaterializationManager {
return &materializationManager{
CompileManager: *compileManager,
computeClients: *computeClients,
kaskadaTableClient: *kaskadaTableClient,
materializationClient: *materializationClient,
CompileManager: *compileManager,
computeClients: *computeClients,
kaskadaTableClient: *kaskadaTableClient,
materializationClient: *materializationClient,
runningMaterializations: map[string]interface{}{},
}
}

@@ -110,6 +119,77 @@ func (m *materializationManager) GetMaterializationStatus(ctx context.Context, m
return statusResponse.Progress, nil
}

// ReconcileMaterializations reconciles the materializations in the database with the materializations on the compute backend
// After running this function, all materializations in the database will be running on the compute backend
// and all deleted materializations will be stopped
func (m *materializationManager) ReconcileMaterializations(ctx context.Context) error {
subLogger := log.Ctx(ctx).With().Str("method", "manager.ReconcileMaterializations").Logger()

allStreamMaterializations, err := m.materializationClient.GetAllMaterializationsBySourceType(ctx, materialization.SourceTypeStreams)
if err != nil {
subLogger.Error().Err(err).Msg("failed to get all stream materializations")
return err
}

// find all materializations in the database and start any that are not running
// we keep a map of materialization_id => nil to keep track of which materializations are running
newRunningMaterializations := make(map[string]interface{})
for _, streamMaterialization := range allStreamMaterializations {
materializationID := streamMaterialization.ID.String()
owner := streamMaterialization.Edges.Owner

isRunning := false
// check to see if the materialization was running in the previous iteration
if _, found := m.runningMaterializations[materializationID]; found {
//verify that the materialization is still running
progressInfo, err := m.GetMaterializationStatus(ctx, materializationID)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to get materialization status")
}
isRunning = progressInfo != nil
}

if isRunning {
newRunningMaterializations[materializationID] = nil
} else {
log.Debug().Str("id", materializationID).Msg("found materialization that is not running, attempting to start it")

compileResp, _, err := m.CompileEntMaterialization(ctx, owner, streamMaterialization)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("issue compiling materialization")
} else {
err = m.StartMaterialization(ctx, owner, materializationID, compileResp, streamMaterialization.Destination)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to start materialization")
} else {
log.Debug().Str("id", materializationID).Msg("started materialization")
newRunningMaterializations[materializationID] = nil
}
}
}
}

// find all materializations that were running the previous time this method was called
// but no longer exist in the database. stop any that are found. this can happen due to a race
// condition where a materialization is deleted from the database after this method has started
// but before it has finished. this method is called periodically so it will eventually stop
// the materialization.
for materializationID := range m.runningMaterializations {
if _, found := newRunningMaterializations[materializationID]; !found {
log.Debug().Str("id", materializationID).Msg("found materialization that no longer exists, attempting to stop it")
err := m.StopMaterialization(ctx, materializationID)
if err != nil {
log.Error().Err(err).Str("id", materializationID).Msg("failed to stop materialization")
newRunningMaterializations[materializationID] = nil
} else {
log.Debug().Str("id", materializationID).Msg("stopped materialization")
}
}
}
m.runningMaterializations = newRunningMaterializations
return nil
}

func (m *materializationManager) getMaterializationTables(ctx context.Context, owner *ent.Owner, compileResp *v1alpha.CompileResponse) ([]*v1alpha.ComputeTable, error) {
subLogger := log.Ctx(ctx).With().Str("method", "materializationManager.getMaterializationTables").Logger()

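The reconciliation above is essentially a set difference between the materializations the database says should run and the ones that were running on the previous pass (plus a liveness check via GetMaterializationStatus, omitted here). A stripped-down sketch of that bookkeeping; reconcileSets and its parameters are hypothetical, not part of the commit:

package compute

// reconcileSets is a hypothetical illustration of the map bookkeeping used by
// ReconcileMaterializations: desired holds the IDs that should be running,
// previous holds the IDs that were running on the last pass.
func reconcileSets(desired, previous map[string]interface{},
	start func(id string) error, stop func(id string) error) map[string]interface{} {
	next := make(map[string]interface{}, len(desired))
	for id := range desired {
		if _, wasRunning := previous[id]; wasRunning {
			next[id] = nil // already running, keep tracking it
			continue
		}
		if err := start(id); err == nil {
			next[id] = nil // started successfully
		}
		// on a start error the ID is left out, so the next pass retries the start
	}
	for id := range previous {
		if _, stillWanted := desired[id]; !stillWanted {
			if err := stop(id); err != nil {
				next[id] = nil // keep tracking it so a later pass retries the stop
			}
		}
	}
	return next
}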
4 changes: 3 additions & 1 deletion wren/internal/interface.go
@@ -9,6 +9,7 @@ import (
v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/kaskadafile"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/ent/predicate"
"github.com/kaskada-ai/kaskada/wren/ent/schema"
"github.com/kaskada-ai/kaskada/wren/property"
@@ -80,10 +81,11 @@ type MaterializationClient interface {
CreateMaterialization(ctx context.Context, owner *ent.Owner, newMaterialization *ent.Materialization, dependencies []*ent.MaterializationDependency) (*ent.Materialization, error)
DeleteMaterialization(ctx context.Context, owner *ent.Owner, view *ent.Materialization) error
GetAllMaterializations(ctx context.Context, owner *ent.Owner) ([]*ent.Materialization, error)
GetAllMaterializationsBySourceType(ctx context.Context, sourceType materialization.SourceType) ([]*ent.Materialization, error)
GetMaterialization(ctx context.Context, owner *ent.Owner, id uuid.UUID) (*ent.Materialization, error)
GetMaterializationByName(ctx context.Context, owner *ent.Owner, name string) (*ent.Materialization, error)
GetMaterializationsFromNames(ctx context.Context, owner *ent.Owner, names []string) (map[string]*ent.Materialization, error)
GetMaterializationsWithDependency(ctx context.Context, owner *ent.Owner, name string, dependencyType schema.DependencyType) ([]*ent.Materialization, error)
GetMaterializationsBySourceType(ctx context.Context, owner *ent.Owner, sourceType materialization.SourceType) ([]*ent.Materialization, error)
ListMaterializations(ctx context.Context, owner *ent.Owner, searchTerm string, pageSize int, offset int) ([]*ent.Materialization, error)
UpdateDataVersion(ctx context.Context, materialization *ent.Materialization, newDataVersion int64) (*ent.Materialization, error)
IncrementVersion(ctx context.Context, materialization *ent.Materialization) (*ent.Materialization, error)
55 changes: 29 additions & 26 deletions wren/internal/materialization_client.go
@@ -10,7 +10,6 @@ import (
"github.com/kaskada-ai/kaskada/wren/ent"
"github.com/kaskada-ai/kaskada/wren/ent/materialization"
"github.com/kaskada-ai/kaskada/wren/ent/materializationdependency"
"github.com/kaskada-ai/kaskada/wren/ent/predicate"
"github.com/kaskada-ai/kaskada/wren/ent/schema"
)

@@ -57,6 +56,7 @@ func (c *materializationClient) CreateMaterialization(ctx context.Context, owner
SetAnalysis(newMaterialization.Analysis).
SetDataVersionID(newMaterialization.DataVersionID).
SetVersion(newMaterialization.Version).
SetSourceType(newMaterialization.SourceType).
Save(ctx)

if err != nil {
@@ -155,31 +155,6 @@ func (c *materializationClient) GetMaterializationByName(ctx context.Context, ow
return materialization, nil
}

func (c *materializationClient) GetMaterializationsFromNames(ctx context.Context, owner *ent.Owner, names []string) (map[string]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetMaterializationsFromNames").
Logger()

predicates := make([]predicate.Materialization, 0, len(names))

for _, name := range names {
predicates = append(predicates, materialization.Name(name))
}

materializations, err := owner.QueryMaterializations().Where(materialization.Or(predicates...)).All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue getting materializations")
return nil, err
}

materializationMap := map[string]*ent.Materialization{}

for _, materialization := range materializations {
materializationMap[materialization.Name] = materialization
}

return materializationMap, nil
}

func (c *materializationClient) GetMaterializationsWithDependency(ctx context.Context, owner *ent.Owner, name string, dependencyType schema.DependencyType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
@@ -206,6 +181,34 @@ func (c *materializationClient) GetMaterializationsWithDependency(ctx context.Co
return materializations, nil
}

func (c *materializationClient) GetMaterializationsBySourceType(ctx context.Context, owner *ent.Owner, sourceType materialization.SourceType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetMaterializationsBySourceType").
Str("source_type", string(sourceType)).
Logger()

materializations, err := owner.QueryMaterializations().Where(materialization.SourceTypeEQ(sourceType)).WithOwner().All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue listing materializations")
return nil, err
}
return materializations, nil
}

func (c *materializationClient) GetAllMaterializationsBySourceType(ctx context.Context, sourceType materialization.SourceType) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.GetAllMaterializationsBySourceType").
Str("source_type", string(sourceType)).
Logger()

materializations, err := c.entClient.Materialization.Query().Where(materialization.SourceTypeEQ(sourceType)).WithOwner().All(ctx)
if err != nil {
subLogger.Error().Err(err).Msg("issue listing materializations")
return nil, err
}
return materializations, nil
}

func (c *materializationClient) ListMaterializations(ctx context.Context, owner *ent.Owner, searchTerm string, pageSize int, offset int) ([]*ent.Materialization, error) {
subLogger := log.Ctx(ctx).With().
Str("method", "materializationClient.ListMaterializations").
12 changes: 12 additions & 0 deletions wren/main.go
@@ -351,6 +351,18 @@ func main() {
return nil
})

// periodically reconcile materializations to ensure the ones that are supposed to be running are running
g.Go(func() error {
for {
time.Sleep(5 * time.Second)

err := materializationManager.ReconcileMaterializations(ctx)
if err != nil {
return err
}
}
})

// wait until shutdown signal occurs
select {
case <-interrupt:
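The reconciliation loop above only exits when ReconcileMaterializations returns an error, so it sleeps through shutdown otherwise. Reusing g, ctx, materializationManager, and time from main.go, a context-aware variant of the same loop might look like the sketch below (an alternative sketch, assuming ctx is cancelled at shutdown; not what this commit ships):

g.Go(func() error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// stop cleanly when the context is cancelled at shutdown
			return ctx.Err()
		case <-ticker.C:
			if err := materializationManager.ReconcileMaterializations(ctx); err != nil {
				return err
			}
		}
	}
})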