diff --git a/playground/backend/internal/code_processing/code_processing_test.go b/playground/backend/internal/code_processing/code_processing_test.go index cae46b653d06c..13eaa1417517b 100644 --- a/playground/backend/internal/code_processing/code_processing_test.go +++ b/playground/backend/internal/code_processing/code_processing_test.go @@ -19,13 +19,16 @@ import ( pb "beam.apache.org/playground/backend/internal/api/v1" "beam.apache.org/playground/backend/internal/cache" "beam.apache.org/playground/backend/internal/cache/local" + "beam.apache.org/playground/backend/internal/cache/redis" "beam.apache.org/playground/backend/internal/environment" "beam.apache.org/playground/backend/internal/executors" "beam.apache.org/playground/backend/internal/fs_tool" "beam.apache.org/playground/backend/internal/utils" "beam.apache.org/playground/backend/internal/validators" "context" + "encoding/json" "fmt" + "github.com/go-redis/redismock/v8" "github.com/google/uuid" "go.uber.org/goleak" "io/fs" @@ -43,9 +46,14 @@ const ( javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n \"bin\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n \"bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n \"JUnit\"\n ]\n}" pythonConfig = "{\n \"compile_cmd\": \"\",\n \"run_cmd\": \"python3\",\n \"compile_args\": [],\n \"run_args\": []\n}" goConfig = "{\n \"compile_cmd\": \"go\",\n \"run_cmd\": \"\",\n \"compile_args\": [\n \"build\",\n \"-o\",\n \"bin\"\n ],\n \"run_args\": [\n ]\n}" - fileName = "fakeFileName" pipelinesFolder = "executable_files" configFolder = "configs" + resourcesFolder = "resources" + helloWordPython = "if __name__ == \"__main__\":\n print(\"Hello world!\")\n" + helloWordGo = "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\")\n}\n" + helloWordJava = "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}" + graphFilePath = "resources/graph.dot" + jsonExtension = ".json" ) var opt goleak.Option @@ -61,16 +69,26 @@ func TestMain(m *testing.M) { func setup() { // create configs for java - err := os.MkdirAll("configs", fs.ModePerm) + err := os.MkdirAll(configFolder, fs.ModePerm) if err != nil { panic(err) } - filePath := filepath.Join("configs", pb.Sdk_SDK_JAVA.String()+".json") + filePath := filepath.Join(configFolder, fmt.Sprintf("%s%s", pb.Sdk_SDK_JAVA.String(), jsonExtension)) err = os.WriteFile(filePath, []byte(javaConfig), 0600) if err != nil { panic(err) } + // create dir with graph file + err = os.MkdirAll(resourcesFolder, fs.ModePerm) + if err != nil { + panic(err) + } + err = os.WriteFile(graphFilePath, []byte("graph"), 0600) + if err != nil { + panic(err) + } + path, err := os.Getwd() if err != nil { panic(err) @@ -90,6 +108,10 @@ func teardown() { if err != nil { panic(fmt.Errorf("error during test teardown: %s", err.Error())) } + err = os.RemoveAll(resourcesFolder) + if err != nil { + panic(fmt.Errorf("error during test teardown: %s", err.Error())) + } os.Clearenv() } @@ -99,11 +121,13 @@ func Test_Process(t *testing.T) { if err != nil { panic(err) } - sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) + sdkJavaEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) if err != nil { panic(err) } - + sdkGoEnv := *sdkJavaEnv + sdkGoEnv.ApacheBeamSdk = pb.Sdk_SDK_GO + incorrectGoHelloWord := "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\").\n}\n" type args struct { ctx context.Context appEnv *environment.ApplicationEnvs @@ -125,7 +149,7 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode method with small timeout. // As a result status into cache should be set as Status_STATUS_RUN_TIMEOUT. - name: "small pipeline execution timeout", + name: "Small pipeline execution timeout", createExecFile: false, code: "", cancelFunc: false, @@ -136,7 +160,7 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: &environment.ApplicationEnvs{}, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -144,7 +168,7 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode method without preparing files with code. // As a result status into cache should be set as Status_STATUS_VALIDATION_ERROR. - name: "validation failed", + name: "Validation failed", createExecFile: false, code: "", cancelFunc: false, @@ -155,7 +179,7 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: appEnvs, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -163,7 +187,7 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode method with incorrect code. // As a result status into cache should be set as Status_STATUS_COMPILE_ERROR. - name: "compilation failed", + name: "Compilation failed", createExecFile: true, code: "MOCK_CODE", cancelFunc: false, @@ -174,7 +198,7 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: appEnvs, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -182,7 +206,7 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode method with incorrect logic into code. // As a result status into cache should be set as Status_STATUS_RUN_ERROR. - name: "run failed", + name: "Run failed", createExecFile: true, code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(1/0);\n }\n}", cancelFunc: false, @@ -193,7 +217,7 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: appEnvs, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -201,7 +225,7 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode with canceling code processing. // As a result status into cache should be set as Status_STATUS_CANCELED. - name: "cancel", + name: "Cancel", createExecFile: true, code: "class HelloWorld {\n public static void main(String[] args) {\n while(true){}\n }\n}", cancelFunc: true, @@ -212,7 +236,7 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: appEnvs, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -220,10 +244,10 @@ func Test_Process(t *testing.T) { { // Test case with calling processCode without any error cases. // As a result status into cache should be set as Status_STATUS_FINISHED. - name: "processing complete successfully", + name: "Processing complete successfully on java sdk", createExecFile: true, cancelFunc: false, - code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}", + code: helloWordJava, expectedStatus: pb.Status_STATUS_FINISHED, expectedCompileOutput: "", expectedRunOutput: "Hello world!\n", @@ -231,7 +255,26 @@ func Test_Process(t *testing.T) { args: args{ ctx: context.Background(), appEnv: appEnvs, - sdkEnv: sdkEnv, + sdkEnv: sdkJavaEnv, + pipelineId: uuid.New(), + pipelineOptions: "", + }, + }, + { + // Test case with calling processCode method with incorrect go code. + // As a result status into cache should be set as Status_STATUS_PREPARATION_ERROR. + name: "Prepare step failed", + createExecFile: true, + code: incorrectGoHelloWord, + cancelFunc: false, + expectedStatus: pb.Status_STATUS_PREPARATION_ERROR, + expectedCompileOutput: nil, + expectedRunOutput: nil, + expectedRunError: nil, + args: args{ + ctx: context.Background(), + appEnv: appEnvs, + sdkEnv: &sdkGoEnv, pipelineId: uuid.New(), pipelineOptions: "", }, @@ -317,7 +360,7 @@ func TestGetProcessingOutput(t *testing.T) { { // Test case with calling GetProcessingOutput with pipelineId which doesn't contain run output. // As a result, want to receive an error. - name: "get run output with incorrect pipelineId", + name: "Get run output with incorrect pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -331,7 +374,7 @@ func TestGetProcessingOutput(t *testing.T) { { // Test case with calling GetProcessingOutput with pipelineId which contains incorrect run output. // As a result, want to receive an error. - name: "get run output with incorrect run output", + name: "Get run output with incorrect run output", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -345,7 +388,7 @@ func TestGetProcessingOutput(t *testing.T) { { // Test case with calling GetProcessingOutput with pipelineId which contains run output. // As a result, want to receive an expected string. - name: "get run output with correct pipelineId", + name: "Get run output with correct pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -399,7 +442,7 @@ func TestGetProcessingStatus(t *testing.T) { { // Test case with calling GetProcessingStatus with pipelineId which doesn't contain status. // As a result, want to receive an error. - name: "get status with incorrect pipelineId", + name: "Get status with incorrect pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -412,7 +455,7 @@ func TestGetProcessingStatus(t *testing.T) { { // Test case with calling GetProcessingStatus with pipelineId which contains incorrect status value in cache. // As a result, want to receive an error. - name: "get status with incorrect cache value", + name: "Get status with incorrect cache value", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -425,7 +468,7 @@ func TestGetProcessingStatus(t *testing.T) { { // Test case with calling GetProcessingStatus with pipelineId which contains status. // As a result, want to receive an expected status. - name: "get status with correct pipelineId", + name: "Get status with correct pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -475,7 +518,7 @@ func TestGetLastIndex(t *testing.T) { { // Test case with calling GetLastIndex with pipelineId which doesn't contain last index. // As a result, want to receive an error. - name: "get last index with incorrect pipelineId", + name: "Get last index with incorrect pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -489,7 +532,7 @@ func TestGetLastIndex(t *testing.T) { { // Test case with calling GetLastIndex with pipelineId which contains incorrect status value in cache. // As a result, want to receive an error. - name: "get last index with incorrect cache value", + name: "Get last index with incorrect cache value", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -503,7 +546,7 @@ func TestGetLastIndex(t *testing.T) { { // Test case with calling GetLastIndex with pipelineId which contains last index. // As a result, want to receive an expected last index. - name: "get last index with correct pipelineId", + name: "Get last index with correct pipelineId", args: args{ ctx: context.Background(), cacheService: cacheService, @@ -559,7 +602,7 @@ func Test_getRunOrTestCmd(t *testing.T) { }{ { //Get cmd objects with set run executor - name: "get run cmd", + name: "Get run cmd", args: args{ isUnitTest: false, executor: &runEx, @@ -569,7 +612,7 @@ func Test_getRunOrTestCmd(t *testing.T) { }, { //Get cmd objects with set test executor - name: "get test cmd", + name: "Get test cmd", args: args{ isUnitTest: true, executor: &testEx, @@ -587,12 +630,25 @@ func Test_getRunOrTestCmd(t *testing.T) { } } -func setupBenchmarks(sdk pb.Sdk) { +func getSdkEnv(sdk pb.Sdk) (*environment.BeamEnvs, error) { + setupSDK(sdk) + appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + if err != nil { + return nil, err + } + sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) + if err != nil { + return nil, err + } + return sdkEnv, nil +} + +func setupSDK(sdk pb.Sdk) { err := os.MkdirAll(configFolder, fs.ModePerm) if err != nil { panic(err) } - filePath := filepath.Join(configFolder, sdk.String()+".json") + filePath := filepath.Join(configFolder, fmt.Sprintf("%s%s", sdk.String(), jsonExtension)) switch sdk { case pb.Sdk_SDK_JAVA: err = os.WriteFile(filePath, []byte(javaConfig), 0600) @@ -640,7 +696,7 @@ func prepareFiles(b *testing.B, pipelineId uuid.UUID, code string, sdk pb.Sdk) * } func Benchmark_ProcessJava(b *testing.B) { - setupBenchmarks(pb.Sdk_SDK_JAVA) + setupSDK(pb.Sdk_SDK_JAVA) defer teardownBenchmarks() appEnv, err := environment.GetApplicationEnvsFromOsEnvs() @@ -670,7 +726,7 @@ func Benchmark_ProcessJava(b *testing.B) { } func Benchmark_ProcessPython(b *testing.B) { - setupBenchmarks(pb.Sdk_SDK_PYTHON) + setupSDK(pb.Sdk_SDK_PYTHON) defer teardownBenchmarks() appEnv, err := environment.GetApplicationEnvsFromOsEnvs() @@ -683,24 +739,24 @@ func Benchmark_ProcessPython(b *testing.B) { } ctx := context.Background() - code := "if __name__ == \"__main__\":\n print(\"Hello world!\")\n" - + wordCountCode := "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def format_result(word, count):\n return '%s: %d' % (word, count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()" + pipelineOptions := "--output t.txt" b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() pipelineId := uuid.New() - lc := prepareFiles(b, pipelineId, code, pb.Sdk_SDK_PYTHON) + lc := prepareFiles(b, pipelineId, wordCountCode, pb.Sdk_SDK_PYTHON) if err = utils.SetToCache(ctx, cacheService, pipelineId, cache.Canceled, false); err != nil { b.Fatal("error during set cancel flag to cache") } b.StartTimer() - Process(ctx, cacheService, lc, pipelineId, appEnv, sdkEnv, "") + Process(ctx, cacheService, lc, pipelineId, appEnv, sdkEnv, pipelineOptions) } } func Benchmark_ProcessGo(b *testing.B) { - setupBenchmarks(pb.Sdk_SDK_GO) + setupSDK(pb.Sdk_SDK_GO) defer teardownBenchmarks() appEnv, err := environment.GetApplicationEnvsFromOsEnvs() @@ -778,13 +834,13 @@ func Benchmark_GetLastIndex(b *testing.B) { } func Test_validateStep(t *testing.T) { - appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + javaSdkEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA) if err != nil { panic(err) } - sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) - if err != nil { - panic(err) + incorrectSdkEnv := &environment.BeamEnvs{ + ApacheBeamSdk: pb.Sdk_SDK_UNSPECIFIED, + ExecutorConfig: nil, } type args struct { ctx context.Context @@ -807,13 +863,27 @@ func Test_validateStep(t *testing.T) { ctx: context.Background(), cacheService: cacheService, pipelineId: uuid.New(), - sdkEnv: sdkEnv, + sdkEnv: javaSdkEnv, pipelineLifeCycleCtx: context.Background(), validationResults: &sync.Map{}, cancelChannel: make(chan bool, 1), }, want: 3, - code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}", + code: helloWordJava, + }, + { + name: "Test validation step with incorrect sdkEnv", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + sdkEnv: incorrectSdkEnv, + pipelineLifeCycleCtx: context.Background(), + validationResults: &sync.Map{}, + cancelChannel: make(chan bool, 1), + }, + want: 0, + code: helloWordJava, }, } for _, tt := range tests { @@ -834,17 +904,18 @@ func Test_validateStep(t *testing.T) { } func Test_prepareStep(t *testing.T) { - appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + javaSdkEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA) if err != nil { panic(err) } - sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) - if err != nil { - panic(err) + incorrectSdkEnv := &environment.BeamEnvs{ + ApacheBeamSdk: pb.Sdk_SDK_UNSPECIFIED, + ExecutorConfig: nil, } validationResults := sync.Map{} validationResults.Store(validators.UnitTestValidatorName, false) validationResults.Store(validators.KatasValidatorName, false) + pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1) type args struct { ctx context.Context cacheService cache.Cache @@ -855,10 +926,10 @@ func Test_prepareStep(t *testing.T) { cancelChannel chan bool } tests := []struct { - name string - args args - want *executors.Executor - code string + name string + args args + code string + expectedStatus pb.Status }{ { name: "Test preparer step working without an error", @@ -866,12 +937,41 @@ func Test_prepareStep(t *testing.T) { ctx: context.Background(), cacheService: cacheService, pipelineId: uuid.New(), - sdkEnv: sdkEnv, + sdkEnv: javaSdkEnv, pipelineLifeCycleCtx: context.Background(), validationResults: &validationResults, cancelChannel: make(chan bool, 1), }, - code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}", + code: helloWordJava, + expectedStatus: pb.Status_STATUS_COMPILING, + }, + { + name: "Test preparer step working with incorrect sdkEnv", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + sdkEnv: incorrectSdkEnv, + pipelineLifeCycleCtx: context.Background(), + validationResults: &validationResults, + cancelChannel: make(chan bool, 1), + }, + code: "", + expectedStatus: pb.Status_STATUS_ERROR, + }, + { + name: "Error during expired context of the example", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + sdkEnv: javaSdkEnv, + pipelineLifeCycleCtx: pipelineLifeCycleCtx, + validationResults: &validationResults, + cancelChannel: make(chan bool, 1), + }, + code: "", + expectedStatus: pb.Status_STATUS_RUN_TIMEOUT, }, } for _, tt := range tests { @@ -882,22 +982,25 @@ func Test_prepareStep(t *testing.T) { t.Fatalf("error during prepare folders: %s", err.Error()) } _ = lc.CreateSourceCodeFile(tt.code) - if got := prepareStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel); got == nil { - t.Errorf("prepareStep(): got nil instead of preparer executor") + prepareStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.pipelineLifeCycleCtx, tt.args.validationResults, tt.args.cancelChannel) + status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status) + if status != tt.expectedStatus { + t.Errorf("prepareStep: got status = %v, want %v", status, tt.expectedStatus) } }) } } func Test_compileStep(t *testing.T) { - appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + sdkJavaEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA) if err != nil { panic(err) } - sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) + sdkPythonEnv, err := getSdkEnv(pb.Sdk_SDK_PYTHON) if err != nil { panic(err) } + pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1) type args struct { ctx context.Context cacheService cache.Cache @@ -908,51 +1011,84 @@ func Test_compileStep(t *testing.T) { cancelChannel chan bool } tests := []struct { - name string - args args - want *executors.Executor - code string + name string + args args + code string + expectedStatus pb.Status }{ { - name: "Test compilation step working without an error", + name: "Test compilation step finishes successfully on java sdk", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + sdkEnv: sdkJavaEnv, + isUnitTest: false, + pipelineLifeCycleCtx: context.Background(), + cancelChannel: make(chan bool, 1), + }, + code: helloWordJava, + expectedStatus: pb.Status_STATUS_EXECUTING, + }, + { + name: "Test compilation step finishes successfully on python sdk", args: args{ ctx: context.Background(), cacheService: cacheService, pipelineId: uuid.New(), - sdkEnv: sdkEnv, + sdkEnv: sdkPythonEnv, isUnitTest: false, pipelineLifeCycleCtx: context.Background(), cancelChannel: make(chan bool, 1), }, - want: nil, - code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}", + code: helloWordPython, + expectedStatus: pb.Status_STATUS_EXECUTING, + }, + { + name: "Error during expired context of the example", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + sdkEnv: sdkJavaEnv, + isUnitTest: false, + pipelineLifeCycleCtx: pipelineLifeCycleCtx, + cancelChannel: make(chan bool, 1), + }, + code: helloWordJava, + expectedStatus: pb.Status_STATUS_RUN_TIMEOUT, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - lc, _ := fs_tool.NewLifeCycle(pb.Sdk_SDK_JAVA, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder)) + lc, _ := fs_tool.NewLifeCycle(tt.args.sdkEnv.ApacheBeamSdk, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder)) err := lc.CreateFolders() if err != nil { t.Fatalf("error during prepare folders: %s", err.Error()) } _ = lc.CreateSourceCodeFile(tt.code) - if got := compileStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel); got == nil { - t.Errorf("compileStep: got nil instead of compiler executor") + compileStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.sdkEnv, tt.args.isUnitTest, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel) + status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status) + if status != tt.expectedStatus { + t.Errorf("compileStep: got status = %v, want %v", status, tt.expectedStatus) } }) } } func Test_runStep(t *testing.T) { - appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + sdkJavaEnv, err := getSdkEnv(pb.Sdk_SDK_JAVA) if err != nil { panic(err) } - sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir()) + sdkPythonEnv, err := getSdkEnv(pb.Sdk_SDK_PYTHON) + if err != nil { + panic(err) + } + sdkGoEnv, err := getSdkEnv(pb.Sdk_SDK_GO) if err != nil { panic(err) } - sdkEnv.ApacheBeamSdk = pb.Sdk_SDK_PYTHON type args struct { ctx context.Context cacheService cache.Cache @@ -962,36 +1098,86 @@ func Test_runStep(t *testing.T) { pipelineOptions string pipelineLifeCycleCtx context.Context cancelChannel chan bool + createExecFile bool } tests := []struct { - name string - args args - code string + name string + args args + code string + expectedStatus pb.Status }{ { - name: "Test run step working without an error", + // Test case with calling runStep method on python sdk. + // cmd.Run return error during saving output. + // As a result, the pipeline status should be Status_STATUS_RUN_ERROR. + name: "Test run step working on python sdk", args: args{ ctx: context.Background(), cacheService: cacheService, - pipelineId: uuid.UUID{}, + pipelineId: uuid.New(), isUnitTest: false, - sdkEnv: sdkEnv, + sdkEnv: sdkPythonEnv, + pipelineOptions: "", + pipelineLifeCycleCtx: context.Background(), + cancelChannel: make(chan bool, 1), + createExecFile: true, + }, + code: helloWordPython, + expectedStatus: pb.Status_STATUS_RUN_ERROR, + }, + { + // Test case with calling runStep method on go sdk. + // cmd.Run return error due to missing executable file. + // As a result, the pipeline status should be Status_STATUS_RUN_ERROR. + name: "Test run step working on go sdk", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.New(), + isUnitTest: true, + sdkEnv: sdkGoEnv, + pipelineOptions: "", + pipelineLifeCycleCtx: context.Background(), + cancelChannel: make(chan bool, 1), + createExecFile: true, + }, + code: helloWordGo, + expectedStatus: pb.Status_STATUS_RUN_ERROR, + }, + { + // Test case with calling runStep method without preparing files with code. + // As a result, the pipeline status should be Status_STATUS_ERROR. + name: "Test run step without preparing files with code", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + pipelineId: uuid.UUID{}, + isUnitTest: true, + sdkEnv: sdkJavaEnv, pipelineOptions: "", pipelineLifeCycleCtx: context.Background(), cancelChannel: make(chan bool, 1), + createExecFile: false, }, - code: "if __name__ == \"__main__\":\n print(\"Hello world!\")\n", + code: helloWordJava, + expectedStatus: pb.Status_STATUS_ERROR, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - lc, _ := fs_tool.NewLifeCycle(pb.Sdk_SDK_PYTHON, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder)) - err := lc.CreateFolders() - if err != nil { - t.Fatalf("error during prepare folders: %s", err.Error()) + lc, _ := fs_tool.NewLifeCycle(tt.args.sdkEnv.ApacheBeamSdk, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder)) + if tt.args.createExecFile { + err := lc.CreateFolders() + if err != nil { + t.Fatalf("error during prepare folders: %s", err.Error()) + } + _ = lc.CreateSourceCodeFile(tt.code) } - _ = lc.CreateSourceCodeFile(tt.code) runStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel) + status, _ := cacheService.GetValue(tt.args.ctx, tt.args.pipelineId, cache.Status) + if status != tt.expectedStatus { + t.Errorf("runStep() got status = %v, want %v", status, tt.expectedStatus) + } }) } } @@ -1004,3 +1190,354 @@ func syncMapLen(syncMap *sync.Map) int { }) return length } + +func TestGetGraph(t *testing.T) { + ctx := context.Background() + pipelineId1 := uuid.New() + graph := "GRAPH" + err := cacheService.SetValue(ctx, pipelineId1, cache.Graph, graph) + if err != nil { + return + } + pipelineId2 := uuid.New() + err = cacheService.SetValue(ctx, pipelineId2, cache.Graph, 1) + if err != nil { + return + } + type args struct { + ctx context.Context + cacheService cache.Cache + key uuid.UUID + errorTitle string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "Get graph when key exist in cache", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + key: pipelineId1, + errorTitle: "error", + }, + want: graph, + wantErr: false, + }, + { + name: "Get graph when key doesn't exist in cache", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + key: uuid.New(), + errorTitle: "error", + }, + want: "", + wantErr: true, + }, + { + name: "Get graph when value from cache by key couldn't be converted to a string", + args: args{ + ctx: context.Background(), + cacheService: cacheService, + key: pipelineId2, + errorTitle: "error", + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetGraph(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.errorTitle) + if (err != nil) != tt.wantErr { + t.Errorf("GetGraph error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetGraph got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_processSetupError(t *testing.T) { + client, mock := redismock.NewClientMock() + pipelineId := uuid.New() + errorMessage := "MOCK_ERROR" + type args struct { + err error + pipelineId uuid.UUID + cacheService cache.Cache + ctxWithTimeout context.Context + } + tests := []struct { + name string + mocks func() + args args + wantErr bool + }{ + { + name: "Error during HSet operation", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), "MOCK_VALUE").SetErr(fmt.Errorf(errorMessage)) + }, + args: args{ + err: fmt.Errorf(errorMessage), + pipelineId: pipelineId, + cacheService: &redis.Cache{ + Client: client, + }, + ctxWithTimeout: nil, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mocks() + if err := processSetupError(tt.args.err, tt.args.pipelineId, tt.args.cacheService, tt.args.ctxWithTimeout); (err != nil) != tt.wantErr { + t.Errorf("processSetupError() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_processErrorWithSavingOutput(t *testing.T) { + client, mock := redismock.NewClientMock() + pipelineId := uuid.New() + errorMessage := "MOCK_ERROR" + subKey := cache.RunOutput + type args struct { + ctx context.Context + err error + errorOutput []byte + pipelineId uuid.UUID + subKey cache.SubKey + cacheService cache.Cache + errorTitle string + newStatus pb.Status + } + tests := []struct { + name string + mocks func() + args args + wantErr bool + }{ + { + name: "Error during HSet operation", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), subKey).SetErr(fmt.Errorf(errorMessage)) + }, + args: args{ + ctx: context.Background(), + err: fmt.Errorf(errorMessage), + errorOutput: nil, + pipelineId: pipelineId, + subKey: subKey, + cacheService: &redis.Cache{Client: client}, + errorTitle: "", + newStatus: 0, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mocks() + if err := processErrorWithSavingOutput(tt.args.ctx, tt.args.err, tt.args.errorOutput, tt.args.pipelineId, tt.args.subKey, tt.args.cacheService, tt.args.errorTitle, tt.args.newStatus); (err != nil) != tt.wantErr { + t.Errorf("processErrorWithSavingOutput() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_processRunError(t *testing.T) { + client, mock := redismock.NewClientMock() + pipelineId := uuid.New() + errorMessage := "MOCK_ERROR" + subKey := cache.RunError + errorChannel := make(chan error, 1) + errorChannel <- fmt.Errorf(errorMessage) + type args struct { + ctx context.Context + errorChannel chan error + errorOutput []byte + pipelineId uuid.UUID + cacheService cache.Cache + stopReadLogsChannel chan bool + finishReadLogsChannel chan bool + } + tests := []struct { + name string + mocks func() + args args + wantErr bool + }{ + { + name: "Error during HSet operation", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), subKey).SetErr(fmt.Errorf(errorMessage)) + }, + args: args{ + ctx: context.Background(), + errorChannel: errorChannel, + errorOutput: nil, + pipelineId: pipelineId, + cacheService: &redis.Cache{Client: client}, + stopReadLogsChannel: nil, + finishReadLogsChannel: nil, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mocks() + if err := processRunError(tt.args.ctx, tt.args.errorChannel, tt.args.errorOutput, tt.args.pipelineId, tt.args.cacheService, tt.args.stopReadLogsChannel, tt.args.finishReadLogsChannel); (err != nil) != tt.wantErr { + t.Errorf("processRunError() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_processCompileSuccess(t *testing.T) { + client, mock := redismock.NewClientMock() + pipelineId := uuid.New() + output := "output" + cacheMock := &redis.Cache{Client: client} + marshalLogs, _ := json.Marshal(cache.Logs) + marshalCompileOutput, _ := json.Marshal(cache.CompileOutput) + marshalRunOutput, _ := json.Marshal(cache.RunOutput) + marshalRunError, _ := json.Marshal(cache.RunError) + outputMarshal, _ := json.Marshal(output) + marshalEmptyString, _ := json.Marshal("") + type args struct { + ctx context.Context + output []byte + pipelineId uuid.UUID + cacheService cache.Cache + } + tests := []struct { + name string + mocks func() + args args + wantErr bool + }{ + { + name: "Error during set value to CompileOutput subKey", + mocks: func() { + }, + args: args{ + ctx: context.Background(), + output: []byte(output), + pipelineId: pipelineId, + cacheService: cacheMock, + }, + wantErr: true, + }, + { + name: "Error during set value to RunOutput subKey", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), marshalCompileOutput, outputMarshal).SetVal(1) + }, + args: args{ + ctx: context.Background(), + output: []byte(output), + pipelineId: pipelineId, + cacheService: cacheMock, + }, + wantErr: true, + }, + { + name: "Error during set value to RunError subKey", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), marshalCompileOutput, outputMarshal).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalRunOutput, marshalEmptyString).SetVal(1) + }, + args: args{ + ctx: context.Background(), + output: []byte(output), + pipelineId: pipelineId, + cacheService: cacheMock, + }, + wantErr: true, + }, + { + name: "Error during set value to Logs subKey", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), marshalCompileOutput, outputMarshal).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalRunOutput, marshalEmptyString).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalRunError, marshalEmptyString).SetVal(1) + }, + args: args{ + ctx: context.Background(), + output: []byte(output), + pipelineId: pipelineId, + cacheService: cacheMock, + }, + wantErr: true, + }, + { + name: "Error during set value to Graph subKey", + mocks: func() { + mock.ExpectHSet(pipelineId.String(), marshalCompileOutput, outputMarshal).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalRunOutput, marshalEmptyString).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalRunError, marshalEmptyString).SetVal(1) + mock.ExpectHSet(pipelineId.String(), marshalLogs, marshalEmptyString).SetVal(1) + }, + args: args{ + ctx: context.Background(), + output: []byte(output), + pipelineId: pipelineId, + cacheService: cacheMock, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.mocks() + if err := processCompileSuccess(tt.args.ctx, tt.args.output, tt.args.pipelineId, tt.args.cacheService); (err != nil) != tt.wantErr { + t.Errorf("processCompileSuccess() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_readGraphFile(t *testing.T) { + pipelineLifeCycleCtx, _ := context.WithTimeout(context.Background(), 1*time.Second) + type args struct { + pipelineLifeCycleCtx context.Context + backgroundCtx context.Context + cacheService cache.Cache + graphFilePath string + pipelineId uuid.UUID + } + tests := []struct { + name string + args args + }{ + { + name: "Successfully saving the prepared graph to the cache", + args: args{ + pipelineLifeCycleCtx: pipelineLifeCycleCtx, + backgroundCtx: context.Background(), + cacheService: cacheService, + graphFilePath: graphFilePath, + pipelineId: uuid.New(), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + readGraphFile(tt.args.pipelineLifeCycleCtx, tt.args.backgroundCtx, tt.args.cacheService, tt.args.graphFilePath, tt.args.pipelineId) + if v, _ := cacheService.GetValue(tt.args.backgroundCtx, tt.args.pipelineId, cache.Graph); v == nil { + t.Errorf("readGraphFile() error: the graph was not cached") + } + }) + } +} diff --git a/playground/backend/internal/setup_tools/builder/setup_builder.go b/playground/backend/internal/setup_tools/builder/setup_builder.go index c4d05b1bc410d..9d4ecaa8dc928 100644 --- a/playground/backend/internal/setup_tools/builder/setup_builder.go +++ b/playground/backend/internal/setup_tools/builder/setup_builder.go @@ -58,7 +58,7 @@ func Preparer(paths *fs_tool.LifeCyclePaths, sdkEnv *environment.BeamEnvs, valRe WithPreparer(). WithSdkPreparers(prep). ExecutorBuilder - return &builder, err + return &builder, nil } // Compiler return executor with set args for compiler