diff --git a/wren/.mockery.yaml b/wren/.mockery.yaml index 1d91b06bf..73593a41b 100644 --- a/wren/.mockery.yaml +++ b/wren/.mockery.yaml @@ -12,3 +12,19 @@ packages: MaterializationClient: OwnerClient: PrepareJobClient: + github.com/kaskada-ai/kaskada/wren/compute: + config: + dir: "{{.InterfaceDir}}" + interfaces: + CompileManager: + ComputeManager: + FileManager: + MaterializationManager: + PrepareManager: + github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha: + config: + dir: "{{.InterfaceDir}}" + interfaces: + ComputeServiceClient: + FileServiceClient: + PreparationServiceClient: diff --git a/wren/client/compute_clients.go b/wren/client/compute_clients.go index a192189a8..51f22fcb6 100644 --- a/wren/client/compute_clients.go +++ b/wren/client/compute_clients.go @@ -6,16 +6,22 @@ import ( "github.com/rs/zerolog/log" ) +type ComputeClients interface { + NewFileServiceClient(ctx context.Context) FileServiceClient + NewPrepareServiceClient(ctx context.Context) PrepareServiceClient + NewComputeServiceClient(ctx context.Context) ComputeServiceClient +} + // ComputeClients is the container to hold client for communicating with compute services -type ComputeClients struct { +type computeClients struct { fileServiceConfig *HostConfig prepareServiceConfig *HostConfig computeServiceConfig *HostConfig } // CreateComputeClients initializes the computeClients -func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) *ComputeClients { - return &ComputeClients{ +func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) ComputeClients { + return &computeClients{ fileServiceConfig: fileServiceConfig, prepareServiceConfig: prepareServiceConfig, computeServiceConfig: computeServiceConfig, @@ -23,7 +29,7 @@ func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *H } // FileServiceClient creates a new FileServiceClient from the configuration and context -func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClient { +func (c *computeClients) NewFileServiceClient(ctx context.Context) FileServiceClient { conn, err := connection(ctx, c.fileServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.fileServiceConfig).Msg("unable to dial FileServiceClient") @@ -35,7 +41,7 @@ func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClien } // PrepareServiceClient creates a new PrepareServiceClient from the configuration and context -func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServiceClient { +func (c *computeClients) NewPrepareServiceClient(ctx context.Context) PrepareServiceClient { conn, err := connection(ctx, c.prepareServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.prepareServiceConfig).Msg("unable to dial PrepareServiceClient") @@ -47,7 +53,7 @@ func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServic } // ComputeServiceClient creates a new ComputeServiceClient from the configuration and context -func (c *ComputeClients) ComputeServiceClient(ctx context.Context) ComputeServiceClient { +func (c *computeClients) NewComputeServiceClient(ctx context.Context) ComputeServiceClient { conn, err := connection(ctx, c.computeServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.computeServiceConfig).Msg("unable to dial ComputeServiceClient") diff --git 
a/wren/compute/compile_manager.go b/wren/compute/compile_manager.go new file mode 100644 index 000000000..14b2383e9 --- /dev/null +++ b/wren/compute/compile_manager.go @@ -0,0 +1,274 @@ +package compute + +import ( + "context" + "fmt" + "strings" + "time" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + v2alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v2alpha" + "github.com/kaskada-ai/kaskada/wren/client" + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/rs/zerolog/log" + "golang.org/x/exp/maps" +) + +type compileRequest struct { + Expression string + Views []*v1alpha.WithView + SliceRequest *v1alpha.SliceRequest + ResultBehavior v1alpha.Query_ResultBehavior +} + +type compileOptions struct { + IsFormula bool + IsExperimental bool +} + +type CompileManager interface { + CompileEntMaterialization(ctx context.Context, owner *ent.Owner, materialization *ent.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) + CompileV1Materialization(ctx context.Context, owner *ent.Owner, materialization *v1alpha.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) + CompileV1Query(ctx context.Context, owner *ent.Owner, query *v1alpha.Query, queryOptions *v1alpha.QueryOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error) + CompileV2Query(ctx context.Context, owner *ent.Owner, expression string, views []*v2alpha.QueryView, queryConfig *v2alpha.QueryConfig) (*v1alpha.CompileResponse, []*v1alpha.View, error) + CompileV1View(ctx context.Context, owner *ent.Owner, view *v1alpha.View) (*v1alpha.CompileResponse, error) +} + +type compileManager struct { + computeClients client.ComputeClients + kaskadaTableClient internal.KaskadaTableClient + kaskadaViewClient internal.KaskadaViewClient +} + +func NewCompileManager(computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient) CompileManager { + return &compileManager{ + computeClients: *computeClients, + kaskadaTableClient: *kaskadaTableClient, + kaskadaViewClient: *kaskadaViewClient, + } +} + +func (m *compileManager) CompileEntMaterialization(ctx context.Context, owner *ent.Owner, materialization *ent.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) { + compileRequest := &compileRequest{ + Expression: materialization.Expression, + Views: materialization.WithViews.Views, + SliceRequest: materialization.SliceRequest, + ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS, + } + + compileOptions := &compileOptions{ + IsFormula: false, + IsExperimental: false, + } + return m.compile(ctx, owner, compileRequest, compileOptions) +} + +func (m *compileManager) CompileV1Materialization(ctx context.Context, owner *ent.Owner, materialization *v1alpha.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) { + compileRequest := &compileRequest{ + Expression: materialization.Expression, + Views: materialization.WithViews, + SliceRequest: materialization.Slice, + ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS, + } + + compileOptions := &compileOptions{ + IsFormula: false, + IsExperimental: false, + } + return m.compile(ctx, owner, compileRequest, compileOptions) +} + +func (m *compileManager) CompileV1Query(ctx context.Context, owner *ent.Owner, query *v1alpha.Query, queryOptions *v1alpha.QueryOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error) { + compileRequest := &compileRequest{ + Expression: 
query.Expression, + Views: []*v1alpha.WithView{}, + SliceRequest: query.Slice, + ResultBehavior: query.ResultBehavior, + } + + compileOptions := &compileOptions{ + IsFormula: false, + IsExperimental: queryOptions != nil && queryOptions.ExperimentalFeatures, + } + + return m.compile(ctx, owner, compileRequest, compileOptions) +} + +func (m *compileManager) CompileV2Query(ctx context.Context, owner *ent.Owner, expression string, views []*v2alpha.QueryView, queryConfig *v2alpha.QueryConfig) (*v1alpha.CompileResponse, []*v1alpha.View, error) { + + compileRequest := &compileRequest{ + Expression: expression, + Views: make([]*v1alpha.WithView, len(views)), + SliceRequest: queryConfig.Slice, + } + + for i, view := range views { + compileRequest.Views[i] = &v1alpha.WithView{ + Name: view.ViewName, + Expression: view.Expression, + } + } + + switch queryConfig.ResultBehavior.ResultBehavior.(type) { + case *v2alpha.ResultBehavior_AllResults: + compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS + case *v2alpha.ResultBehavior_FinalResults: + compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS + case *v2alpha.ResultBehavior_FinalResultsAtTime: + compileRequest.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME + default: + subLogger := log.Ctx(ctx).With().Str("method", "compileManager.CompileV2Query").Logger() + subLogger.Error().Str("resultBehavior", fmt.Sprintf("%T", queryConfig.ResultBehavior.ResultBehavior)).Msg("unexpected resultBehavior") + return nil, nil, fmt.Errorf("unexpected resultBehavior: %T", queryConfig.ResultBehavior.ResultBehavior) + } + + incrementalQueryExperiment := false + for _, experimentalFeature := range queryConfig.ExperimentalFeatures { + switch { + case strings.EqualFold("incremental", experimentalFeature): + incrementalQueryExperiment = true + } + } + + compileOptions := &compileOptions{ + IsFormula: false, + IsExperimental: incrementalQueryExperiment, + } + return m.compile(ctx, owner, compileRequest, compileOptions) +} + +func (m *compileManager) CompileV1View(ctx context.Context, owner *ent.Owner, view *v1alpha.View) (*v1alpha.CompileResponse, error) { + compileRequest := &compileRequest{ + Expression: view.Expression, + Views: []*v1alpha.WithView{}, + ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS, + } + + compileOptions := &compileOptions{ + IsFormula: true, + IsExperimental: false, + } + + compileResponse, _, err := m.compile(ctx, owner, compileRequest, compileOptions) + return compileResponse, err +} + +func (m *compileManager) compile(ctx context.Context, owner *ent.Owner, request *compileRequest, options *compileOptions) (*v1alpha.CompileResponse, []*v1alpha.View, error) { + subLogger := log.Ctx(ctx).With().Str("method", "compileManager.compileQuery").Logger() + + var perEntityBehavior v1alpha.PerEntityBehavior + switch request.ResultBehavior { + case v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED: + perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_UNSPECIFIED + case v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS: + perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_ALL + case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS: + perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL + case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME: + perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL_AT_TIME + default: + subLogger.Error().Str("resultBehavior", request.ResultBehavior.String()).Msg("unexpected resultBehavior") + return nil, nil, 
fmt.Errorf("unexpected resultBehavior: %s", request.ResultBehavior.String()) + } + + formulaMap, err := m.getFormulaMap(ctx, owner, request.Views) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting formulas") + return nil, nil, err + } + + computeTables := []*v1alpha.ComputeTable{} + kaskadaTables, err := m.kaskadaTableClient.GetAllKaskadaTables(ctx, owner) + if err != nil { + subLogger.Error().Err(err).Msg("error getting all tables") + return nil, nil, err + } + + for _, kaskadaTable := range kaskadaTables { + // if merged schema not set, table still contains no data + if kaskadaTable.MergedSchema != nil { + computeTables = append(computeTables, convertKaskadaTableToComputeTable(kaskadaTable)) + } + } + + compileRequest := &v1alpha.CompileRequest{ + Experimental: options.IsExperimental, + FeatureSet: &v1alpha.FeatureSet{ + Formulas: maps.Values(formulaMap), + Query: request.Expression, + }, + PerEntityBehavior: perEntityBehavior, + SliceRequest: request.SliceRequest, + Tables: computeTables, + } + + if options.IsFormula { + compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_FORMULA + } else { + compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE + } + + computeClient := m.computeClients.NewComputeServiceClient(ctx) + defer computeClient.Close() + + subLogger.Info().Interface("request", compileRequest).Msg("sending compile request") + compileTimeoutCtx, compileTimeoutCancel := context.WithTimeout(ctx, time.Second*compileTimeoutSeconds) + defer compileTimeoutCancel() + + compileResponse, err := computeClient.Compile(compileTimeoutCtx, compileRequest) + subLogger.Info().Err(err). + Interface("fenl_diagnostics", compileResponse.FenlDiagnostics). + Bool("incremental_enabled", compileResponse.IncrementalEnabled). + Strs("free_names", compileResponse.FreeNames). + Strs("missing_names", compileResponse.MissingNames). + Interface("plan_hash", compileResponse.PlanHash). + Interface("result_type", compileResponse.ResultType). 
+ Interface("slices", compileResponse.TableSlices).Msg("received compile response") + if err != nil { + return nil, nil, err + } + + views := []*v1alpha.View{} + for _, freeName := range compileResponse.FreeNames { + if formula, ok := formulaMap[freeName]; ok { + views = append(views, &v1alpha.View{ + ViewName: formula.Name, + Expression: formula.Formula, + }) + } + } + + return compileResponse, views, nil +} + +// returns map of formulaName to formula, including all persisted views in the owner, and all requested views +// if a requestView and a persisted view have the same name, the requestView will be used +func (m *compileManager) getFormulaMap(ctx context.Context, owner *ent.Owner, requestViews []*v1alpha.WithView) (map[string]*v1alpha.Formula, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.getFormulas").Logger() + persistedViews, err := m.kaskadaViewClient.GetAllKaskadaViews(ctx, owner) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting persisted views") + return nil, err + } + + formulaMap := map[string]*v1alpha.Formula{} + + for _, persistedView := range persistedViews { + formulaMap[persistedView.Name] = &v1alpha.Formula{ + Name: persistedView.Name, + Formula: persistedView.Expression, + SourceLocation: fmt.Sprintf("Persisted View: %s", persistedView.Name), + } + } + + for _, requestView := range requestViews { + formulaMap[requestView.Name] = &v1alpha.Formula{ + Name: requestView.Name, + Formula: requestView.Expression, + SourceLocation: fmt.Sprintf("Requested View: %s", requestView.Name), + } + } + + return formulaMap, nil +} diff --git a/wren/compute/compile_manager_test.go b/wren/compute/compile_manager_test.go new file mode 100644 index 000000000..c3b8ebfef --- /dev/null +++ b/wren/compute/compile_manager_test.go @@ -0,0 +1,195 @@ +package compute + +import ( + "context" + + "github.com/google/uuid" + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/stretchr/testify/mock" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +var _ = Describe("CompileManager", func() { + + var ( + ctx context.Context + owner *ent.Owner + + defaultUUID = uuid.MustParse("00000000-0000-0000-0000-000000000000") + + mockComputeServiceClient *v1alpha.MockComputeServiceClient + mockFileServiceClient *v1alpha.MockFileServiceClient + mockPreparationServiceClient *v1alpha.MockPreparationServiceClient + mockKaskadaTableClient *internal.MockKaskadaTableClient + mockKaskadaViewClient *internal.MockKaskadaViewClient + + objectStoreDestination = &v1alpha.Destination{ + Destination: &v1alpha.Destination_ObjectStore{ + ObjectStore: &v1alpha.ObjectStoreDestination{ + FileType: v1alpha.FileType_FILE_TYPE_CSV, + OutputPrefixUri: "gs://some-bucket/some-prefix", + }, + }, + } + + sliceRequest = &v1alpha.SliceRequest{ + Slice: &v1alpha.SliceRequest_Percent{ + Percent: &v1alpha.SliceRequest_PercentSlice{ + Percent: 42, + }, + }, + } + + persistedViews = []*ent.KaskadaView{ + { + Name: "persisted_view", + Expression: "persisted_view_expression", + }, + { + Name: "overwritten_view", + Expression: "overwritten_view_expression", + }, + } + + persistedTables = []*ent.KaskadaTable{ + { + ID: defaultUUID, + Name: "persisted_table1", + MergedSchema: &v1alpha.Schema{}, + }, + { + ID: defaultUUID, + Name: "persisted_table2", + MergedSchema: &v1alpha.Schema{}, + }, + } + ) + + BeforeEach(func() { + ctx = context.Background() + owner = &ent.Owner{} + + mockKaskadaTableClient = internal.NewMockKaskadaTableClient(GinkgoT()) + mockKaskadaViewClient = internal.NewMockKaskadaViewClient(GinkgoT()) + mockComputeServiceClient = v1alpha.NewMockComputeServiceClient(GinkgoT()) + mockFileServiceClient = v1alpha.NewMockFileServiceClient(GinkgoT()) + mockPreparationServiceClient = v1alpha.NewMockPreparationServiceClient(GinkgoT()) + + }) + + Context("CompileEntMaterialization", func() { + It("should compile a materialization", func() { + mockKaskadaViewClient.EXPECT().GetAllKaskadaViews(ctx, owner).Return(persistedViews, nil) + + mockKaskadaTableClient.EXPECT().GetAllKaskadaTables(ctx, owner).Return(persistedTables, nil) + + entMaterialization := &ent.Materialization{ + Name: "ent_materialization", + Expression: "ent_materialization_expression", + Destination: objectStoreDestination, + SliceRequest: sliceRequest, + WithViews: &v1alpha.WithViews{ + Views: []*v1alpha.WithView{ + { + Name: "with_view", + Expression: "with_view_expression", + }, + { + Name: "overwritten_view", + Expression: "overwritten_view_expression2", + }, + }, + }, + } + + computeTables := []*v1alpha.ComputeTable{ + { + Config: &v1alpha.TableConfig{ + Name: "persisted_table1", + Uuid: defaultUUID.String(), + }, + Metadata: &v1alpha.TableMetadata{ + Schema: &v1alpha.Schema{}, + }, + FileSets: []*v1alpha.ComputeTable_FileSet{}, + }, + { + Config: &v1alpha.TableConfig{ + Name: "persisted_table2", + Uuid: defaultUUID.String(), + }, + Metadata: &v1alpha.TableMetadata{ + Schema: &v1alpha.Schema{}, + }, + FileSets: []*v1alpha.ComputeTable_FileSet{}, + }, + } + + formulas := []*v1alpha.Formula{ + { + Name: "persisted_view", + Formula: "persisted_view_expression", + SourceLocation: "Persisted View: persisted_view", + }, + { + Name: "overwritten_view", + Formula: "overwritten_view_expression2", + SourceLocation: "Requested View: overwritten_view", + }, + { + Name: "with_view", + Formula: "with_view_expression", + SourceLocation: "Requested View: with_view", + }, + } + + compileRequest := &v1alpha.CompileRequest{ + Experimental: false, + FeatureSet: &v1alpha.FeatureSet{ + Formulas: 
formulas, + Query: entMaterialization.Expression, + }, + PerEntityBehavior: v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL, + SliceRequest: sliceRequest, + Tables: computeTables, + ExpressionKind: v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE, + } + + compileResponse := &v1alpha.CompileResponse{ + FreeNames: []string{"with_view", "overwritten_view", "persisted_table1"}, + } + + mockComputeServiceClient.EXPECT().Compile(mock.Anything, compileRequest).Return(compileResponse, nil) + + computeClients := newMockComputeServiceClients(mockFileServiceClient, mockPreparationServiceClient, mockComputeServiceClient) + compManager := &compileManager{ + computeClients: computeClients, + kaskadaTableClient: mockKaskadaTableClient, + kaskadaViewClient: mockKaskadaViewClient, + } + + compileResponse, views, err := compManager.CompileEntMaterialization(ctx, owner, entMaterialization) + Expect(err).ToNot(HaveOccurred()) + Expect(compileResponse).ToNot(BeNil()) + Expect(views).ToNot(BeNil()) + + expectedViews := []*v1alpha.View{ + { + ViewName: "with_view", + Expression: "with_view_expression", + }, + { + ViewName: "overwritten_view", + Expression: "overwritten_view_expression2", + }, + } + + Expect(views).To(Equal(expectedViews)) + + }) + }) +}) diff --git a/wren/compute/compute_manager.go b/wren/compute/compute_manager.go new file mode 100644 index 000000000..3a7e842be --- /dev/null +++ b/wren/compute/compute_manager.go @@ -0,0 +1,364 @@ +package compute + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "math" + "path" + "strconv" + + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" + _ "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/wrapperspb" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/auth" + "github.com/kaskada-ai/kaskada/wren/client" + "github.com/kaskada-ai/kaskada/wren/customerrors" + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/kaskada-ai/kaskada/wren/utils" +) + +const ( + keyColumnName = "key" + compileTimeoutSeconds = 10 +) + +type ComputeManager interface { + CompileManager + + // execute related + GetOutputURI(owner *ent.Owner, planHash []byte) string + InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error) + SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot) + + // materialization related + RunMaterializations(ctx context.Context, owner *ent.Owner) +} + +type computeManager struct { + CompileManager + + prepareManager PrepareManager + computeClients client.ComputeClients + errGroup *errgroup.Group + dataTokenClient internal.DataTokenClient + kaskadaTableClient internal.KaskadaTableClient + materializationClient internal.MaterializationClient + objectStore client.ObjectStoreClient + tr trace.Tracer +} + +// NewComputeManager creates a new compute manager +func NewComputeManager(errGroup *errgroup.Group, compileManager *CompileManager, computeClients *client.ComputeClients, dataTokenClient *internal.DataTokenClient, kaskadaTableClient *internal.KaskadaTableClient, materializationClient *internal.MaterializationClient, objectStoreClient *client.ObjectStoreClient, prepareManager *PrepareManager) ComputeManager { + return &computeManager{ + CompileManager: 
*compileManager, + computeClients: *computeClients, + errGroup: errGroup, + dataTokenClient: *dataTokenClient, + kaskadaTableClient: *kaskadaTableClient, + materializationClient: *materializationClient, + objectStore: *objectStoreClient, + prepareManager: *prepareManager, + tr: otel.Tracer("ComputeManager"), + } +} + +type QueryResult struct { + DataTokenId string + Paths []string +} + +func (m *computeManager) GetOutputURI(owner *ent.Owner, planHash []byte) string { + subPath := path.Join("results", owner.ID.String(), base64.RawURLEncoding.EncodeToString(planHash)) + return m.objectStore.GetDataPathURI(subPath) +} + +func (m *computeManager) InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error) { + subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.InitiateQuery").Logger() + + executeRequest := &v1alpha.ExecuteRequest{ + ChangedSince: queryContext.changedSinceTime, + FinalResultTime: queryContext.finalResultTime, + Plan: queryContext.compileResp.Plan, + Limits: queryContext.limits, + Destination: queryContext.destination, + Tables: queryContext.GetComputeTables(), + } + + snapshotCacheBuster, err := m.getSnapshotCacheBuster(queryContext.ctx) + if err != nil { + return nil, nil, err + } + prepareCacheBuster, err := m.prepareManager.GetPrepareCacheBuster(queryContext.ctx) + if err != nil { + return nil, nil, err + } + + queryClient := m.computeClients.NewComputeServiceClient(queryContext.ctx) + + subLogger.Info().Bool("incremental_enabled", queryContext.compileResp.IncrementalEnabled).Bool("is_current_data_token", queryContext.isCurrentDataToken).Msg("Populating snapshot config if needed") + if queryContext.compileResp.IncrementalEnabled && queryContext.isCurrentDataToken && queryContext.compileResp.PlanHash != nil { + executeRequest.ComputeSnapshotConfig = &v1alpha.ComputeSnapshotConfig{ + OutputPrefix: ConvertURIForCompute(m.getComputeSnapshotDataURI(queryContext.owner, *snapshotCacheBuster, queryContext.compileResp.PlanHash.Hash, queryContext.dataToken.DataVersionID)), + } + subLogger.Info().Str("SnapshotPrefix", executeRequest.ComputeSnapshotConfig.OutputPrefix).Msg("Snapshot output prefix") + + bestSnapshot, err := m.kaskadaTableClient.GetBestComputeSnapshot(queryContext.ctx, queryContext.owner, queryContext.compileResp.PlanHash.Hash, *snapshotCacheBuster, queryContext.GetSlices(), *prepareCacheBuster) + if err != nil { + log.Warn().Err(err).Msg("issue getting existing snapshot. query will execute from scratch") + } else if bestSnapshot != nil { + executeRequest.ComputeSnapshotConfig.ResumeFrom = &wrapperspb.StringValue{Value: ConvertURIForCompute(bestSnapshot.Path)} + subLogger.Info().Str("ResumeFrom", executeRequest.ComputeSnapshotConfig.ResumeFrom.Value).Msg("Found snapshot to resume compute from") + } else { + subLogger.Info().Msg("no valid snapshot to resume from") + } + } + + subLogger.Info(). + Interface("compute_snapshot_config", executeRequest.ComputeSnapshotConfig). + Interface("tables", executeRequest.Tables). + Interface("limits", executeRequest.Limits). + Interface("final_result_time", executeRequest.FinalResultTime). + Interface("changed_since_time", executeRequest.ChangedSince). 
+ Interface("destination", executeRequest.Destination).Msg("sending streaming query request to compute backend") + + executeClient, err := queryClient.Execute(queryContext.ctx, executeRequest) + if err != nil { + subLogger.Warn().Err(err).Msg("issue initiating streaming query compute request") + return nil, nil, customerrors.NewComputeError(reMapSparrowError(queryContext.ctx, err)) + } + return queryClient, executeClient, nil +} + +func (m *computeManager) runMaterializationQuery(queryContext *QueryContext) (*QueryResult, error) { + subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.runMaterializationQuery").Logger() + + client, stream, err := m.InitiateQuery(queryContext) + if err != nil { + return nil, err + } + defer client.Close() + + result := &QueryResult{ + DataTokenId: queryContext.dataToken.ID.String(), + Paths: []string{}, + } + + for { + // Start receiving streaming messages + res, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + subLogger.Warn().Err(err).Msg("issue receiving execute response") + return result, customerrors.NewComputeError(reMapSparrowError(queryContext.ctx, err)) + } + + // Note: this does nothing visible to the user at the moment, as + // running materializations are currently opaque to the user. + // Eventually, we'll want to provide useful metadata for all destination types. + if res.Destination != nil { + switch kind := res.Destination.Destination.(type) { + case *v1alpha.Destination_ObjectStore: + result.Paths = append(result.Paths, kind.ObjectStore.OutputPaths.Paths...) + } + } + + switch res.State { + case v1alpha.LongQueryState_LONG_QUERY_STATE_INITIAL: + subLogger.Info().Msg("received initial message from execute request") + case v1alpha.LongQueryState_LONG_QUERY_STATE_RUNNING: + subLogger.Info().Interface("progress", res.Progress).Msg("received progress from execute request") + case v1alpha.LongQueryState_LONG_QUERY_STATE_FINAL: + subLogger.Info().Bool("query_done", res.IsQueryDone).Msg("received final message from execute request") + default: + subLogger.Error().Str("state", res.State.String()).Msg("unexpected long query state") + } + + m.SaveComputeSnapshots(queryContext, res.ComputeSnapshots) + } + + subLogger.Info().Interface("result", result).Msg("final query result") + return result, nil +} + +func (m *computeManager) SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot) { + subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.SaveComputeSnapshots").Logger() + for _, computeSnapshot := range computeSnapshots { + if err := m.kaskadaTableClient.SaveComputeSnapshot(queryContext.ctx, queryContext.owner, computeSnapshot.PlanHash.Hash, computeSnapshot.SnapshotVersion, queryContext.dataToken, ConvertURIForManager(computeSnapshot.Path), computeSnapshot.MaxEventTime.AsTime(), queryContext.GetTableIDs()); err != nil { + subLogger.Error().Err(err).Str("data_token_id", queryContext.dataToken.ID.String()).Msg("issue saving compute snapshot") + } + } +} + +// Runs all saved materializations on current data inside a go-routine that attempts to finish before shutdown +func (m *computeManager) RunMaterializations(requestCtx context.Context, owner *ent.Owner) { + m.errGroup.Go(func() error { return m.processMaterializations(requestCtx, owner) }) +} + +// Runs all saved materializations on current data +// Note: any errors returned from this method will cause wren to start its safe-shutdown routine +// so be careful to only return errors that truly warrant a 
shutdown. +func (m *computeManager) processMaterializations(requestCtx context.Context, owner *ent.Owner) error { + ctx, cancel, err := auth.NewBackgroundContextWithAPIClient(requestCtx) + if err != nil { + log.Ctx(requestCtx).Error().Err(err).Msg("error creating background context for processing materializations") + return err + } + defer cancel() + + subLogger := log.Ctx(ctx).With().Str("method", "manager.processMaterializations").Logger() + + dataToken, err := m.dataTokenClient.GetCurrentDataToken(ctx, owner) + if err != nil { + subLogger.Error().Err(err).Msg("getting current data_token") + return nil + } + + prepareCacheBuster, err := m.prepareManager.GetPrepareCacheBuster(ctx) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting current prepare cache buster") + } + + materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner) + if err != nil { + subLogger.Error().Err(err).Msg("error listing materializations") + return nil + } + + for _, materialization := range materializations { + matLogger := subLogger.With().Str("materialization_name", materialization.Name).Logger() + + compileResp, _, err := m.CompileEntMaterialization(ctx, owner, materialization) + if err != nil { + matLogger.Error().Err(err).Msg("issue compiling materialization") + return nil + } + + if compileResp.Plan == nil { + matLogger.Error().Interface("missing_names", compileResp.MissingNames).Interface("diagnostics", compileResp.FenlDiagnostics).Msg("analysis determined the materialization is not executable. This is unexpected, as it was previously able to compile.") + return nil + } + + tables, err := m.prepareManager.PrepareTablesForCompute(ctx, owner, dataToken, compileResp.TableSlices) + if err != nil { + matLogger.Error().Err(err).Str("data_token_id", dataToken.ID.String()).Msg("getting tables for data_token") + return nil + } + + destination := &v1alpha.Destination{} + if materialization.Destination == nil { + matLogger.Error().Str("materialization", materialization.Name).Msg("materialization has no destination") + return nil + } + switch kind := materialization.Destination.Destination.(type) { + case *v1alpha.Destination_ObjectStore: + matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializing to object store") + + // Append the materialization version to the output prefix so result files + // for specific datatokens are grouped together.
+ outputPrefixUri := kind.ObjectStore.GetOutputPrefixUri() + outputPrefixUri = path.Join(outputPrefixUri, strconv.FormatInt(materialization.Version, 10)) + + destination.Destination = &v1alpha.Destination_ObjectStore{ + ObjectStore: &v1alpha.ObjectStoreDestination{ + FileType: kind.ObjectStore.GetFileType(), + OutputPrefixUri: outputPrefixUri, + }, + } + case *v1alpha.Destination_Pulsar: + matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializing to pulsar") + destination.Destination = kind + case *v1alpha.Destination_Redis: + matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializing to redis") + destination.Destination = kind + default: + matLogger.Error().Interface("type", kind).Str("when", "pre-compute").Msg("materialization output type not implemented") + return fmt.Errorf("materialization output type %s is not implemented", kind) + } + + queryContext, _ := GetNewQueryContext(ctx, owner, nil, compileResp, dataToken, nil, true, nil, destination, materialization.SliceRequest, tables) + + dataVersionID := materialization.DataVersionID + var minTimeInNewFiles int64 = math.MaxInt64 + for _, slice := range queryContext.GetSlices() { + minTime, err := m.kaskadaTableClient.GetMinTimeOfNewPreparedFiles(ctx, *prepareCacheBuster, slice, dataVersionID) + if ent.IsNotFound(err) { + continue + } + if err != nil { + return fmt.Errorf("could not get min time of new files for slice %s. Not materializing results", slice) + } + + if *minTime < minTimeInNewFiles { + minTimeInNewFiles = *minTime + } + } + + // Interpret the int64 (as nanos since epoch) as a proto timestamp + changedSinceTime := &timestamppb.Timestamp{ + Seconds: minTimeInNewFiles / 1_000_000_000, + Nanos: (int32)(minTimeInNewFiles % 1_000_000_000), + } + + // Remakes the query context with the changed since time. + // + // Not a great pattern, since we're recreating the context. If we're able + // to pull out the relevant code that converts `SlicePlans` to `SliceInfo` + // for the table client to get the min time of files, we can clean this up. + queryContext, queryContextCancel := GetNewQueryContext(ctx, owner, changedSinceTime, compileResp, dataToken, nil, true, nil, destination, materialization.SliceRequest, tables) + defer queryContextCancel() + + _, err = m.runMaterializationQuery(queryContext) + if err != nil { + matLogger.Error().Err(err).Msg("error computing materialization") + return nil + } + subLogger.Info().Msg("successfully exported materialization") + + // Update materializations that have run with the current data version id, so on + // subsequent runs only the updated values will be produced. + _, err = m.materializationClient.UpdateDataVersion(ctx, materialization, queryContext.dataToken.DataVersionID) + if err != nil { + matLogger.Error().Err(err).Int64("previousDataVersion", dataVersionID).Int64("newDataVersion", queryContext.dataToken.DataVersionID).Msg("error updating materialization with new data version") + return nil + } + // Update the version for this materialization.
+ _, err = m.materializationClient.IncrementVersion(ctx, materialization) + if err != nil { + matLogger.Error().Err(err).Msg("error updating materialization version") + return nil + } + } + + return nil +} + +// gets the current snapshot cache buster +func (m *computeManager) getSnapshotCacheBuster(ctx context.Context) (*int32, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.getSnapshotCacheBuster").Logger() + queryClient := m.computeClients.NewComputeServiceClient(ctx) + defer queryClient.Close() + + res, err := queryClient.GetCurrentSnapshotVersion(ctx, &v1alpha.GetCurrentSnapshotVersionRequest{}) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting snapshot_cache_buster") + return nil, err + } + return &res.SnapshotVersion, nil +} + +// returns s3://root/computeSnapshots/<snapshotCacheBuster>/<ownerID>/<planHash>/<dataVersion> +func (m *computeManager) getComputeSnapshotDataURI(owner *ent.Owner, snapshotCacheBuster int32, planHash []byte, dataVersion int64) string { + subPath := path.Join("computeSnapshots", strconv.Itoa(int(snapshotCacheBuster)), owner.ID.String(), base64.RawURLEncoding.EncodeToString(planHash), utils.Int64ToString(dataVersion)) + return m.objectStore.GetDataPathURI(subPath) +} diff --git a/wren/compute/compute_suite_test.go b/wren/compute/compute_suite_test.go new file mode 100644 index 000000000..e4e50523a --- /dev/null +++ b/wren/compute/compute_suite_test.go @@ -0,0 +1,73 @@ +package compute + +import ( + "context" + "testing" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + + "github.com/kaskada-ai/kaskada/wren/client" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCompute(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Compute Suite") +} + +type mockComputeServiceClients struct { + fileServiceClient v1alpha.MockFileServiceClient + preparationServiceClient v1alpha.MockPreparationServiceClient + computeServiceClient v1alpha.MockComputeServiceClient +} + +func newMockComputeServiceClients(fileServiceClient *v1alpha.MockFileServiceClient, preparationServiceClient *v1alpha.MockPreparationServiceClient, computeServiceClient *v1alpha.MockComputeServiceClient) client.ComputeClients { + return &mockComputeServiceClients{ + fileServiceClient: *fileServiceClient, + preparationServiceClient: *preparationServiceClient, + computeServiceClient: *computeServiceClient, + } +} + +func (m *mockComputeServiceClients) NewFileServiceClient(ctx context.Context) client.FileServiceClient { + return &mockFileServiceClient{ + MockFileServiceClient: m.fileServiceClient, + } +} + +func (m *mockComputeServiceClients) NewPrepareServiceClient(ctx context.Context) client.PrepareServiceClient { + return &mockPrepareServiceClient{ + MockPreparationServiceClient: m.preparationServiceClient, + } +} + +func (m *mockComputeServiceClients) NewComputeServiceClient(ctx context.Context) client.ComputeServiceClient { + return &mockComputeServiceClient{ + MockComputeServiceClient: m.computeServiceClient, + } +} + +type mockFileServiceClient struct { + v1alpha.MockFileServiceClient +} + +func (s mockFileServiceClient) Close() error { + return nil +} + +type mockPrepareServiceClient struct { + v1alpha.MockPreparationServiceClient +} + +func (s mockPrepareServiceClient) Close() error { + return nil +} + +type mockComputeServiceClient struct { + v1alpha.MockComputeServiceClient +} + +func (s mockComputeServiceClient) Close() error { + return nil +} diff --git a/wren/compute/file_manager.go b/wren/compute/file_manager.go new file mode 100644 index
000000000..bec9ada9f --- /dev/null +++ b/wren/compute/file_manager.go @@ -0,0 +1,65 @@ +package compute + +import ( + "context" + "fmt" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/client" + "github.com/kaskada-ai/kaskada/wren/ent/kaskadafile" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/rs/zerolog/log" +) + +type FileManager interface { + // metadata related + GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) +} + +type fileManager struct { + computeClients client.ComputeClients +} + +func NewFileManager(computeClients *client.ComputeClients) FileManager { + return &fileManager{ + computeClients: *computeClients, + } +} + +func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger() + // Send the metadata request to the FileService + + var sourceData *v1alpha.SourceData + + switch fileInput.GetType() { + case kaskadafile.TypeCsv: + sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_CsvPath{CsvPath: fileInput.GetURI()}} + case kaskadafile.TypeParquet: + sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}} + default: + subLogger.Warn().Msg("user didn't specify file type, defaulting to parquet for now, but will error in the future") + sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}} + } + + fileClient := m.computeClients.NewFileServiceClient(ctx) + defer fileClient.Close() + + metadataReq := &v1alpha.GetMetadataRequest{ + SourceData: sourceData, + } + + subLogger.Debug().Interface("request", metadataReq).Msg("sending get_metadata request to file service") + metadataRes, err := fileClient.GetMetadata(ctx, metadataReq) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting file schema from file_service") + return nil, err + } + + if metadataRes.SourceMetadata == nil { + subLogger.Error().Msg("issue getting file schema from file_service") + return nil, fmt.Errorf("issue getting file schema from file_service") + } + + return metadataRes.SourceMetadata.Schema, nil +} diff --git a/wren/compute/helpers.go b/wren/compute/helpers.go new file mode 100644 index 000000000..ed728f037 --- /dev/null +++ b/wren/compute/helpers.go @@ -0,0 +1,96 @@ +package compute + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/rs/zerolog/log" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/runtime/protoiface" + "google.golang.org/protobuf/types/known/timestamppb" + "google.golang.org/protobuf/types/known/wrapperspb" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" +) + +func ConvertURIForCompute(URI string) string { + return strings.TrimPrefix(URI, "file://") +} + +func ConvertURIForManager(URI string) string { + if strings.HasPrefix(URI, "/") { + return fmt.Sprintf("file://%s", URI) + } + return URI +} + +func reMapSparrowError(ctx context.Context, err error) error { + subLogger := log.Ctx(ctx).With().Str("method", "manager.reMapSparrowError").Logger() + inStatus, ok := status.FromError(err) + if !ok { + subLogger.Error().Msg("unexpected: compute error did not include error-status") + return err + } + outStatus := status.New(inStatus.Code(),
inStatus.Message()) + + for _, detail := range inStatus.Details() { + switch t := detail.(type) { + case protoiface.MessageV1: + outStatus, err = outStatus.WithDetails(t) + if err != nil { + subLogger.Error().Err(err).Interface("detail", t).Msg("unable to add detail to re-mapped error details") + } + default: + subLogger.Error().Err(err).Interface("detail", t).Msg("unexpected: detail from compute doesn't implement the protoiface.MessageV1 interface") + } + } + return outStatus.Err() +} + +func convertKaskadaTableToComputeTable(kaskadaTable *ent.KaskadaTable) *v1alpha.ComputeTable { + if kaskadaTable == nil { + return nil + } + computeTable := &v1alpha.ComputeTable{ + Config: &v1alpha.TableConfig{ + Name: kaskadaTable.Name, + Uuid: kaskadaTable.ID.String(), + TimeColumnName: kaskadaTable.TimeColumnName, + GroupColumnName: kaskadaTable.EntityKeyColumnName, + Grouping: kaskadaTable.GroupingID, + Source: kaskadaTable.Source, + }, + Metadata: &v1alpha.TableMetadata{ + Schema: kaskadaTable.MergedSchema, + }, + FileSets: []*v1alpha.ComputeTable_FileSet{}, + } + + if kaskadaTable.SubsortColumnName != nil { + computeTable.Config.SubsortColumnName = &wrapperspb.StringValue{Value: *kaskadaTable.SubsortColumnName} + } + return computeTable +} + +func getComputePreparedFiles(prepareJobs []*ent.PrepareJob) []*v1alpha.PreparedFile { + computePreparedFiles := []*v1alpha.PreparedFile{} + for _, prepareJob := range prepareJobs { + for _, preparedFile := range prepareJob.Edges.PreparedFiles { + metadataPath := "" + if preparedFile.MetadataPath != nil { + metadataPath = *preparedFile.MetadataPath + } + computePreparedFiles = append(computePreparedFiles, &v1alpha.PreparedFile{ + Path: ConvertURIForCompute(preparedFile.Path), + MaxEventTime: timestamppb.New(time.Unix(0, preparedFile.MaxEventTime)), + MinEventTime: timestamppb.New(time.Unix(0, preparedFile.MinEventTime)), + NumRows: preparedFile.RowCount, + MetadataPath: ConvertURIForCompute(metadataPath), + }) + } + } + return computePreparedFiles +} diff --git a/wren/compute/interface.go b/wren/compute/interface.go deleted file mode 100644 index 6b79c4269..000000000 --- a/wren/compute/interface.go +++ /dev/null @@ -1,27 +0,0 @@ -package compute - -import ( - "context" - - "github.com/google/uuid" - v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" - v2alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v2alpha" - "github.com/kaskada-ai/kaskada/wren/client" - "github.com/kaskada-ai/kaskada/wren/ent" - "github.com/kaskada-ai/kaskada/wren/internal" -) - -type ComputeManager interface { - CompileQuery(ctx context.Context, owner *ent.Owner, query string, requestViews []*v1alpha.WithView, isFormula bool, isExperimental bool, sliceRequest *v1alpha.SliceRequest, resultBehavior v1alpha.Query_ResultBehavior) (*v1alpha.CompileResponse, error) - GetFormulas(ctx context.Context, owner *ent.Owner, views *v2alpha.QueryViews) ([]*v1alpha.Formula, error) - GetUsedViews(formulas []*v1alpha.Formula, compileResponse *v1alpha.CompileResponse) *v2alpha.QueryViews - CompileQueryV2(ctx context.Context, owner *ent.Owner, expression string, formulas []*v1alpha.Formula, config *v2alpha.QueryConfig) (*v1alpha.CompileResponse, error) - CreateCompileRequest(ctx context.Context, owner *ent.Owner, request *QueryRequest, options *QueryOptions) (*v1alpha.CompileRequest, error) - RunCompileRequest(ctx context.Context, owner *ent.Owner, compileRequest *v1alpha.CompileRequest) (*CompileQueryResponse, error) - GetOutputURI(owner *ent.Owner, planHash []byte)
string - InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error) - SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot) - RunMaterializations(requestCtx context.Context, owner *ent.Owner) - GetTablesForCompute(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, slicePlans []*v1alpha.SlicePlan) (map[uuid.UUID]*internal.SliceTable, error) - GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) -} diff --git a/wren/compute/manager.go b/wren/compute/manager.go deleted file mode 100644 index 08565483a..000000000 --- a/wren/compute/manager.go +++ /dev/null @@ -1,875 +0,0 @@ -package compute - -import ( - "context" - "encoding/base64" - "fmt" - "io" - "math" - "path" - "strconv" - "strings" - "time" - - "github.com/google/uuid" - - "github.com/rs/zerolog/log" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" - _ "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/runtime/protoiface" - "google.golang.org/protobuf/types/known/timestamppb" - "google.golang.org/protobuf/types/known/wrapperspb" - - v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" - v2alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v2alpha" - "github.com/kaskada-ai/kaskada/wren/auth" - "github.com/kaskada-ai/kaskada/wren/client" - "github.com/kaskada-ai/kaskada/wren/customerrors" - "github.com/kaskada-ai/kaskada/wren/ent" - "github.com/kaskada-ai/kaskada/wren/ent/kaskadafile" - "github.com/kaskada-ai/kaskada/wren/internal" - "github.com/kaskada-ai/kaskada/wren/store" - "github.com/kaskada-ai/kaskada/wren/utils" -) - -const ( - keyColumnName = "key" - compileTimeoutSeconds = 10 -) - -type Manager struct { - computeClients *client.ComputeClients - errGroup *errgroup.Group - dataTokenClient internal.DataTokenClient - kaskadaTableClient internal.KaskadaTableClient - kaskadaViewClient internal.KaskadaViewClient - materializationClient internal.MaterializationClient - prepareJobClient internal.PrepareJobClient - parallelizeConfig utils.ParallelizeConfig - store client.ObjectStoreClient - tableStore store.TableStore - tr trace.Tracer -} - -// NewManager creates a new compute manager -func NewManager(errGroup *errgroup.Group, computeClients *client.ComputeClients, dataTokenClient *internal.DataTokenClient, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient, materializationClient *internal.MaterializationClient, prepareJobClient internal.PrepareJobClient, objectStoreClient *client.ObjectStoreClient, tableStore store.TableStore, parallelizeConfig utils.ParallelizeConfig) ComputeManager { - return &Manager{ - computeClients: computeClients, - errGroup: errGroup, - dataTokenClient: *dataTokenClient, - kaskadaTableClient: *kaskadaTableClient, - kaskadaViewClient: *kaskadaViewClient, - materializationClient: *materializationClient, - parallelizeConfig: parallelizeConfig, - prepareJobClient: prepareJobClient, - store: *objectStoreClient, - tableStore: tableStore, - tr: otel.Tracer("ComputeManager"), - } -} - -func (m *Manager) CompileQuery(ctx context.Context, owner *ent.Owner, query string, requestViews []*v1alpha.WithView, isFormula bool, isExperimental bool, sliceRequest *v1alpha.SliceRequest, resultBehavior v1alpha.Query_ResultBehavior) (*v1alpha.CompileResponse, error) { - subLogger := 
log.Ctx(ctx).With().Str("method", "manager.CompileQuery").Logger() - formulas, err := m.getFormulas(ctx, owner, requestViews) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting formulas") - return nil, err - } - - tables, err := m.getTablesForCompile(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting tables for compile") - return nil, err - } - - var perEntityBehavior v1alpha.PerEntityBehavior - switch resultBehavior { - case v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_UNSPECIFIED - case v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_ALL - case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL - case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL_AT_TIME - default: - subLogger.Error().Str("resultBehavior", resultBehavior.String()).Msg("unexpected resultBehavior") - return nil, fmt.Errorf("unexpected resultBehavior: %s", resultBehavior.String()) - } - - compileRequest := &v1alpha.CompileRequest{ - Experimental: isExperimental, - FeatureSet: &v1alpha.FeatureSet{ - Formulas: formulas, - Query: query, - }, - PerEntityBehavior: perEntityBehavior, - SliceRequest: sliceRequest, - Tables: tables, - } - - if isFormula { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_FORMULA - } else { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE - } - - queryClient := m.computeClients.ComputeServiceClient(ctx) - defer queryClient.Close() - - subLogger.Info().Interface("request", compileRequest).Msg("sending compile request") - compileTimeoutCtx, compileTimeoutCancel := context.WithTimeout(ctx, time.Second*compileTimeoutSeconds) - defer compileTimeoutCancel() - compileResponse, err := queryClient.Compile(compileTimeoutCtx, compileRequest) - subLogger.Info().Err(err).Interface("response", compileResponse).Msg("received compile respone") - - return compileResponse, err -} - -// gets the set of passed views and system views available for a query -func (m *Manager) GetFormulas(ctx context.Context, owner *ent.Owner, views *v2alpha.QueryViews) ([]*v1alpha.Formula, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.GetFormulas").Logger() - requestViews := []*v1alpha.WithView{} - for _, queryView := range views.Views { - requestViews = append(requestViews, &v1alpha.WithView{ - Name: queryView.ViewName, - Expression: queryView.Expression, - }) - } - - formulas, err := m.getFormulas(ctx, owner, requestViews) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting formulas") - return nil, err - } - return formulas, nil -} - -// gets the actual set of views used in a compiled query -func (m *Manager) GetUsedViews(formulas []*v1alpha.Formula, compileResponse *v1alpha.CompileResponse) *v2alpha.QueryViews { - formulaMap := make(map[string]string, len(formulas)) - - for _, formula := range formulas { - formulaMap[formula.Name] = formula.Formula - } - - views := []*v2alpha.QueryView{} - - for _, freeName := range compileResponse.FreeNames { - if formula, found := formulaMap[freeName]; found { - views = append(views, &v2alpha.QueryView{ViewName: freeName, Expression: formula}) - } - } - - return &v2alpha.QueryViews{Views: views} -} - -func (m *Manager) CompileQueryV2(ctx context.Context, owner *ent.Owner, expression string, 
formulas []*v1alpha.Formula, config *v2alpha.QueryConfig) (*v1alpha.CompileResponse, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.CompileQueryV2").Logger() - - tables, err := m.getTablesForCompile(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting tables for compile") - return nil, err - } - - var perEntityBehavior v1alpha.PerEntityBehavior - switch config.ResultBehavior.ResultBehavior.(type) { - case *v2alpha.ResultBehavior_AllResults: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_ALL - case *v2alpha.ResultBehavior_FinalResults: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL - case *v2alpha.ResultBehavior_FinalResultsAtTime: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL_AT_TIME - default: - subLogger.Error().Str("resultBehavior", fmt.Sprintf("%T", config.ResultBehavior.ResultBehavior)).Msg("unexpected resultBehavior") - return nil, customerrors.NewInternalError("unexpected resultBehavior") - } - - sliceRequest := &v1alpha.SliceRequest{} - if config.Slice != nil && config.Slice.Slice != nil { - switch s := config.Slice.Slice.(type) { - case *v1alpha.SliceRequest_EntityKeys: - sliceRequest.Slice = &v1alpha.SliceRequest_EntityKeys{ - EntityKeys: &v1alpha.SliceRequest_EntityKeysSlice{ - EntityKeys: s.EntityKeys.EntityKeys, - }, - } - case *v1alpha.SliceRequest_Percent: - sliceRequest.Slice = &v1alpha.SliceRequest_Percent{ - Percent: &v1alpha.SliceRequest_PercentSlice{ - Percent: s.Percent.Percent, - }, - } - default: - subLogger.Error().Str("sliceRequest", fmt.Sprintf("%T", config.Slice.Slice)).Msg("unexpected sliceRequest") - return nil, customerrors.NewInternalError("unexpected sliceRequest") - } - } - - incrementalQueryExperiment := false - for _, experimentalFeature := range config.ExperimentalFeatures { - switch { - case strings.EqualFold("incremental", experimentalFeature): - incrementalQueryExperiment = true - } - } - - compileRequest := &v1alpha.CompileRequest{ - Experimental: incrementalQueryExperiment, - FeatureSet: &v1alpha.FeatureSet{ - Formulas: formulas, - Query: expression, - }, - PerEntityBehavior: perEntityBehavior, - SliceRequest: sliceRequest, - Tables: tables, - } - - isFormula := false - - if isFormula { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_FORMULA - } else { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE - } - - queryClient := m.computeClients.ComputeServiceClient(ctx) - defer queryClient.Close() - - subLogger.Info().Interface("request", compileRequest).Msg("sending compile request") - compileTimeoutCtx, compileTimeoutCancel := context.WithTimeout(ctx, time.Second*compileTimeoutSeconds) - defer compileTimeoutCancel() - compileResponse, err := queryClient.Compile(compileTimeoutCtx, compileRequest) - subLogger.Info().Err(err). - Interface("fenl_diagnostics", compileResponse.FenlDiagnostics). - Bool("incremental_enabled", compileResponse.IncrementalEnabled). - Strs("free_names", compileResponse.FreeNames). - Strs("missing_names", compileResponse.MissingNames). - Interface("plan_hash", compileResponse.PlanHash). - Interface("result_type", compileResponse.ResultType). 
- Interface("slices", compileResponse.TableSlices).Msg("received compile response") - - return compileResponse, err -} - -type QueryRequest struct { - Query string - RequestViews []*v1alpha.WithView - SliceRequest *v1alpha.SliceRequest - ResultBehavior v1alpha.Query_ResultBehavior -} -type QueryOptions struct { - IsFormula bool - IsExperimental bool -} - -type CompileQueryResponse struct { - ComputeResponse *v1alpha.CompileResponse - Views []*v1alpha.View -} - -func (m *Manager) CreateCompileRequest(ctx context.Context, owner *ent.Owner, request *QueryRequest, options *QueryOptions) (*v1alpha.CompileRequest, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.CreateCompileRequest").Logger() - formulas, err := m.getFormulas(ctx, owner, request.RequestViews) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting formulas") - return nil, err - } - - tables, err := m.getTablesForCompile(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting tables for compile") - return nil, err - } - - var perEntityBehavior v1alpha.PerEntityBehavior - - switch request.ResultBehavior { - case v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_UNSPECIFIED - case v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_ALL - case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL - case v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS_AT_TIME: - perEntityBehavior = v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL_AT_TIME - default: - subLogger.Error().Str("resultBehavior", request.ResultBehavior.String()).Msg("unexpected resultBehavior") - return nil, fmt.Errorf("unexpected resultBehavior: %s", request.ResultBehavior.String()) - } - - compileRequest := &v1alpha.CompileRequest{ - Experimental: options.IsExperimental, - FeatureSet: &v1alpha.FeatureSet{ - Formulas: formulas, - Query: request.Query, - }, - PerEntityBehavior: perEntityBehavior, - SliceRequest: request.SliceRequest, - Tables: tables, - } - - if options.IsFormula { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_FORMULA - } else { - compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE - } - - return compileRequest, nil -} - -func (m *Manager) RunCompileRequest(ctx context.Context, owner *ent.Owner, compileRequest *v1alpha.CompileRequest) (*CompileQueryResponse, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.RunCompileRequest").Logger() - computeClient := m.computeClients.ComputeServiceClient(ctx) - defer computeClient.Close() - - subLogger.Info().Interface("request", compileRequest).Msg("sending compile request") - compileTimeoutCtx, compileTimeoutCancel := context.WithTimeout(ctx, time.Second*compileTimeoutSeconds) - defer compileTimeoutCancel() - - computeCompileResponse, err := computeClient.Compile(compileTimeoutCtx, compileRequest) - subLogger.Info().Err(err). - Interface("fenl_diagnostics", computeCompileResponse.FenlDiagnostics). - Bool("incremental_enabled", computeCompileResponse.IncrementalEnabled). - Strs("free_names", computeCompileResponse.FreeNames). - Strs("missing_names", computeCompileResponse.MissingNames). - Interface("plan_hash", computeCompileResponse.PlanHash). - Interface("result_type", computeCompileResponse.ResultType). 
- Interface("slices", computeCompileResponse.TableSlices).Msg("received compile response") - if err != nil { - return nil, err - } - - compileResponse := CompileQueryResponse{ - ComputeResponse: computeCompileResponse, - Views: make([]*v1alpha.View, 0), - } - - for _, formula := range compileRequest.FeatureSet.Formulas { - for _, freeName := range computeCompileResponse.FreeNames { - if freeName == formula.Name { - compileResponse.Views = append(compileResponse.Views, &v1alpha.View{ - ViewName: formula.Name, - Expression: formula.Formula, - }) - } - } - } - - return &compileResponse, err -} - -func (m *Manager) GetOutputURI(owner *ent.Owner, planHash []byte) string { - subPath := path.Join("results", owner.ID.String(), base64.RawURLEncoding.EncodeToString(planHash)) - return m.store.GetDataPathURI(subPath) -} - -func (m *Manager) InitiateQuery(queryContext *QueryContext) (client.ComputeServiceClient, v1alpha.ComputeService_ExecuteClient, error) { - subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.InitiateQuery").Logger() - - executeRequest := &v1alpha.ExecuteRequest{ - ChangedSince: queryContext.changedSinceTime, - FinalResultTime: queryContext.finalResultTime, - Plan: queryContext.compileResp.Plan, - Limits: queryContext.limits, - Destination: queryContext.destination, - Tables: queryContext.GetComputeTables(), - } - - snapshotCacheBuster, err := m.getSnapshotCacheBuster(queryContext.ctx) - if err != nil { - return nil, nil, err - } - prepareCacheBuster, err := m.getPrepareCacheBuster(queryContext.ctx) - if err != nil { - return nil, nil, err - } - - queryClient := m.computeClients.ComputeServiceClient(queryContext.ctx) - - subLogger.Info().Bool("incremental_enabled", queryContext.compileResp.IncrementalEnabled).Bool("is_current_data_token", queryContext.isCurrentDataToken).Msg("Populating snapshot config if needed") - if queryContext.compileResp.IncrementalEnabled && queryContext.isCurrentDataToken && queryContext.compileResp.PlanHash != nil { - executeRequest.ComputeSnapshotConfig = &v1alpha.ComputeSnapshotConfig{ - OutputPrefix: ConvertURIForCompute(m.getComputeSnapshotDataURI(queryContext.owner, *snapshotCacheBuster, queryContext.compileResp.PlanHash.Hash, queryContext.dataToken.DataVersionID)), - } - subLogger.Info().Str("SnapshotPrefix", executeRequest.ComputeSnapshotConfig.OutputPrefix).Msg("Snapshot output prefix") - - bestSnapshot, err := m.kaskadaTableClient.GetBestComputeSnapshot(queryContext.ctx, queryContext.owner, queryContext.compileResp.PlanHash.Hash, *snapshotCacheBuster, queryContext.GetSlices(), *prepareCacheBuster) - if err != nil { - log.Warn().Err(err).Msg("issue getting existing snapshot. query will execute from scratch") - } else if bestSnapshot != nil { - executeRequest.ComputeSnapshotConfig.ResumeFrom = &wrapperspb.StringValue{Value: ConvertURIForCompute(bestSnapshot.Path)} - subLogger.Info().Str("ResumeFrom", executeRequest.ComputeSnapshotConfig.ResumeFrom.Value).Msg("Found snapshot to resume compute from") - } else { - subLogger.Info().Msg("no valid snapshot to resume from") - } - } - - subLogger.Info(). - Interface("compute_snapshot_config", executeRequest.ComputeSnapshotConfig). - Interface("tables", executeRequest.Tables). - Interface("limits", executeRequest.Limits). - Interface("final_result_time", executeRequest.FinalResultTime). - Interface("changed_since_time", executeRequest.ChangedSince). 
- Interface("destination", executeRequest.Destination).Msg("sending streaming query request to compute backend") - - executeClient, err := queryClient.Execute(queryContext.ctx, executeRequest) - if err != nil { - subLogger.Warn().Err(err).Msg("issue initiating streaming query compute request") - return nil, nil, customerrors.NewComputeError(reMapSparrowError(queryContext.ctx, err)) - } - return queryClient, executeClient, nil -} - -func (m *Manager) runMaterializationQuery(queryContext *QueryContext) (*QueryResult, error) { - subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.runMaterializationQuery").Logger() - - client, stream, err := m.InitiateQuery(queryContext) - if err != nil { - return nil, err - } - defer client.Close() - - result := &QueryResult{ - DataTokenId: queryContext.dataToken.ID.String(), - Paths: []string{}, - } - - for { - // Start receiving streaming messages - res, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - subLogger.Warn().Err(err).Msg("issue receiving execute response") - return result, customerrors.NewComputeError(reMapSparrowError(queryContext.ctx, err)) - } - - // Note: this does nothing visible to the user at the moment, as - // running materializations are currently opaque to the user. - // Eventually, we'll want to provide useful metadata for all destination types. - if res.Destination != nil { - switch kind := res.Destination.Destination.(type) { - case *v1alpha.Destination_ObjectStore: - result.Paths = append(result.Paths, kind.ObjectStore.OutputPaths.Paths...) - } - } - - switch res.State { - case v1alpha.LongQueryState_LONG_QUERY_STATE_INITIAL: - subLogger.Info().Msg("received initial message from execute request") - case v1alpha.LongQueryState_LONG_QUERY_STATE_RUNNING: - subLogger.Info().Interface("progress", res.Progress).Msg("received progress from execute request") - case v1alpha.LongQueryState_LONG_QUERY_STATE_FINAL: - subLogger.Info().Bool("query_done", res.IsQueryDone).Msg("received final message from execute request") - default: - subLogger.Error().Str("state", res.State.String()).Msg("unexpected long query state") - } - - m.SaveComputeSnapshots(queryContext, res.ComputeSnapshots) - } - - subLogger.Info().Interface("result", result).Msg("final query result") - return result, nil -} - -func (m *Manager) SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot) { - subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.SaveComputeSnapshots").Logger() - for _, computeSnapshot := range computeSnapshots { - if err := m.kaskadaTableClient.SaveComputeSnapshot(queryContext.ctx, queryContext.owner, computeSnapshot.PlanHash.Hash, computeSnapshot.SnapshotVersion, queryContext.dataToken, ConvertURIForManager(computeSnapshot.Path), computeSnapshot.MaxEventTime.AsTime(), queryContext.GetTableIDs()); err != nil { - subLogger.Error().Err(err).Str("data_token_id", queryContext.dataToken.ID.String()).Msg("issue saving compute snapshot") - } - } -} - -// Runs all saved materializations on current data inside a go-routine that attempts to finish before shutdown -func (m *Manager) RunMaterializations(requestCtx context.Context, owner *ent.Owner) { - m.errGroup.Go(func() error { return m.processMaterializations(requestCtx, owner) }) -} - -// Runs all saved materializations on current data -// Note: any errors returned from this method will cause wren to start its safe-shutdown routine -// so be careful to only return errors that truly warrant a shutdown. 
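For orientation, the receive loop in runMaterializationQuery above is the general pattern for consuming the long-running Execute stream; stripped of logging, state handling, and snapshot bookkeeping it reduces to roughly the sketch below (the helper name is illustrative and not part of this patch):

    package compute

    import (
    	"io"

    	v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
    )

    // drainExecuteStream consumes an Execute stream until the server closes it,
    // collecting any object-store output paths along the way. It mirrors the
    // receive loop in runMaterializationQuery, minus logging and snapshots.
    func drainExecuteStream(stream v1alpha.ComputeService_ExecuteClient) ([]string, error) {
    	paths := []string{}
    	for {
    		res, err := stream.Recv()
    		if err == io.EOF {
    			return paths, nil // stream closed: the long-running query is done
    		}
    		if err != nil {
    			return paths, err
    		}
    		if res.Destination != nil {
    			if kind, ok := res.Destination.Destination.(*v1alpha.Destination_ObjectStore); ok {
    				paths = append(paths, kind.ObjectStore.OutputPaths.Paths...)
    			}
    		}
    	}
    }
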
-func (m *Manager) processMaterializations(requestCtx context.Context, owner *ent.Owner) error { - ctx, cancel, err := auth.NewBackgroundContextWithAPIClient(requestCtx) - if err != nil { - log.Ctx(requestCtx).Error().Err(err).Msg("error creating background context for processing materializations") - return err - } - defer cancel() - - subLogger := log.Ctx(ctx).With().Str("method", "manager.processMaterializations").Logger() - - dataToken, err := m.dataTokenClient.GetCurrentDataToken(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("getting current data_token") - return nil - } - - materializations, err := m.materializationClient.GetAllMaterializations(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("error listing materializations") - return nil - } - - for _, materialization := range materializations { - matLogger := subLogger.With().Str("materialization_name", materialization.Name).Logger() - - prepareCacheBuster, err := m.getPrepareCacheBuster(ctx) - if err != nil { - matLogger.Error().Err(err).Msg("issue getting current prepare cache buster") - } - - isExperimental := false - compileResp, err := m.CompileQuery(ctx, owner, materialization.Expression, materialization.WithViews.Views, false, isExperimental, materialization.SliceRequest, v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS) - if err != nil { - matLogger.Error().Err(err).Msg("analyzing materialization") - return nil - } - - if compileResp.Plan == nil { - matLogger.Error().Interface("missing_names", compileResp.MissingNames).Interface("diagnostics", compileResp.FenlDiagnostics).Msg("analysis determined query is not executable. This is unexpected, as the materialization was previously able to compile.") - return nil - } - - tables, err := m.GetTablesForCompute(ctx, owner, dataToken, compileResp.TableSlices) - if err != nil { - matLogger.Error().Err(err).Str("data_token_id", dataToken.ID.String()).Msg("getting tables for data_token") - return nil - } - - destination := &v1alpha.Destination{} - if materialization.Destination == nil { - matLogger.Error().Str("materialization", materialization.Name).Msg("materialization has no destination") - return nil - } - switch kind := materialization.Destination.Destination.(type) { - case *v1alpha.Destination_ObjectStore: - matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializating to object store") - - // Append the materialization version to the output prefix so result files - // for specific datatokens are grouped together. 
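As context for the prefix handling in the next few lines: appending the materialization version means a run at version 3 against an output prefix of /results/orders lands under /results/orders/3, so files produced for different data tokens do not interleave (paths here are illustrative). A minimal sketch of that construction:

    package compute

    import (
    	"path"
    	"strconv"
    )

    // versionedOutputPrefix appends the materialization version to the configured
    // output prefix, e.g. ("/results/orders", 3) -> "/results/orders/3".
    // Illustrative sketch; the code below does this inline with path.Join.
    func versionedOutputPrefix(outputPrefixURI string, version int64) string {
    	return path.Join(outputPrefixURI, strconv.FormatInt(version, 10))
    }
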
- outputPrefixUri := kind.ObjectStore.GetOutputPrefixUri() - outputPrefixUri = path.Join(outputPrefixUri, strconv.FormatInt(materialization.Version, 10)) - - destination.Destination = &v1alpha.Destination_ObjectStore{ - ObjectStore: &v1alpha.ObjectStoreDestination{ - FileType: kind.ObjectStore.GetFileType(), - OutputPrefixUri: outputPrefixUri, - }, - } - case *v1alpha.Destination_Pulsar: - matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializating to pulsar") - destination.Destination = kind - case *v1alpha.Destination_Redis: - matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializing to redis") - destination.Destination = kind - default: - matLogger.Error().Interface("type", kind).Str("when", "pre-compute").Msg("materialization output type not implemented") - return fmt.Errorf("materialization output type %s is not implemented", kind) - } - - queryContext, _ := GetNewQueryContext(ctx, owner, nil, compileResp, dataToken, nil, true, nil, destination, materialization.SliceRequest, tables) - - dataVersionID := materialization.DataVersionID - var minTimeInNewFiles int64 = math.MaxInt64 - for _, slice := range queryContext.GetSlices() { - minTime, err := m.kaskadaTableClient.GetMinTimeOfNewPreparedFiles(ctx, *prepareCacheBuster, slice, dataVersionID) - if ent.IsNotFound(err) { - continue - } - if err != nil { - return fmt.Errorf("could not get min time of new files for slice %s. Not materializing results", slice) - } - - if *minTime < minTimeInNewFiles { - minTimeInNewFiles = *minTime - } - } - - // Interpret the int64 (as nanos since epoch) as a proto timestamp - changedSinceTime := &timestamppb.Timestamp{ - Seconds: minTimeInNewFiles / 1_000_000_000, - Nanos: (int32)(minTimeInNewFiles % 1_000_000_000), - } - - // Remakes the query context with the changed since time. - // - // Not a great pattern, since we're recreating the context. If we're able - // to pull out the relevant code that converts `SlicePlans` to `SliceInfo` - // for the table client to get the min time of files, we can clean this up. - queryContext, queryContextCancel := GetNewQueryContext(ctx, owner, changedSinceTime, compileResp, dataToken, nil, true, nil, destination, materialization.SliceRequest, tables) - defer queryContextCancel() - - err = m.computeMaterialization(materialization, queryContext) - if err != nil { - matLogger.Error().Err(err).Str("name", materialization.Name).Msg("error computing materialization") - return nil - } - - // Update materializations that have run with the current data version id, so on - // subsequent runs only the updated values will be produced. - _, err = m.materializationClient.UpdateDataVersion(ctx, materialization, queryContext.dataToken.DataVersionID) - if err != nil { - matLogger.Error().Err(err).Str("name", materialization.Name).Int64("previousDataVersion", dataVersionID).Int64("newDataVersion", queryContext.dataToken.DataVersionID).Msg("error updating materialization with new data version") - return nil - } - // Update the version for this materialization.
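One detail worth noting from the block above: the hand-rolled nanosecond split for changedSinceTime is, for non-negative inputs, equivalent to timestamppb.New(time.Unix(0, minTimeInNewFiles)), the same form already used for prepared-file event times elsewhere in this package. A minimal sketch of the equivalent conversion (illustrative helper, not part of this patch); the materialization version bump then follows below:

    package compute

    import (
    	"time"

    	"google.golang.org/protobuf/types/known/timestamppb"
    )

    // nanosToTimestamp converts nanoseconds since the Unix epoch into a proto
    // Timestamp. For non-negative inputs this matches the manual split above
    // (Seconds = nanos / 1e9, Nanos = nanos % 1e9).
    func nanosToTimestamp(nanos int64) *timestamppb.Timestamp {
    	return timestamppb.New(time.Unix(0, nanos))
    }
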
- _, err = m.materializationClient.IncrementVersion(ctx, materialization) - if err != nil { - matLogger.Error().Err(err).Str("name", materialization.Name).Msg("error updating materialization version") - return nil - } - } - - return nil -} - -func (m *Manager) computeMaterialization(materialization *ent.Materialization, queryContext *QueryContext) error { - subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.computeMaterialization").Str("materialization", materialization.Name).Logger() - _, err := m.runMaterializationQuery(queryContext) - if err != nil { - subLogger.Error().Err(err).Msg("invalid compute backend response") - return err - } - - subLogger.Info().Msg("successfully exported materialization") - return nil -} - -func reMapSparrowError(ctx context.Context, err error) error { - subLogger := log.Ctx(ctx).With().Str("method", "manager.reMapSparrowError").Logger() - inStatus, ok := status.FromError(err) - if !ok { - subLogger.Error().Msg("unexpected: compute error did not include error-status") - return err - } - outStatus := status.New(inStatus.Code(), inStatus.Message()) - - for _, detail := range inStatus.Details() { - switch t := detail.(type) { - case protoiface.MessageV1: - outStatus, err = outStatus.WithDetails(t) - if err != nil { - subLogger.Error().Err(err).Interface("detail", t).Msg("unable to add detail to re-mapped error details") - } - default: - subLogger.Error().Err(err).Interface("detail", t).Msg("unexpected: detail from compute doesn't implement the protoifam.MessageV1 interface") - } - } - return outStatus.Err() -} - -func (m *Manager) getTablesForCompile(ctx context.Context, owner *ent.Owner) ([]*v1alpha.ComputeTable, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.getTablesForCompile").Logger() - computeTables := []*v1alpha.ComputeTable{} - - kaskadaTables, err := m.kaskadaTableClient.GetAllKaskadaTables(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("error getting all tables") - return nil, err - } - - for _, kaskadaTable := range kaskadaTables { - // if merged schema not set, table still contains no data - if kaskadaTable.MergedSchema != nil { - computeTables = append(computeTables, convertKaskadaTableToComputeTable(kaskadaTable)) - } - } - return computeTables, nil -} - -func (m *Manager) getTablesForQuery(ctx context.Context, owner *ent.Owner, slicePlans []*v1alpha.SlicePlan) (map[string]*ent.KaskadaTable, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.getTablesForQuery").Logger() - - tableMap := map[string]*ent.KaskadaTable{} - for _, slicePlan := range slicePlans { - tableName := slicePlan.TableName - sliceLoggger := subLogger.With().Str("table_name", tableName).Interface("slice_plan", slicePlan.Slice).Logger() - - if _, found := tableMap[tableName]; !found { - kaskadaTable, err := m.kaskadaTableClient.GetKaskadaTableByName(ctx, owner, tableName) - if err != nil { - sliceLoggger.Error().Err(err).Msg("issue getting kaskada table") - return nil, err - } - tableMap[tableName] = kaskadaTable - } - } - return tableMap, nil -} - -// converts a dataToken into a map of of tableIDs to internal.SliceTable -// prepares data as needed -func (m *Manager) GetTablesForCompute(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, slicePlans []*v1alpha.SlicePlan) (map[uuid.UUID]*internal.SliceTable, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.GetTablesForCompute").Logger() - - kaskadaTableMap, err := m.getTablesForQuery(ctx, owner, slicePlans) - if err != nil { - return 
nil, err - } - - sliceTableMap := make(map[uuid.UUID]*internal.SliceTable, len(kaskadaTableMap)) - for _, kaskadaTable := range kaskadaTableMap { - sliceTableMap[kaskadaTable.ID] = internal.GetNewSliceTable(kaskadaTable) - } - - for _, slicePlan := range slicePlans { - tableName := slicePlan.TableName - sliceLoggger := subLogger.With().Str("table_name", tableName).Interface("slice_plan", slicePlan.Slice).Logger() - - kaskadaTable, found := kaskadaTableMap[slicePlan.TableName] - if !found { - sliceLoggger.Error().Msg("unexpected; missing kaskadaTable") - return nil, fmt.Errorf("unexpected; missing kaskadaTable") - } - - sliceInfo, err := internal.GetNewSliceInfo(slicePlan, kaskadaTable) - if err != nil { - sliceLoggger.Error().Err(err).Msg("issue gettting slice info") - } - - prepareJobs, err := m.getOrCreatePrepareJobs(ctx, owner, dataToken, sliceInfo) - if err != nil { - sliceLoggger.Error().Err(err).Msg("issue getting and/or creating prepare jobs") - return nil, err - } - - sliceTableMap[kaskadaTable.ID].FileSetMap[&sliceInfo.PlanHash] = internal.GetNewFileSet(sliceInfo, prepareJobs) - } - - err = m.parallelPrepare(ctx, owner, sliceTableMap) - if err != nil { - subLogger.Error().Err(err).Msg("issue preparing tables") - return nil, err - } - - //refresh prepareJobs after prepare complete - // for each job - for _, sliceTable := range sliceTableMap { - for _, fileSet := range sliceTable.FileSetMap { - refreshedPrepareJobs := make([]*ent.PrepareJob, len(fileSet.PrepareJobs)) - for i, prepareJob := range fileSet.PrepareJobs { - refreshedPrepareJobs[i], err = m.prepareJobClient.GetPrepareJob(ctx, prepareJob.ID) - subLogger.Debug().Interface("prepare_job", refreshedPrepareJobs[i]).Msg("refreshed job") - if err != nil { - subLogger.Error().Err(err).Msg("issue refreshing prepare jobs") - return nil, fmt.Errorf("issue refreshing prepare jobs") - } - } - fileSet.PrepareJobs = refreshedPrepareJobs - } - } - - return sliceTableMap, nil -} - -// converts request views and persisted views to formulas. if a request view has the same name as a persisted view -// the request view is used. 
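That precedence rule (request views shadow persisted views with the same name) is what getFormulas below implements against the proto and ent types; in isolation the merge is just a name-keyed set difference, roughly as sketched here (the types are simplified placeholders, not the real ones):

    package main

    import "fmt"

    type view struct{ Name, Expression string }

    // mergeViews keeps every request view and adds only those persisted views
    // whose names are not already taken, the same precedence getFormulas applies.
    func mergeViews(requestViews, persistedViews []view) []view {
    	merged := append([]view{}, requestViews...)
    	taken := make(map[string]struct{}, len(requestViews))
    	for _, v := range requestViews {
    		taken[v.Name] = struct{}{}
    	}
    	for _, v := range persistedViews {
    		if _, ok := taken[v.Name]; !ok {
    			merged = append(merged, v)
    		}
    	}
    	return merged
    }

    func main() {
    	out := mergeViews(
    		[]view{{"recent", "request-level expression"}},
    		[]view{{"recent", "persisted expression"}, {"large", "persisted expression"}},
    	)
    	fmt.Println(out) // [{recent request-level expression} {large persisted expression}]
    }
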
-func (m *Manager) getFormulas(ctx context.Context, owner *ent.Owner, requestViews []*v1alpha.WithView) ([]*v1alpha.Formula, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.getFormulas").Logger() - persistedViews, err := m.kaskadaViewClient.GetAllKaskadaViews(ctx, owner) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting persisted views") - return nil, err - } - - formulas := []*v1alpha.Formula{} - - requestViewNames := make(map[string]struct{}, len(requestViews)) - for _, requestView := range requestViews { - requestViewNames[requestView.Name] = struct{}{} - - formulas = append(formulas, &v1alpha.Formula{ - Name: requestView.Name, - Formula: requestView.Expression, - SourceLocation: fmt.Sprintf("View %s", requestView.Name), - }) - } - - for _, persistedView := range persistedViews { - if _, found := requestViewNames[persistedView.Name]; !found { - formulas = append(formulas, &v1alpha.Formula{ - Name: persistedView.Name, - Formula: persistedView.Expression, - SourceLocation: fmt.Sprintf("View %s", persistedView.Name), - }) - } - } - - return formulas, nil -} - -// gets the current snapshot cache buster -func (m *Manager) getSnapshotCacheBuster(ctx context.Context) (*int32, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.getSnapshotCacheBuster").Logger() - queryClient := m.computeClients.ComputeServiceClient(ctx) - defer queryClient.Close() - - res, err := queryClient.GetCurrentSnapshotVersion(ctx, &v1alpha.GetCurrentSnapshotVersionRequest{}) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting snapshot_cache_buster") - return nil, err - } - return &res.SnapshotVersion, nil -} - -// returns s3://root/computeSnapshots//// -func (m *Manager) getComputeSnapshotDataURI(owner *ent.Owner, snapshotCacheBuster int32, planHash []byte, dataVersion int64) string { - subPath := path.Join("computeSnapshots", strconv.Itoa(int(snapshotCacheBuster)), owner.ID.String(), base64.RawURLEncoding.EncodeToString(planHash), utils.Int64ToString(dataVersion)) - return m.store.GetDataPathURI(subPath) -} - -func (m *Manager) GetFileSchema(ctx context.Context, fileInput internal.FileInput) (*v1alpha.Schema, error) { - subLogger := log.Ctx(ctx).With().Str("method", "manager.GetFileSchema").Str("uri", fileInput.GetURI()).Str("type", fileInput.GetExtension()).Logger() - // Send the metadata request to the FileService - - var sourceData *v1alpha.SourceData - - switch fileInput.GetType() { - case kaskadafile.TypeCsv: - sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_CsvPath{CsvPath: fileInput.GetURI()}} - case kaskadafile.TypeParquet: - sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}} - default: - subLogger.Warn().Msg("user didn't specifiy file type, defaulting to parquet for now, but will error in the future") - sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}} - } - - fileClient := m.computeClients.FileServiceClient(ctx) - defer fileClient.Close() - - metadataReq := &v1alpha.GetMetadataRequest{ - SourceData: sourceData, - } - - subLogger.Debug().Interface("request", metadataReq).Msg("sending get_metadata request to file service") - metadataRes, err := fileClient.GetMetadata(ctx, metadataReq) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting file schema from file_service") - return nil, err - } - - if metadataRes.SourceMetadata == nil { - subLogger.Error().Msg("issue getting file schema from file_service") - return 
nil, fmt.Errorf("issue getting file schema from file_service") - } - - return metadataRes.SourceMetadata.Schema, nil -} - -func ConvertURIForCompute(URI string) string { - return strings.TrimPrefix(URI, "file://") -} - -func ConvertURIForManager(URI string) string { - if strings.HasPrefix(URI, "/") { - return fmt.Sprintf("file://%s", URI) - } - return URI -} diff --git a/wren/compute/materialization_manager.go b/wren/compute/materialization_manager.go new file mode 100644 index 000000000..c86fd3835 --- /dev/null +++ b/wren/compute/materialization_manager.go @@ -0,0 +1,146 @@ +package compute + +import ( + "context" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/client" + "github.com/kaskada-ai/kaskada/wren/customerrors" + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/rs/zerolog/log" +) + +type MaterializationManager interface { + CompileManager + + // StartMaterialization starts a materialization on the compute backend + StartMaterialization(ctx context.Context, owner *ent.Owner, materializationID string, compileResp *v1alpha.CompileResponse, destination *v1alpha.Destination) error + + // StopMaterialization stops a materialization on the compute backend + StopMaterialization(ctx context.Context, materializationID string) error + + // GetMaterializationStatus gets the status of a materialization on the compute backend + GetMaterializationStatus(ctx context.Context, materializationID string) (*v1alpha.ProgressInformation, error) +} + +type materializationManager struct { + CompileManager + + computeClients client.ComputeClients + kaskadaTableClient internal.KaskadaTableClient + materializationClient internal.MaterializationClient +} + +func NewMaterializationManager(compileManager *CompileManager, computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, materializationClient *internal.MaterializationClient) MaterializationManager { + return &materializationManager{ + CompileManager: *compileManager, + computeClients: *computeClients, + kaskadaTableClient: *kaskadaTableClient, + materializationClient: *materializationClient, + } +} + +func (m *materializationManager) StartMaterialization(ctx context.Context, owner *ent.Owner, materializationID string, compileResp *v1alpha.CompileResponse, destination *v1alpha.Destination) error { + subLogger := log.Ctx(ctx).With().Str("method", "manager.StartMaterialization").Str("materialization_id", materializationID).Logger() + + tables, err := m.getMaterializationTables(ctx, owner, compileResp) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting materialization tables") + } + + startRequest := &v1alpha.StartMaterializationRequest{ + MaterializationId: materializationID, + Plan: compileResp.Plan, + Tables: tables, + Destination: destination, + } + + computeClient := m.computeClients.NewComputeServiceClient(ctx) + defer computeClient.Close() + + subLogger.Info(). + Interface("tables", startRequest.Tables). 
+ Interface("destination", startRequest.Destination).Msg("sending start materialization request to compute backend") + + _, err = computeClient.StartMaterialization(ctx, startRequest) + if err != nil { + subLogger.Error().Err(err).Msg("issue starting materialization") + return customerrors.NewComputeError(reMapSparrowError(ctx, err)) + } + + return nil +} + +func (m *materializationManager) StopMaterialization(ctx context.Context, materializationID string) error { + subLogger := log.Ctx(ctx).With().Str("method", "manager.StopMaterialization").Str("materialization_id", materializationID).Logger() + + stopRequest := &v1alpha.StopMaterializationRequest{ + MaterializationId: materializationID, + } + + computeClient := m.computeClients.NewComputeServiceClient(ctx) + defer computeClient.Close() + + _, err := computeClient.StopMaterialization(ctx, stopRequest) + if err != nil { + subLogger.Error().Err(err).Msg("issue stopping materialization") + return customerrors.NewComputeError(reMapSparrowError(ctx, err)) + } + + return nil +} + +func (m *materializationManager) GetMaterializationStatus(ctx context.Context, materializationID string) (*v1alpha.ProgressInformation, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.GetMaterializationStatus").Str("materialization_id", materializationID).Logger() + + statusRequest := &v1alpha.GetMaterializationStatusRequest{ + MaterializationId: materializationID, + } + + computeClient := m.computeClients.NewComputeServiceClient(ctx) + defer computeClient.Close() + + statusResponse, err := computeClient.GetMaterializationStatus(ctx, statusRequest) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting materialization status") + return nil, customerrors.NewComputeError(reMapSparrowError(ctx, err)) + } + + return statusResponse.Progress, nil +} + +func (m *materializationManager) getMaterializationTables(ctx context.Context, owner *ent.Owner, compileResp *v1alpha.CompileResponse) ([]*v1alpha.ComputeTable, error) { + subLogger := log.Ctx(ctx).With().Str("method", "materializationManager.getMaterializationTables").Logger() + + // map of tableName to a list of slice plans + slicePlanMap := map[string][]*v1alpha.SlicePlan{} + for _, slicePlan := range compileResp.TableSlices { + if _, found := slicePlanMap[slicePlan.TableName]; !found { + slicePlanMap[slicePlan.TableName] = []*v1alpha.SlicePlan{} + } + slicePlanMap[slicePlan.TableName] = append(slicePlanMap[slicePlan.TableName], slicePlan) + } + + computeTables := make([]*v1alpha.ComputeTable, len(slicePlanMap)) + i := 0 + + for tableName, slicePlanList := range slicePlanMap { + + kaskadaTable, err := m.kaskadaTableClient.GetKaskadaTableByName(ctx, owner, tableName) + if err != nil { + subLogger.Error().Err(err).Msg("issue getting kaskada table") + return nil, err + } + + computeTables[i] = convertKaskadaTableToComputeTable(kaskadaTable) + computeTables[i].FileSets = make([]*v1alpha.ComputeTable_FileSet, len(slicePlanList)) + + for j, slicePlan := range slicePlanList { + computeTables[i].FileSets[j] = &v1alpha.ComputeTable_FileSet{SlicePlan: slicePlan} + } + i++ + } + + return computeTables, nil +} diff --git a/wren/compute/prepare.go b/wren/compute/prepare_manager.go similarity index 64% rename from wren/compute/prepare.go rename to wren/compute/prepare_manager.go index 44af1eea2..50ddd747f 100644 --- a/wren/compute/prepare.go +++ b/wren/compute/prepare_manager.go @@ -7,13 +7,18 @@ import ( "github.com/google/uuid" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + 
"go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/client" "github.com/kaskada-ai/kaskada/wren/ent" "github.com/kaskada-ai/kaskada/wren/ent/kaskadafile" "github.com/kaskada-ai/kaskada/wren/internal" "github.com/kaskada-ai/kaskada/wren/property" + "github.com/kaskada-ai/kaskada/wren/store" + "github.com/kaskada-ai/kaskada/wren/utils" ) const ( @@ -21,8 +26,98 @@ const ( prepareTimeoutSeconds = 1800 //30 mins ) +type PrepareManager interface { + PrepareTablesForCompute(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, slicePlans []*v1alpha.SlicePlan) (map[uuid.UUID]*internal.SliceTable, error) + GetPrepareCacheBuster(ctx context.Context) (*int32, error) +} + +type prepareManager struct { + computeClients client.ComputeClients + kaskadaTableClient internal.KaskadaTableClient + prepareJobClient internal.PrepareJobClient + parallelizeConfig utils.ParallelizeConfig + tableStore store.TableStore + tr trace.Tracer +} + +func NewPrepareManager(computeClients *client.ComputeClients, kaskadaTableClient *internal.KaskadaTableClient, prepareJobClient *internal.PrepareJobClient, parallelizeConfig *utils.ParallelizeConfig, tableStore *store.TableStore) PrepareManager { + return &prepareManager{ + computeClients: *computeClients, + kaskadaTableClient: *kaskadaTableClient, + prepareJobClient: *prepareJobClient, + parallelizeConfig: *parallelizeConfig, + tableStore: *tableStore, + tr: otel.Tracer("prepareManager"), + } +} + +// converts a dataToken into a map of of tableIDs to internal.SliceTable +// prepares data as needed +func (m *prepareManager) PrepareTablesForCompute(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, slicePlans []*v1alpha.SlicePlan) (map[uuid.UUID]*internal.SliceTable, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.GetTablesForCompute").Logger() + + kaskadaTableMap, err := m.getTablesForQuery(ctx, owner, slicePlans) + if err != nil { + return nil, err + } + + sliceTableMap := make(map[uuid.UUID]*internal.SliceTable, len(kaskadaTableMap)) + for _, kaskadaTable := range kaskadaTableMap { + sliceTableMap[kaskadaTable.ID] = internal.GetNewSliceTable(kaskadaTable) + } + + for _, slicePlan := range slicePlans { + tableName := slicePlan.TableName + sliceLoggger := subLogger.With().Str("table_name", tableName).Interface("slice_plan", slicePlan.Slice).Logger() + + kaskadaTable, found := kaskadaTableMap[slicePlan.TableName] + if !found { + sliceLoggger.Error().Msg("unexpected; missing kaskadaTable") + return nil, fmt.Errorf("unexpected; missing kaskadaTable") + } + + sliceInfo, err := internal.GetNewSliceInfo(slicePlan, kaskadaTable) + if err != nil { + sliceLoggger.Error().Err(err).Msg("issue gettting slice info") + } + + prepareJobs, err := m.getOrCreatePrepareJobs(ctx, owner, dataToken, sliceInfo) + if err != nil { + sliceLoggger.Error().Err(err).Msg("issue getting and/or creating prepare jobs") + return nil, err + } + + sliceTableMap[kaskadaTable.ID].FileSetMap[&sliceInfo.PlanHash] = internal.GetNewFileSet(sliceInfo, prepareJobs) + } + + err = m.parallelPrepare(ctx, owner, sliceTableMap) + if err != nil { + subLogger.Error().Err(err).Msg("issue preparing tables") + return nil, err + } + + //refresh prepareJobs after prepare complete + // for each job + for _, sliceTable := range sliceTableMap { + for _, fileSet := range sliceTable.FileSetMap { + refreshedPrepareJobs := make([]*ent.PrepareJob, len(fileSet.PrepareJobs)) 
+ for i, prepareJob := range fileSet.PrepareJobs { + refreshedPrepareJobs[i], err = m.prepareJobClient.GetPrepareJob(ctx, prepareJob.ID) + subLogger.Debug().Interface("prepare_job", refreshedPrepareJobs[i]).Msg("refreshed job") + if err != nil { + subLogger.Error().Err(err).Msg("issue refreshing prepare jobs") + return nil, fmt.Errorf("issue refreshing prepare jobs") + } + } + fileSet.PrepareJobs = refreshedPrepareJobs + } + } + + return sliceTableMap, nil +} + // parallelly prepares files and downloads them after prepare. starts downloading as soon as files are available. -func (m *Manager) parallelPrepare(ctx context.Context, owner *ent.Owner, sliceTableMap map[uuid.UUID]*internal.SliceTable) error { +func (m *prepareManager) parallelPrepare(ctx context.Context, owner *ent.Owner, sliceTableMap map[uuid.UUID]*internal.SliceTable) error { subLogger := log.Ctx(ctx).With().Str("method", "compute.parallelPrepare").Logger() ctx, span := m.tr.Start(ctx, "compute.parallelPrepare") defer span.End() @@ -121,7 +216,7 @@ func (m *Manager) parallelPrepare(ctx context.Context, owner *ent.Owner, sliceTa // executePrepare will prepare files via the Compute Prepare API // when successful, updates the the `prepareJob` -func (m *Manager) executePrepare(ctx context.Context, owner *ent.Owner, prepareJob *ent.PrepareJob) error { +func (m *prepareManager) executePrepare(ctx context.Context, owner *ent.Owner, prepareJob *ent.PrepareJob) error { if prepareJob == nil { log.Ctx(ctx).Error().Msg("unexpected; got nil prepare_job") return fmt.Errorf("unexpected; got nil prepare_job") @@ -167,7 +262,7 @@ func (m *Manager) executePrepare(ctx context.Context, owner *ent.Owner, prepareJ } // Send the preparation request to the prepare client - prepareClient := m.computeClients.PrepareServiceClient(ctx) + prepareClient := m.computeClients.NewPrepareServiceClient(ctx) defer prepareClient.Close() prepareReq := &v1alpha.PrepareDataRequest{ SourceData: sourceData, @@ -188,12 +283,6 @@ func (m *Manager) executePrepare(ctx context.Context, owner *ent.Owner, prepareJ } subLogger.Debug().Interface("response", prepareRes).Msg("received prepare response") - for _, preparedFile := range prepareRes.PreparedFiles { - preparedFile.MetadataPath = preparedFile.MetadataPath - preparedFile.Path = preparedFile.Path - subLogger.Debug().Interface("prepared_file", preparedFile).Msg("these paths should be URIs") - } - err = m.prepareJobClient.AddFilesToPrepareJob(ctx, prepareJob, prepareRes.PreparedFiles, kaskadaFile) if err != nil { subLogger.Error().Err(err).Msg("issue adding prepared_files to prepare_job") @@ -209,12 +298,12 @@ func (m *Manager) executePrepare(ctx context.Context, owner *ent.Owner, prepareJ return nil } -func (m *Manager) getOrCreatePrepareJobs(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, sliceInfo *internal.SliceInfo) ([]*ent.PrepareJob, error) { +func (m *prepareManager) getOrCreatePrepareJobs(ctx context.Context, owner *ent.Owner, dataToken *ent.DataToken, sliceInfo *internal.SliceInfo) ([]*ent.PrepareJob, error) { kaskadaTable := sliceInfo.KaskadaTable slicePlan := sliceInfo.Plan subLogger := log.Ctx(ctx).With().Str("method", "manager.getOrCreatePrepareJobs").Str("table_name", kaskadaTable.Name).Interface("slice_plan", slicePlan.Slice).Logger() - prepareCacheBuster, err := m.getPrepareCacheBuster(ctx) + prepareCacheBuster, err := m.GetPrepareCacheBuster(ctx) if err != nil { subLogger.Error().Err(err).Msg("issue getting current prepare cache buster") } @@ -265,9 +354,9 @@ func (m *Manager) 
getOrCreatePrepareJobs(ctx context.Context, owner *ent.Owner, } // gets the current prepare cache buster -func (m *Manager) getPrepareCacheBuster(ctx context.Context) (*int32, error) { +func (m *prepareManager) GetPrepareCacheBuster(ctx context.Context) (*int32, error) { subLogger := log.Ctx(ctx).With().Str("method", "manager.getPrepareCacheBuster").Logger() - prepareClient := m.computeClients.PrepareServiceClient(ctx) + prepareClient := m.computeClients.NewPrepareServiceClient(ctx) defer prepareClient.Close() res, err := prepareClient.GetCurrentPrepID(ctx, &v1alpha.GetCurrentPrepIDRequest{}) if err != nil { @@ -276,3 +365,23 @@ func (m *Manager) getPrepareCacheBuster(ctx context.Context) (*int32, error) { } return &res.PrepId, nil } + +func (m *prepareManager) getTablesForQuery(ctx context.Context, owner *ent.Owner, slicePlans []*v1alpha.SlicePlan) (map[string]*ent.KaskadaTable, error) { + subLogger := log.Ctx(ctx).With().Str("method", "manager.getTablesForQuery").Logger() + + tableMap := map[string]*ent.KaskadaTable{} + for _, slicePlan := range slicePlans { + tableName := slicePlan.TableName + sliceLoggger := subLogger.With().Str("table_name", tableName).Interface("slice_plan", slicePlan.Slice).Logger() + + if _, found := tableMap[tableName]; !found { + kaskadaTable, err := m.kaskadaTableClient.GetKaskadaTableByName(ctx, owner, tableName) + if err != nil { + sliceLoggger.Error().Err(err).Msg("issue getting kaskada table") + return nil, err + } + tableMap[tableName] = kaskadaTable + } + } + return tableMap, nil +} diff --git a/wren/compute/query_context.go b/wren/compute/query_context.go index 2c59673e8..5f34a09b7 100644 --- a/wren/compute/query_context.go +++ b/wren/compute/query_context.go @@ -2,11 +2,9 @@ package compute import ( "context" - "time" "github.com/google/uuid" "google.golang.org/protobuf/types/known/timestamppb" - "google.golang.org/protobuf/types/known/wrapperspb" v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" "github.com/kaskada-ai/kaskada/wren/ent" @@ -92,49 +90,3 @@ func (qc *QueryContext) GetComputeTables() []*v1alpha.ComputeTable { func (qc *QueryContext) Cancelled() bool { return qc.ctx.Err() != nil } - -func convertKaskadaTableToComputeTable(kaskadaTable *ent.KaskadaTable) *v1alpha.ComputeTable { - if kaskadaTable == nil { - return nil - } - - computeTable := &v1alpha.ComputeTable{ - Config: &v1alpha.TableConfig{ - Name: kaskadaTable.Name, - Uuid: kaskadaTable.ID.String(), - TimeColumnName: kaskadaTable.TimeColumnName, - GroupColumnName: kaskadaTable.EntityKeyColumnName, - Grouping: kaskadaTable.GroupingID, - Source: kaskadaTable.Source, - }, - Metadata: &v1alpha.TableMetadata{ - Schema: kaskadaTable.MergedSchema, - }, - FileSets: []*v1alpha.ComputeTable_FileSet{}, - } - - if kaskadaTable.SubsortColumnName != nil { - computeTable.Config.SubsortColumnName = &wrapperspb.StringValue{Value: *kaskadaTable.SubsortColumnName} - } - return computeTable -} - -func getComputePreparedFiles(prepareJobs []*ent.PrepareJob) []*v1alpha.PreparedFile { - computePreparedFiles := []*v1alpha.PreparedFile{} - for _, prepareJob := range prepareJobs { - for _, preparedFile := range prepareJob.Edges.PreparedFiles { - metadataPath := "" - if preparedFile.MetadataPath != nil { - metadataPath = *preparedFile.MetadataPath - } - computePreparedFiles = append(computePreparedFiles, &v1alpha.PreparedFile{ - Path: ConvertURIForCompute(preparedFile.Path), - MaxEventTime: timestamppb.New(time.Unix(0, preparedFile.MaxEventTime)), - MinEventTime: 
timestamppb.New(time.Unix(0, preparedFile.MinEventTime)), - NumRows: preparedFile.RowCount, - MetadataPath: ConvertURIForCompute(metadataPath), - }) - } - } - return computePreparedFiles -} diff --git a/wren/compute/query_result.go b/wren/compute/query_result.go deleted file mode 100644 index 6e9501a54..000000000 --- a/wren/compute/query_result.go +++ /dev/null @@ -1,6 +0,0 @@ -package compute - -type QueryResult struct { - DataTokenId string - Paths []string -} diff --git a/wren/go.mod b/wren/go.mod index e0f77588d..3bd641bbd 100644 --- a/wren/go.mod +++ b/wren/go.mod @@ -91,6 +91,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.2.0 // indirect go.opentelemetry.io/proto/otlp v0.11.0 // indirect golang.org/x/crypto v0.1.0 // indirect + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.7.0 // indirect golang.org/x/sys v0.5.0 // indirect diff --git a/wren/go.sum b/wren/go.sum index 6267568f8..899da1e3f 100644 --- a/wren/go.sum +++ b/wren/go.sum @@ -434,6 +434,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/wren/main.go b/wren/main.go index 8231a193b..451f9092b 100644 --- a/wren/main.go +++ b/wren/main.go @@ -215,7 +215,11 @@ func main() { // connect to stores tableStore := store.NewTableStore(&objectStoreClient) - computeManager := compute.NewManager(g, computeClients, &dataTokenClient, &kaskadaTableClient, &kaskadaViewClient, &materializationClient, prepareJobClient, &objectStoreClient, *tableStore, *parallelizeConfig) + compileManager := compute.NewCompileManager(&computeClients, &kaskadaTableClient, &kaskadaViewClient) + prepareManager := compute.NewPrepareManager(&computeClients, &kaskadaTableClient, &prepareJobClient, parallelizeConfig, tableStore) + computeManager := compute.NewComputeManager(g, &compileManager, &computeClients, &dataTokenClient, &kaskadaTableClient, &materializationClient, &objectStoreClient, &prepareManager) + fileManager := compute.NewFileManager(&computeClients) + materializationManager := compute.NewMaterializationManager(&compileManager, &computeClients, &kaskadaTableClient, &materializationClient) // gRPC Health Server healthServer := health.NewServer() @@ -287,10 +291,10 @@ func main() { metricsProvider.RegisterGrpc(grpcServer) dependencyAnalyzerService := service.NewDependencyAnalyzer(&kaskadaViewClient, &materializationClient) - tableService := service.NewTableService(&computeManager, &kaskadaTableClient, &objectStoreClient, tableStore, &dependencyAnalyzerService) - viewService := service.NewViewService(&computeManager, &kaskadaTableClient, &kaskadaViewClient, &dependencyAnalyzerService) - 
materializationService := service.NewMaterializationService(&computeManager, &kaskadaTableClient, &kaskadaViewClient, &dataTokenClient, &materializationClient) - queryV1Service := service.NewQueryV1Service(&computeManager, &dataTokenClient, &kaskadaQueryClient, &objectStoreClient) + tableService := service.NewTableService(&computeManager, &fileManager, &kaskadaTableClient, &objectStoreClient, tableStore, &dependencyAnalyzerService) + viewService := service.NewViewService(&compileManager, &kaskadaTableClient, &kaskadaViewClient, &dependencyAnalyzerService) + materializationService := service.NewMaterializationService(&computeManager, &materializationManager, &kaskadaTableClient, &kaskadaViewClient, &dataTokenClient, &materializationClient) + queryV1Service := service.NewQueryV1Service(&computeManager, &dataTokenClient, &kaskadaQueryClient, &objectStoreClient, &prepareManager) // Register the grpc services v1alpha.RegisterDataTokenServiceServer(grpcServer, service.NewDataTokenService(&dataTokenClient)) diff --git a/wren/service/materialization.go b/wren/service/materialization.go index f6d51b9e6..e0e2c828b 100644 --- a/wren/service/materialization.go +++ b/wren/service/materialization.go @@ -27,21 +27,23 @@ type MaterializationService interface { type materializationService struct { v1alpha.UnimplementedMaterializationServiceServer - kaskadaTableClient internal.KaskadaTableClient - kaskadaViewClient internal.KaskadaViewClient - dataTokenClient internal.DataTokenClient - materializationClient internal.MaterializationClient - computeManager compute.ComputeManager + kaskadaTableClient internal.KaskadaTableClient + kaskadaViewClient internal.KaskadaViewClient + dataTokenClient internal.DataTokenClient + materializationClient internal.MaterializationClient + computeManager compute.ComputeManager + materializationManager compute.MaterializationManager } // NewMaterializationService creates a new materialization service -func NewMaterializationService(computeManager *compute.ComputeManager, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient, dataTokenClient *internal.DataTokenClient, materializationClient *internal.MaterializationClient) *materializationService { +func NewMaterializationService(computeManager *compute.ComputeManager, materializationManager *compute.MaterializationManager, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient, dataTokenClient *internal.DataTokenClient, materializationClient *internal.MaterializationClient) *materializationService { return &materializationService{ - kaskadaTableClient: *kaskadaTableClient, - kaskadaViewClient: *kaskadaViewClient, - dataTokenClient: *dataTokenClient, - materializationClient: *materializationClient, - computeManager: *computeManager, + kaskadaTableClient: *kaskadaTableClient, + kaskadaViewClient: *kaskadaViewClient, + dataTokenClient: *dataTokenClient, + materializationClient: *materializationClient, + computeManager: *computeManager, + materializationManager: *materializationManager, } } @@ -159,8 +161,7 @@ func (s *materializationService) createMaterialization(ctx context.Context, owne return nil, customerrors.NewInvalidArgumentErrorWithCustomText("missing materialization destination") } - isExperimental := false - compileResp, err := s.computeManager.CompileQuery(ctx, owner, request.Materialization.Expression, request.Materialization.WithViews, false, isExperimental, request.Materialization.Slice, v1alpha.Query_RESULT_BEHAVIOR_FINAL_RESULTS) + 
compileResp, _, err := s.materializationManager.CompileV1Materialization(ctx, owner, request.Materialization) if err != nil { subLogger.Error().Err(err).Msg("issue compiling materialization") return nil, err diff --git a/wren/service/query_v1.go b/wren/service/query_v1.go index e79e86873..526df315c 100644 --- a/wren/service/query_v1.go +++ b/wren/service/query_v1.go @@ -40,15 +40,17 @@ type queryV1Service struct { dataTokenClient internal.DataTokenClient kaskadaQueryClient internal.KaskadaQueryClient objectStoreClient client.ObjectStoreClient + prepareManager compute.PrepareManager } // NewQueryV1Service creates a new query service -func NewQueryV1Service(computeManager *compute.ComputeManager, dataTokenClient *internal.DataTokenClient, kaskadaQueryClient *internal.KaskadaQueryClient, objectStoreClient *client.ObjectStoreClient) v1alpha.QueryServiceServer { +func NewQueryV1Service(computeManager *compute.ComputeManager, dataTokenClient *internal.DataTokenClient, kaskadaQueryClient *internal.KaskadaQueryClient, objectStoreClient *client.ObjectStoreClient, prepareManager *compute.PrepareManager) v1alpha.QueryServiceServer { return &queryV1Service{ computeManager: *computeManager, dataTokenClient: *dataTokenClient, kaskadaQueryClient: *kaskadaQueryClient, objectStoreClient: *objectStoreClient, + prepareManager: *prepareManager, } } @@ -67,58 +69,27 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon return wrapErrorWithStatus(err, subLogger) } - if request.Query.ResultBehavior == v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED { - request.Query.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS - } - - queryRequest := compute.QueryRequest{ - Query: request.Query.Expression, - RequestViews: make([]*v1alpha.WithView, 0), - SliceRequest: request.Query.Slice, - ResultBehavior: request.Query.ResultBehavior, - } - - queryOptions := compute.QueryOptions{ - IsFormula: false, - IsExperimental: false, - } - - if request.QueryOptions != nil && request.QueryOptions.ExperimentalFeatures { - queryOptions.IsExperimental = true - } + query := request.Query + queryOptions := request.QueryOptions - previousQueryId, err := uuid.Parse(request.Query.QueryId) + previousQueryId, err := uuid.Parse(query.QueryId) if err == nil { previousQuery, err := q.kaskadaQueryClient.GetKaskadaQuery(ctx, owner, previousQueryId, false) if err != nil { subLogger.Debug().Msg("returning from GetKaskadaQuery") return wrapErrorWithStatus(err, subLogger) } - queryRequest.Query = previousQuery.Expression - for _, view := range previousQuery.Query.Views { - queryRequest.RequestViews = append(queryRequest.RequestViews, &v1alpha.WithView{ - Name: view.ViewName, - Expression: view.Expression, - }) - } - queryRequest.SliceRequest = previousQuery.Query.Slice - queryRequest.ResultBehavior = previousQuery.Query.ResultBehavior + query = previousQuery.Query } - compileRequest, err := q.computeManager.CreateCompileRequest(ctx, owner, &queryRequest, &queryOptions) - if err != nil { - subLogger.Debug().Msg("returning from CreateCompileRequest") - return wrapErrorWithStatus(err, subLogger) + if query.ResultBehavior == v1alpha.Query_RESULT_BEHAVIOR_UNSPECIFIED { + query.ResultBehavior = v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS } - compileResponse, err := q.computeManager.RunCompileRequest(ctx, owner, compileRequest) - if err != nil { - subLogger.Debug().Msg("returning from RunCompileRequest") - return wrapErrorWithStatus(err, subLogger) - } + compileResponse, views, err := q.computeManager.CompileV1Query(ctx, owner, 
query, queryOptions) // Update the request views with only the views required for the query. - request.Query.Views = compileResponse.Views + request.Query.Views = views analysisResponse := &v1alpha.CreateQueryResponse{ State: v1alpha.CreateQueryResponse_STATE_ANALYSIS, @@ -126,10 +97,10 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon SliceRequest: request.Query.Slice, }, Analysis: &v1alpha.CreateQueryResponse_Analysis{ - CanExecute: compileResponse.ComputeResponse.Plan != nil, - Schema: compileResponse.ComputeResponse.ResultType.GetStruct(), + CanExecute: compileResponse.Plan != nil, + Schema: compileResponse.ResultType.GetStruct(), }, - FenlDiagnostics: compileResponse.ComputeResponse.FenlDiagnostics, + FenlDiagnostics: compileResponse.FenlDiagnostics, } dataTokenId := "" @@ -158,7 +129,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon responseStream.Send(q.addMetricsIfRequested(request, analysisResponse, metrics)) } - if compileResponse.ComputeResponse.Plan == nil { + if compileResponse.Plan == nil { responseStream.Send(&v1alpha.CreateQueryResponse{ State: v1alpha.CreateQueryResponse_STATE_FAILURE, Metrics: metrics, @@ -167,10 +138,10 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon return nil } - query, err := q.kaskadaQueryClient.CreateKaskadaQuery(ctx, owner, &ent.KaskadaQuery{ - Expression: request.Query.Expression, + kaskadaQuery, err := q.kaskadaQueryClient.CreateKaskadaQuery(ctx, owner, &ent.KaskadaQuery{ + Expression: query.Expression, DataTokenID: dataToken.ID, - Query: request.Query, + Query: query, }, false) if err != nil { @@ -179,7 +150,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon } queryIDResponse := &v1alpha.CreateQueryResponse{ - QueryId: query.ID.String(), + QueryId: kaskadaQuery.ID.String(), } responseStream.Send(queryIDResponse) @@ -215,7 +186,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon } // do prepare - tables, err := q.computeManager.GetTablesForCompute(ctx, owner, dataToken, compileResponse.ComputeResponse.TableSlices) + tables, err := q.prepareManager.PrepareTablesForCompute(ctx, owner, dataToken, compileResponse.TableSlices) if err != nil { subLogger.Error().Err(err).Str("data_token", dataToken.ID.String()).Msg("issue getting tables for compute") return wrapErrorWithStatus(err, subLogger) @@ -235,7 +206,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon destination := &v1alpha.Destination{} if request.Query.Destination != nil { - outputURI := q.computeManager.GetOutputURI(owner, compileResponse.ComputeResponse.PlanHash.Hash) + outputURI := q.computeManager.GetOutputURI(owner, compileResponse.PlanHash.Hash) switch kind := request.Query.Destination.Destination.(type) { case *v1alpha.Destination_ObjectStore: destination.Destination = &v1alpha.Destination_ObjectStore{ @@ -255,7 +226,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon // * send progress messages every progressReportingSeconds interval computeTimerContext, computeTimerContextCancel := context.WithCancel(ctx) defer computeTimerContextCancel() - queryContext, queryContextCancel := compute.GetNewQueryContext(ctx, owner, request.Query.ChangedSinceTime, compileResponse.ComputeResponse, dataToken, request.Query.FinalResultTime, dataTokenId == "", limits, destination, request.Query.Slice, tables) + queryContext, queryContextCancel := 
compute.GetNewQueryContext(ctx, owner, request.Query.ChangedSinceTime, compileResponse, dataToken, request.Query.FinalResultTime, dataTokenId == "", limits, destination, request.Query.Slice, tables) defer queryContextCancel() go func(ctx context.Context) { for { @@ -341,7 +312,7 @@ func (q *queryV1Service) CreateQuery(request *v1alpha.CreateQueryRequest, respon } } - outputURI := q.computeManager.GetOutputURI(owner, compileResponse.ComputeResponse.PlanHash.Hash) + outputURI := q.computeManager.GetOutputURI(owner, compileResponse.PlanHash.Hash) queryResponse.Destination = &v1alpha.Destination{ Destination: &v1alpha.Destination_ObjectStore{ ObjectStore: &v1alpha.ObjectStoreDestination{ diff --git a/wren/service/query_v2.go b/wren/service/query_v2.go index adc3556f8..ea967597d 100644 --- a/wren/service/query_v2.go +++ b/wren/service/query_v2.go @@ -33,7 +33,8 @@ type queryV2Service struct { // NewQueryV2Service creates a new query service func NewQueryV2Service(computeManager *compute.ComputeManager, dataTokenClient *internal.DataTokenClient, kaskadaQueryClient *internal.KaskadaQueryClient) apiv2alpha.QueryServiceServer { return &queryV2Service{ - computeManager: *computeManager, + computeManager: *computeManager, + dataTokenClient: *dataTokenClient, kaskadaQueryClient: *kaskadaQueryClient, } @@ -87,63 +88,59 @@ func (q *queryV2Service) CreateQuery(ctx context.Context, request *apiv2alpha.Cr // createQuery creates a query for a user if it compiles and it isn't a dry-run func (q *queryV2Service) createQuery(ctx context.Context, owner *ent.Owner, request *apiv2alpha.CreateQueryRequest) (*apiv2alpha.CreateQueryResponse, error) { - subLogger := log.Ctx(ctx).With().Str("method", "queryV2Service.createQuery").Logger() - - queryViews := request.Views - if queryViews == nil { - queryViews = &apiv2alpha.QueryViews{ - Views: []*apiv2alpha.QueryView{}, - } - } - queryConfig := request.Config if queryConfig == nil { queryConfig = &apiv2alpha.QueryConfig{} } - if queryConfig.DataToken == nil { queryConfig.DataToken = &apiv2alpha.DataToken{ DataToken: &apiv2alpha.DataToken_LatestDataToken{}, } } - if queryConfig.Destination == nil { - queryConfig.Destination = &v1alpha.Destination{ - Destination: &v1alpha.Destination_ObjectStore{ - ObjectStore: &v1alpha.ObjectStoreDestination{ - FileType: v1alpha.FileType_FILE_TYPE_PARQUET, + queryConfig.Destination = &apiv1alpha.Destination{ + Destination: &apiv1alpha.Destination_ObjectStore{ + ObjectStore: &apiv1alpha.ObjectStoreDestination{ + FileType: apiv1alpha.FileType_FILE_TYPE_PARQUET, }, }, } } - if queryConfig.ResultBehavior == nil { queryConfig.ResultBehavior = &apiv2alpha.ResultBehavior{ ResultBehavior: &apiv2alpha.ResultBehavior_AllResults{}, } } - formulas, err := q.computeManager.GetFormulas(ctx, owner, queryViews) - if err != nil { - subLogger.Error().Err(err).Msg("issue getting formulas") + if request.Views == nil { + request.Views = &apiv2alpha.QueryViews{} + } + if request.Views.Views == nil { + request.Views.Views = []*apiv2alpha.QueryView{} } - compileResp, err := q.computeManager.CompileQueryV2(ctx, owner, request.Expression, formulas, queryConfig) + compileResponse, views, err := q.computeManager.CompileV2Query(ctx, owner, request.Expression, request.Views.Views, queryConfig) if err != nil { - subLogger.Debug().Msg("returning from CompileQueryV2") return nil, err } newKaskadaQuery := &ent.KaskadaQuery{ Config: queryConfig, - CompileResponse: compileResp, + CompileResponse: compileResponse, Expression: request.Expression, Metrics: 
&apiv2alpha.QueryMetrics{}, - Views: q.computeManager.GetUsedViews(formulas, compileResp), + Views: &apiv2alpha.QueryViews{Views: make([]*apiv2alpha.QueryView, len(views))}, + } + + for i, view := range views { + newKaskadaQuery.Views.Views[i] = &apiv2alpha.QueryView{ + ViewName: view.ViewName, + Expression: view.Expression, + } } - if compileResp.FenlDiagnostics.NumErrors == 0 { + if compileResponse.FenlDiagnostics.NumErrors == 0 { newKaskadaQuery.State = property.QueryStateCompiled } else { newKaskadaQuery.State = property.QueryStateFailure diff --git a/wren/service/table.go b/wren/service/table.go index c6e631108..eaa7c2033 100644 --- a/wren/service/table.go +++ b/wren/service/table.go @@ -38,6 +38,7 @@ type tableService struct { v1alpha.UnimplementedTableServiceServer computeManager compute.ComputeManager + fileManager compute.FileManager kaskadaTableClient internal.KaskadaTableClient objectStoreClient client.ObjectStoreClient tableStore *store.TableStore @@ -45,9 +46,10 @@ type tableService struct { } // NewTableService creates a new table service -func NewTableService(computeManager *compute.ComputeManager, kaskadaTableClient *internal.KaskadaTableClient, objectStoreClient *client.ObjectStoreClient, tableStore *store.TableStore, dependencyAnalyzer *Analyzer) *tableService { +func NewTableService(computeManager *compute.ComputeManager, fileManager *compute.FileManager, kaskadaTableClient *internal.KaskadaTableClient, objectStoreClient *client.ObjectStoreClient, tableStore *store.TableStore, dependencyAnalyzer *Analyzer) *tableService { return &tableService{ computeManager: *computeManager, + fileManager: *fileManager, kaskadaTableClient: *kaskadaTableClient, objectStoreClient: *objectStoreClient, tableStore: tableStore, @@ -335,7 +337,7 @@ func (t *tableService) loadFileIntoTable(ctx context.Context, owner *ent.Owner, func (t *tableService) validateFileSchema(ctx context.Context, kaskadaTable ent.KaskadaTable, fileInput internal.FileInput) (*v1alpha.Schema, error) { subLogger := log.Ctx(ctx).With().Str("method", "table.validateFileSchema").Logger() - fileSchema, err := t.computeManager.GetFileSchema(ctx, fileInput) + fileSchema, err := t.fileManager.GetFileSchema(ctx, fileInput) if err != nil { subLogger.Error().Err(err).Msg("issue getting schema for file") return nil, reMapSparrowError(ctx, err) diff --git a/wren/service/view.go b/wren/service/view.go index d85e345a8..090e31d51 100644 --- a/wren/service/view.go +++ b/wren/service/view.go @@ -29,16 +29,16 @@ type ViewService interface { type viewService struct { pb.UnimplementedViewServiceServer - computeManager compute.ComputeManager + compileManager compute.CompileManager kaskadaTableClient internal.KaskadaTableClient kaskadaViewClient internal.KaskadaViewClient dependencyAnalyzer Analyzer } // NewViewService creates a new view service -func NewViewService(computeManager *compute.ComputeManager, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient, dependencyAnalyzer *Analyzer) *viewService { +func NewViewService(compileManager *compute.CompileManager, kaskadaTableClient *internal.KaskadaTableClient, kaskadaViewClient *internal.KaskadaViewClient, dependencyAnalyzer *Analyzer) *viewService { return &viewService{ - computeManager: *computeManager, + compileManager: *compileManager, kaskadaTableClient: *kaskadaTableClient, kaskadaViewClient: *kaskadaViewClient, dependencyAnalyzer: *dependencyAnalyzer, @@ -147,7 +147,7 @@ func (s *viewService) CreateView(ctx context.Context, request 
*pb.CreateViewRequ func (s *viewService) createView(ctx context.Context, owner *ent.Owner, request *pb.CreateViewRequest) (*pb.CreateViewResponse, error) { subLogger := log.Ctx(ctx).With().Str("method", "viewService.createView").Str("expression", request.View.Expression).Logger() - compileResp, err := s.computeManager.CompileQuery(ctx, owner, request.View.Expression, []*pb.WithView{}, true, false, nil, pb.Query_RESULT_BEHAVIOR_ALL_RESULTS) + compileResp, err := s.compileManager.CompileV1View(ctx, owner, request.View) if err != nil { subLogger.Error().Err(err).Msg("issue compiling view") return nil, err
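Across the services touched here, the post-compile checks share the same shape: a compile response is treated as executable only when it carries a plan and its fenl diagnostics report no errors. A small helper capturing that check, as a sketch only (the helper is illustrative and not part of this patch):

    package service

    import (
    	v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
    )

    // canExecute mirrors the checks used by the query and view services: the
    // response must contain a plan, and the diagnostics must report no errors.
    func canExecute(resp *v1alpha.CompileResponse) bool {
    	if resp == nil || resp.Plan == nil {
    		return false
    	}
    	return resp.FenlDiagnostics == nil || resp.FenlDiagnostics.NumErrors == 0
    }
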