From 7092c0e189d83fe67a931f16b0bd97632d505304 Mon Sep 17 00:00:00 2001 From: Eric Pinzur Date: Thu, 25 May 2023 14:58:36 +0200 Subject: [PATCH] added initial compile unit test --- wren/.mockery.yaml | 7 + wren/client/compute_clients.go | 18 ++- wren/compute/compile_manager.go | 7 +- wren/compute/compile_manager_test.go | 195 ++++++++++++++++++++++++ wren/compute/compute_manager.go | 8 +- wren/compute/compute_suite_test.go | 73 +++++++++ wren/compute/file_manager.go | 2 +- wren/compute/materialization_manager.go | 6 +- wren/compute/prepare_manager.go | 4 +- wren/main.go | 10 +- 10 files changed, 306 insertions(+), 24 deletions(-) create mode 100644 wren/compute/compile_manager_test.go create mode 100644 wren/compute/compute_suite_test.go diff --git a/wren/.mockery.yaml b/wren/.mockery.yaml index f352f3fbf..73593a41b 100644 --- a/wren/.mockery.yaml +++ b/wren/.mockery.yaml @@ -21,3 +21,10 @@ packages: FileManager: MaterializationManager: PrepareManager: + github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha: + config: + dir: "{{.InterfaceDir}}" + interfaces: + ComputeServiceClient: + FileServiceClient: + PreparationServiceClient: diff --git a/wren/client/compute_clients.go b/wren/client/compute_clients.go index a192189a8..51f22fcb6 100644 --- a/wren/client/compute_clients.go +++ b/wren/client/compute_clients.go @@ -6,16 +6,22 @@ import ( "github.com/rs/zerolog/log" ) +type ComputeClients interface { + NewFileServiceClient(ctx context.Context) FileServiceClient + NewPrepareServiceClient(ctx context.Context) PrepareServiceClient + NewComputeServiceClient(ctx context.Context) ComputeServiceClient +} + // ComputeClients is the container to hold client for communicating with compute services -type ComputeClients struct { +type computeClients struct { fileServiceConfig *HostConfig prepareServiceConfig *HostConfig computeServiceConfig *HostConfig } // CreateComputeClients initializes the computeClients -func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) *ComputeClients { - return &ComputeClients{ +func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *HostConfig, computeServiceConfig *HostConfig) ComputeClients { + return &computeClients{ fileServiceConfig: fileServiceConfig, prepareServiceConfig: prepareServiceConfig, computeServiceConfig: computeServiceConfig, @@ -23,7 +29,7 @@ func CreateComputeClients(fileServiceConfig *HostConfig, prepareServiceConfig *H } // FileServiceClient creates a new FileServiceClient from the configuration and context -func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClient { +func (c *computeClients) NewFileServiceClient(ctx context.Context) FileServiceClient { conn, err := connection(ctx, c.fileServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.fileServiceConfig).Msg("unable to dial FileServiceClient") @@ -35,7 +41,7 @@ func (c *ComputeClients) FileServiceClient(ctx context.Context) FileServiceClien } // PrepareServiceClient creates a new PrepareServiceClient from the configuration and context -func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServiceClient { +func (c *computeClients) NewPrepareServiceClient(ctx context.Context) PrepareServiceClient { conn, err := connection(ctx, c.prepareServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.prepareServiceConfig).Msg("unable to dial PrepareServiceClient") @@ -47,7 +53,7 @@ func (c *ComputeClients) PrepareServiceClient(ctx context.Context) PrepareServic } // ComputeServiceClient creates a new ComputeServiceClient from the configuration and context -func (c *ComputeClients) ComputeServiceClient(ctx context.Context) ComputeServiceClient { +func (c *computeClients) NewComputeServiceClient(ctx context.Context) ComputeServiceClient { conn, err := connection(ctx, c.computeServiceConfig) if err != nil { log.Ctx(ctx).Fatal().Err(err).Interface("host_config", c.computeServiceConfig).Msg("unable to dial ComputeServiceClient") diff --git a/wren/compute/compile_manager.go b/wren/compute/compile_manager.go index d4789db03..14b2383e9 100644 --- a/wren/compute/compile_manager.go +++ b/wren/compute/compile_manager.go @@ -48,6 +48,7 @@ func NewCompileManager(computeClients *client.ComputeClients, kaskadaTableClient kaskadaViewClient: *kaskadaViewClient, } } + func (m *compileManager) CompileEntMaterialization(ctx context.Context, owner *ent.Owner, materialization *ent.Materialization) (*v1alpha.CompileResponse, []*v1alpha.View, error) { compileRequest := &compileRequest{ Expression: materialization.Expression, @@ -95,7 +96,7 @@ func (m *compileManager) CompileV1Query(ctx context.Context, owner *ent.Owner, q } func (m *compileManager) CompileV2Query(ctx context.Context, owner *ent.Owner, expression string, views []*v2alpha.QueryView, queryConfig *v2alpha.QueryConfig) (*v1alpha.CompileResponse, []*v1alpha.View, error) { - + compileRequest := &compileRequest{ Expression: expression, Views: make([]*v1alpha.WithView, len(views)), @@ -208,7 +209,7 @@ func (m *compileManager) compile(ctx context.Context, owner *ent.Owner, request compileRequest.ExpressionKind = v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE } - computeClient := m.computeClients.ComputeServiceClient(ctx) + computeClient := m.computeClients.NewComputeServiceClient(ctx) defer computeClient.Close() subLogger.Info().Interface("request", compileRequest).Msg("sending compile request") @@ -265,7 +266,7 @@ func (m *compileManager) getFormulaMap(ctx context.Context, owner *ent.Owner, re formulaMap[requestView.Name] = &v1alpha.Formula{ Name: requestView.Name, Formula: requestView.Expression, - SourceLocation: fmt.Sprintf("Requested View %s", requestView.Name), + SourceLocation: fmt.Sprintf("Requested View: %s", requestView.Name), } } diff --git a/wren/compute/compile_manager_test.go b/wren/compute/compile_manager_test.go new file mode 100644 index 000000000..c3b8ebfef --- /dev/null +++ b/wren/compute/compile_manager_test.go @@ -0,0 +1,195 @@ +package compute + +import ( + "context" + + "github.com/google/uuid" + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + "github.com/kaskada-ai/kaskada/wren/ent" + "github.com/kaskada-ai/kaskada/wren/internal" + "github.com/stretchr/testify/mock" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("CompileManager", func() { + + var ( + ctx context.Context + owner *ent.Owner + + defaultUUID = uuid.MustParse("00000000-0000-0000-0000-000000000000") + + mockComputeServiceClient *v1alpha.MockComputeServiceClient + mockFileServiceClient *v1alpha.MockFileServiceClient + mockPreparationServiceClient *v1alpha.MockPreparationServiceClient + mockKaskadaTableClient *internal.MockKaskadaTableClient + mockKaskadaViewClient *internal.MockKaskadaViewClient + + objectStoreDestination = &v1alpha.Destination{ + Destination: &v1alpha.Destination_ObjectStore{ + ObjectStore: &v1alpha.ObjectStoreDestination{ + FileType: v1alpha.FileType_FILE_TYPE_CSV, + OutputPrefixUri: "gs://some-bucket/some-prefix", + }, + }, + } + + sliceRequest = &v1alpha.SliceRequest{ + Slice: &v1alpha.SliceRequest_Percent{ + Percent: &v1alpha.SliceRequest_PercentSlice{ + Percent: 42, + }, + }, + } + + persistedViews = []*ent.KaskadaView{ + { + Name: "persisted_view", + Expression: "persisted_view_expression", + }, + { + Name: "overwritten_view", + Expression: "overwritten_view_expression", + }, + } + + persistedTables = []*ent.KaskadaTable{ + { + ID: defaultUUID, + Name: "persisted_table1", + MergedSchema: &v1alpha.Schema{}, + }, + { + ID: defaultUUID, + Name: "persisted_table2", + MergedSchema: &v1alpha.Schema{}, + }, + } + ) + + BeforeEach(func() { + ctx = context.Background() + owner = &ent.Owner{} + + mockKaskadaTableClient = internal.NewMockKaskadaTableClient(GinkgoT()) + mockKaskadaViewClient = internal.NewMockKaskadaViewClient(GinkgoT()) + mockComputeServiceClient = v1alpha.NewMockComputeServiceClient(GinkgoT()) + mockFileServiceClient = v1alpha.NewMockFileServiceClient(GinkgoT()) + mockPreparationServiceClient = v1alpha.NewMockPreparationServiceClient(GinkgoT()) + + }) + + Context("CompileEntMaterialization", func() { + It("should compile a materialization", func() { + mockKaskadaViewClient.EXPECT().GetAllKaskadaViews(ctx, owner).Return(persistedViews, nil) + + mockKaskadaTableClient.EXPECT().GetAllKaskadaTables(ctx, owner).Return(persistedTables, nil) + + entMaterialization := &ent.Materialization{ + Name: "ent_materialization", + Expression: "ent_materialization_expression", + Destination: objectStoreDestination, + SliceRequest: sliceRequest, + WithViews: &v1alpha.WithViews{ + Views: []*v1alpha.WithView{ + { + Name: "with_view", + Expression: "with_view_expression", + }, + { + Name: "overwritten_view", + Expression: "overwritten_view_expression2", + }, + }, + }, + } + + computeTables := []*v1alpha.ComputeTable{ + { + Config: &v1alpha.TableConfig{ + Name: "persisted_table1", + Uuid: defaultUUID.String(), + }, + Metadata: &v1alpha.TableMetadata{ + Schema: &v1alpha.Schema{}, + }, + FileSets: []*v1alpha.ComputeTable_FileSet{}, + }, + { + Config: &v1alpha.TableConfig{ + Name: "persisted_table2", + Uuid: defaultUUID.String(), + }, + Metadata: &v1alpha.TableMetadata{ + Schema: &v1alpha.Schema{}, + }, + FileSets: []*v1alpha.ComputeTable_FileSet{}, + }, + } + + formulas := []*v1alpha.Formula{ + { + Name: "persisted_view", + Formula: "persisted_view_expression", + SourceLocation: "Persisted View: persisted_view", + }, + { + Name: "overwritten_view", + Formula: "overwritten_view_expression2", + SourceLocation: "Requested View: overwritten_view", + }, + { + Name: "with_view", + Formula: "with_view_expression", + SourceLocation: "Requested View: with_view", + }, + } + + compileRequest := &v1alpha.CompileRequest{ + Experimental: false, + FeatureSet: &v1alpha.FeatureSet{ + Formulas: formulas, + Query: entMaterialization.Expression, + }, + PerEntityBehavior: v1alpha.PerEntityBehavior_PER_ENTITY_BEHAVIOR_FINAL, + SliceRequest: sliceRequest, + Tables: computeTables, + ExpressionKind: v1alpha.CompileRequest_EXPRESSION_KIND_COMPLETE, + } + + compileResponse := &v1alpha.CompileResponse{ + FreeNames: []string{"with_view", "overwritten_view", "persisted_table1"}, + } + + mockComputeServiceClient.EXPECT().Compile(mock.Anything, compileRequest).Return(compileResponse, nil) + + computeClients := newMockComputeServiceClients(mockFileServiceClient, mockPreparationServiceClient, mockComputeServiceClient) + compManager := &compileManager{ + computeClients: computeClients, + kaskadaTableClient: mockKaskadaTableClient, + kaskadaViewClient: mockKaskadaViewClient, + } + + compileResponse, views, err := compManager.CompileEntMaterialization(ctx, owner, entMaterialization) + Expect(err).ToNot(HaveOccurred()) + Expect(compileResponse).ToNot(BeNil()) + Expect(views).ToNot(BeNil()) + + expectedViews := []*v1alpha.View{ + { + ViewName: "with_view", + Expression: "with_view_expression", + }, + { + ViewName: "overwritten_view", + Expression: "overwritten_view_expression2", + }, + } + + Expect(views).To(Equal(expectedViews)) + + }) + }) +}) diff --git a/wren/compute/compute_manager.go b/wren/compute/compute_manager.go index db6d197ee..3a7e842be 100644 --- a/wren/compute/compute_manager.go +++ b/wren/compute/compute_manager.go @@ -47,7 +47,7 @@ type computeManager struct { CompileManager prepareManager PrepareManager - computeClients *client.ComputeClients + computeClients client.ComputeClients errGroup *errgroup.Group dataTokenClient internal.DataTokenClient kaskadaTableClient internal.KaskadaTableClient @@ -60,7 +60,7 @@ type computeManager struct { func NewComputeManager(errGroup *errgroup.Group, compileManager *CompileManager, computeClients *client.ComputeClients, dataTokenClient *internal.DataTokenClient, kaskadaTableClient *internal.KaskadaTableClient, materializationClient *internal.MaterializationClient, objectStoreClient *client.ObjectStoreClient, prepareManager *PrepareManager) ComputeManager { return &computeManager{ CompileManager: *compileManager, - computeClients: computeClients, + computeClients: *computeClients, errGroup: errGroup, dataTokenClient: *dataTokenClient, kaskadaTableClient: *kaskadaTableClient, @@ -102,7 +102,7 @@ func (m *computeManager) InitiateQuery(queryContext *QueryContext) (client.Compu return nil, nil, err } - queryClient := m.computeClients.ComputeServiceClient(queryContext.ctx) + queryClient := m.computeClients.NewComputeServiceClient(queryContext.ctx) subLogger.Info().Bool("incremental_enabled", queryContext.compileResp.IncrementalEnabled).Bool("is_current_data_token", queryContext.isCurrentDataToken).Msg("Populating snapshot config if needed") if queryContext.compileResp.IncrementalEnabled && queryContext.isCurrentDataToken && queryContext.compileResp.PlanHash != nil { @@ -346,7 +346,7 @@ func (m *computeManager) processMaterializations(requestCtx context.Context, own // gets the current snapshot cache buster func (m *computeManager) getSnapshotCacheBuster(ctx context.Context) (*int32, error) { subLogger := log.Ctx(ctx).With().Str("method", "manager.getSnapshotCacheBuster").Logger() - queryClient := m.computeClients.ComputeServiceClient(ctx) + queryClient := m.computeClients.NewComputeServiceClient(ctx) defer queryClient.Close() res, err := queryClient.GetCurrentSnapshotVersion(ctx, &v1alpha.GetCurrentSnapshotVersionRequest{}) diff --git a/wren/compute/compute_suite_test.go b/wren/compute/compute_suite_test.go new file mode 100644 index 000000000..e4e50523a --- /dev/null +++ b/wren/compute/compute_suite_test.go @@ -0,0 +1,73 @@ +package compute + +import ( + "context" + "testing" + + v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" + + "github.com/kaskada-ai/kaskada/wren/client" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestCompute(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Compute Suite") +} + +type mockComputeServiceClients struct { + fileServiceClient v1alpha.MockFileServiceClient + preparationServiceClient v1alpha.MockPreparationServiceClient + computeServiceClient v1alpha.MockComputeServiceClient +} + +func newMockComputeServiceClients(fileServiceClient *v1alpha.MockFileServiceClient, preparationServiceClient *v1alpha.MockPreparationServiceClient, computeServiceClient *v1alpha.MockComputeServiceClient) client.ComputeClients { + return &mockComputeServiceClients{ + fileServiceClient: *fileServiceClient, + preparationServiceClient: *preparationServiceClient, + computeServiceClient: *computeServiceClient, + } +} + +func (m *mockComputeServiceClients) NewFileServiceClient(ctx context.Context) client.FileServiceClient { + return &mockFileServiceClient{ + MockFileServiceClient: m.fileServiceClient, + } +} + +func (m *mockComputeServiceClients) NewPrepareServiceClient(ctx context.Context) client.PrepareServiceClient { + return &mockPrepareServiceClient{ + MockPreparationServiceClient: m.preparationServiceClient, + } +} + +func (m *mockComputeServiceClients) NewComputeServiceClient(ctx context.Context) client.ComputeServiceClient { + return &mockComputeServiceClient{ + MockComputeServiceClient: m.computeServiceClient, + } +} + +type mockFileServiceClient struct { + v1alpha.MockFileServiceClient +} + +func (s mockFileServiceClient) Close() error { + return nil +} + +type mockPrepareServiceClient struct { + v1alpha.MockPreparationServiceClient +} + +func (s mockPrepareServiceClient) Close() error { + return nil +} + +type mockComputeServiceClient struct { + v1alpha.MockComputeServiceClient +} + +func (s mockComputeServiceClient) Close() error { + return nil +} diff --git a/wren/compute/file_manager.go b/wren/compute/file_manager.go index 56c58414f..bec9ada9f 100644 --- a/wren/compute/file_manager.go +++ b/wren/compute/file_manager.go @@ -42,7 +42,7 @@ func (m *fileManager) GetFileSchema(ctx context.Context, fileInput internal.File sourceData = &v1alpha.SourceData{Source: &v1alpha.SourceData_ParquetPath{ParquetPath: fileInput.GetURI()}} } - fileClient := m.computeClients.FileServiceClient(ctx) + fileClient := m.computeClients.NewFileServiceClient(ctx) defer fileClient.Close() metadataReq := &v1alpha.GetMetadataRequest{ diff --git a/wren/compute/materialization_manager.go b/wren/compute/materialization_manager.go index bec11fc16..c86fd3835 100644 --- a/wren/compute/materialization_manager.go +++ b/wren/compute/materialization_manager.go @@ -56,7 +56,7 @@ func (m *materializationManager) StartMaterialization(ctx context.Context, owner Destination: destination, } - computeClient := m.computeClients.ComputeServiceClient(ctx) + computeClient := m.computeClients.NewComputeServiceClient(ctx) defer computeClient.Close() subLogger.Info(). @@ -79,7 +79,7 @@ func (m *materializationManager) StopMaterialization(ctx context.Context, materi MaterializationId: materializationID, } - computeClient := m.computeClients.ComputeServiceClient(ctx) + computeClient := m.computeClients.NewComputeServiceClient(ctx) defer computeClient.Close() _, err := computeClient.StopMaterialization(ctx, stopRequest) @@ -98,7 +98,7 @@ func (m *materializationManager) GetMaterializationStatus(ctx context.Context, m MaterializationId: materializationID, } - computeClient := m.computeClients.ComputeServiceClient(ctx) + computeClient := m.computeClients.NewComputeServiceClient(ctx) defer computeClient.Close() statusResponse, err := computeClient.GetMaterializationStatus(ctx, statusRequest) diff --git a/wren/compute/prepare_manager.go b/wren/compute/prepare_manager.go index dafcbd9b7..50ddd747f 100644 --- a/wren/compute/prepare_manager.go +++ b/wren/compute/prepare_manager.go @@ -262,7 +262,7 @@ func (m *prepareManager) executePrepare(ctx context.Context, owner *ent.Owner, p } // Send the preparation request to the prepare client - prepareClient := m.computeClients.PrepareServiceClient(ctx) + prepareClient := m.computeClients.NewPrepareServiceClient(ctx) defer prepareClient.Close() prepareReq := &v1alpha.PrepareDataRequest{ SourceData: sourceData, @@ -356,7 +356,7 @@ func (m *prepareManager) getOrCreatePrepareJobs(ctx context.Context, owner *ent. // gets the current prepare cache buster func (m *prepareManager) GetPrepareCacheBuster(ctx context.Context) (*int32, error) { subLogger := log.Ctx(ctx).With().Str("method", "manager.getPrepareCacheBuster").Logger() - prepareClient := m.computeClients.PrepareServiceClient(ctx) + prepareClient := m.computeClients.NewPrepareServiceClient(ctx) defer prepareClient.Close() res, err := prepareClient.GetCurrentPrepID(ctx, &v1alpha.GetCurrentPrepIDRequest{}) if err != nil { diff --git a/wren/main.go b/wren/main.go index 39d4de3ac..451f9092b 100644 --- a/wren/main.go +++ b/wren/main.go @@ -215,11 +215,11 @@ func main() { // connect to stores tableStore := store.NewTableStore(&objectStoreClient) - compileManager := compute.NewCompileManager(computeClients, &kaskadaTableClient, &kaskadaViewClient) - prepareManager := compute.NewPrepareManager(computeClients, &kaskadaTableClient, &prepareJobClient, parallelizeConfig, tableStore) - computeManager := compute.NewComputeManager(g, &compileManager, computeClients, &dataTokenClient, &kaskadaTableClient, &materializationClient, &objectStoreClient, &prepareManager) - fileManager := compute.NewFileManager(computeClients) - materializationManager := compute.NewMaterializationManager(&compileManager, computeClients, &kaskadaTableClient, &materializationClient) + compileManager := compute.NewCompileManager(&computeClients, &kaskadaTableClient, &kaskadaViewClient) + prepareManager := compute.NewPrepareManager(&computeClients, &kaskadaTableClient, &prepareJobClient, parallelizeConfig, tableStore) + computeManager := compute.NewComputeManager(g, &compileManager, &computeClients, &dataTokenClient, &kaskadaTableClient, &materializationClient, &objectStoreClient, &prepareManager) + fileManager := compute.NewFileManager(&computeClients) + materializationManager := compute.NewMaterializationManager(&compileManager, &computeClients, &kaskadaTableClient, &materializationClient) // gRPC Health Server healthServer := health.NewServer()