Skip to content

Commit

Permalink
[BEAM-13907] Improve coverage of textio package (#16937)
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm authored Mar 1, 2022
1 parent 0631a5e commit 6b06cbb
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 5 deletions.
14 changes: 12 additions & 2 deletions sdks/go/pkg/beam/io/textio/sdf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,19 @@ import (
// outputs the correct number of lines for it, even for an exceedingly long
// line.
func TestReadSdf(t *testing.T) {
f := "../../../../data/textio_test.txt"
p, s := beam.NewPipelineWithRoot()
lines := ReadSdf(s, f)
lines := ReadSdf(s, testFilePath)
passert.Count(s, lines, "NumLines", 1)

if _, err := beam.Run(context.Background(), "direct", p); err != nil {
t.Fatalf("Failed to execute job: %v", err)
}
}

func TestReadAllSdf(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
files := beam.Create(s, testFilePath)
lines := ReadAllSdf(s, files)
passert.Count(s, lines, "NumLines", 1)

if _, err := beam.Run(context.Background(), "direct", p); err != nil {
Expand Down
71 changes: 68 additions & 3 deletions sdks/go/pkg/beam/io/textio/textio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@
package textio

import (
"errors"
"os"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestRead(t *testing.T) {
f := "../../../../data/textio_test.txt"
const testFilePath = "../../../../data/textio_test.txt"

func TestReadFn(t *testing.T) {
receivedLines := []string{}
getLines := func(line string) {
receivedLines = append(receivedLines, line)
}

err := readFn(nil, f, getLines)
err := readFn(nil, testFilePath, getLines)
if err != nil {
t.Fatalf("failed with %v", err)
}
Expand All @@ -40,3 +45,63 @@ func TestRead(t *testing.T) {
}

}

func TestRead(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
lines := Read(s, testFilePath)
passert.Count(s, lines, "NumLines", 1)

ptest.RunAndValidate(t, p)
}

func TestReadAll(t *testing.T) {
p, s, files := ptest.CreateList([]string{testFilePath})
lines := ReadAll(s, files)
passert.Count(s, lines, "NumLines", 1)

ptest.RunAndValidate(t, p)
}

func TestWrite(t *testing.T) {
out := "text.txt"
p, s := beam.NewPipelineWithRoot()
lines := Read(s, testFilePath)
Write(s, out, lines)

ptest.RunAndValidate(t, p)

if _, err := os.Stat(out); errors.Is(err, os.ErrNotExist) {
t.Fatalf("Failed to write %v", out)
}
t.Cleanup(func() {
os.Remove(out)
})

outfileContents, _ := os.ReadFile(out)
infileContents, _ := os.ReadFile(testFilePath)
if got, want := string(outfileContents), string(infileContents); got != want {
t.Fatalf("Write() wrote the wrong contents. Got: %v Want: %v", got, want)
}
}

func TestImmediate(t *testing.T) {
f, err := os.CreateTemp("", "test2.txt")
if err != nil {
t.Fatalf("Failed to create temp file, err: %v", err)
}
t.Cleanup(func() {
os.Remove(f.Name())
})
if err := os.WriteFile(f.Name(), []byte("hello\ngo\n"), 0644); err != nil {
t.Fatalf("Failed to write file %v, err: %v", f, err)
}

p, s := beam.NewPipelineWithRoot()
lines, err := Immediate(s, f.Name())
if err != nil {
t.Fatalf("Failed to insert Immediate: %v", err)
}
passert.Count(s, lines, "NumLines", 2)

ptest.RunAndValidate(t, p)
}

0 comments on commit 6b06cbb

Please sign in to comment.