From 6b06cbb928706bae7694ea5388ea8d3f97f92cf6 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 28 Feb 2022 21:31:17 -0500 Subject: [PATCH] [BEAM-13907] Improve coverage of textio package (#16937) --- sdks/go/pkg/beam/io/textio/sdf_test.go | 14 ++++- sdks/go/pkg/beam/io/textio/textio_test.go | 71 ++++++++++++++++++++++- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go index 881b9ab519992..7b1b416637617 100644 --- a/sdks/go/pkg/beam/io/textio/sdf_test.go +++ b/sdks/go/pkg/beam/io/textio/sdf_test.go @@ -28,9 +28,19 @@ import ( // outputs the correct number of lines for it, even for an exceedingly long // line. func TestReadSdf(t *testing.T) { - f := "../../../../data/textio_test.txt" p, s := beam.NewPipelineWithRoot() - lines := ReadSdf(s, f) + lines := ReadSdf(s, testFilePath) + passert.Count(s, lines, "NumLines", 1) + + if _, err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} + +func TestReadAllSdf(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + files := beam.Create(s, testFilePath) + lines := ReadAllSdf(s, files) passert.Count(s, lines, "NumLines", 1) if _, err := beam.Run(context.Background(), "direct", p); err != nil { diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go index 4090535226c77..0cd1699e657ea 100644 --- a/sdks/go/pkg/beam/io/textio/textio_test.go +++ b/sdks/go/pkg/beam/io/textio/textio_test.go @@ -17,20 +17,25 @@ package textio import ( + "errors" + "os" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) -func TestRead(t *testing.T) { - f := "../../../../data/textio_test.txt" +const testFilePath = "../../../../data/textio_test.txt" +func TestReadFn(t *testing.T) { receivedLines := []string{} getLines := func(line string) { receivedLines = append(receivedLines, line) } - err := readFn(nil, f, getLines) + err := readFn(nil, testFilePath, getLines) if err != nil { t.Fatalf("failed with %v", err) } @@ -40,3 +45,63 @@ func TestRead(t *testing.T) { } } + +func TestRead(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + lines := Read(s, testFilePath) + passert.Count(s, lines, "NumLines", 1) + + ptest.RunAndValidate(t, p) +} + +func TestReadAll(t *testing.T) { + p, s, files := ptest.CreateList([]string{testFilePath}) + lines := ReadAll(s, files) + passert.Count(s, lines, "NumLines", 1) + + ptest.RunAndValidate(t, p) +} + +func TestWrite(t *testing.T) { + out := "text.txt" + p, s := beam.NewPipelineWithRoot() + lines := Read(s, testFilePath) + Write(s, out, lines) + + ptest.RunAndValidate(t, p) + + if _, err := os.Stat(out); errors.Is(err, os.ErrNotExist) { + t.Fatalf("Failed to write %v", out) + } + t.Cleanup(func() { + os.Remove(out) + }) + + outfileContents, _ := os.ReadFile(out) + infileContents, _ := os.ReadFile(testFilePath) + if got, want := string(outfileContents), string(infileContents); got != want { + t.Fatalf("Write() wrote the wrong contents. Got: %v Want: %v", got, want) + } +} + +func TestImmediate(t *testing.T) { + f, err := os.CreateTemp("", "test2.txt") + if err != nil { + t.Fatalf("Failed to create temp file, err: %v", err) + } + t.Cleanup(func() { + os.Remove(f.Name()) + }) + if err := os.WriteFile(f.Name(), []byte("hello\ngo\n"), 0644); err != nil { + t.Fatalf("Failed to write file %v, err: %v", f, err) + } + + p, s := beam.NewPipelineWithRoot() + lines, err := Immediate(s, f.Name()) + if err != nil { + t.Fatalf("Failed to insert Immediate: %v", err) + } + passert.Count(s, lines, "NumLines", 2) + + ptest.RunAndValidate(t, p) +}