Skip to content

Commit

Permalink
Directly read file
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Mar 2, 2022
1 parent 1b952fb commit e92c838
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions sdks/go/pkg/beam/io/avroio/avroio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package avroio

import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"os"
"reflect"
"testing"
Expand All @@ -26,6 +28,8 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"

"github.com/linkedin/goavro"
)

type Tweet struct {
Expand Down Expand Up @@ -61,7 +65,7 @@ func TestRead(t *testing.T) {
ptest.RunAndValidate(t, p)
}

type User struct {
type TwitterUser struct {
User string `json:"username"`
Info string `json:"info"`
}
Expand All @@ -79,11 +83,12 @@ const userSchema = `{
func TestWrite(t *testing.T) {
avroFile := "./user.avro"
testUsername := "user1"
testInfo := "userInfo"
p, s, sequence := ptest.CreateList([]string{testUsername})
format := beam.ParDo(s, func(username string, emit func(string)) {
newUser := User{
newUser := TwitterUser{
User: username,
Info: "Human",
Info: testInfo,
}

b, _ := json.Marshal(newUser)
Expand All @@ -100,14 +105,29 @@ func TestWrite(t *testing.T) {
t.Fatalf("Failed to write file %v", avroFile)
}

p = beam.NewPipeline()
s = p.Root()
users := Read(s, avroFile, reflect.TypeOf(User{}))
passert.Count(s, users, "NumUsers", 1)
passert.Equals(s, users, User{
User: testUsername,
Info: "Human",
})

ptest.RunAndValidate(t, p)
avroBytes, err := ioutil.ReadFile(avroFile)
if err != nil {
t.Fatalf("Failed to read avro file: %v", err)
}
ocf, err := goavro.NewOCFReader(bytes.NewReader(avroBytes))
var nativeData []interface{}
for ocf.Scan() {
datum, err := ocf.Read()
if err != nil {
break // Read error sets OCFReader error
}
nativeData = append(nativeData, datum)
}
if err := ocf.Err(); err != nil {
t.Fatalf("Error decoding avro data: %v", err)
}
if got, want := len(nativeData), 1; got != want {
t.Fatalf("Avro data, got %v records, want %v", got, want)
}
if got, want := nativeData[0].(map[string]interface{})["username"], testUsername; got != want {
t.Fatalf("User.User=%v, want %v", got, want)
}
if got, want := nativeData[0].(map[string]interface{})["info"], testInfo; got != want {
t.Fatalf("User.User=%v, want %v", got, want)
}
}

0 comments on commit e92c838

Please sign in to comment.