Skip to content

Commit

Permalink
apacheGH-43443: [Go] [IPC] Infer schema from first record if not spec…
Browse files Browse the repository at this point in the history
…ified (apache#43484)

### Rationale for this change

Fixes: apache#43443 

Makes usage of the IPC writer, and of any writers built on it such as the Flight writer, simpler.

### What changes are included in this PR?

- Infer schema from first record if schema is not specified
- IPC and Flight tests

### Are these changes tested?

Yes

### Are there any user-facing changes?

Any `ipc.Writer` that does not specify the optional argument `ipc.WithSchema` will no longer return an error as long as the incoming stream of records has a consistent schema.

* GitHub Issue: apache#43443

Authored-by: Joel Lubinitsky <joellubi@gmail.com>
Signed-off-by: Joel Lubinitsky <joellubi@gmail.com>
  • Loading branch information
joellubi authored Jul 31, 2024
1 parent c6be2df commit e9f6667
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
35 changes: 35 additions & 0 deletions go/arrow/flight/flight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"io"
"testing"

"github.com/apache/arrow/go/v18/arrow"
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/flight"
"github.com/apache/arrow/go/v18/arrow/internal/arrdata"
"github.com/apache/arrow/go/v18/arrow/ipc"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -449,3 +451,36 @@ func TestReaderError(t *testing.T) {
t.Fatal("should have errored")
}
}

// TestWriterInferSchema writes every record of the "primitives" fixture through
// a flight record writer that was created without an ipc.WithSchema option, and
// requires each Write and the final Close to succeed.
func TestWriterInferSchema(t *testing.T) {
	batches, found := arrdata.Records["primitives"]
	require.True(t, found)

	// No schema option is passed to the writer here.
	sink := &flightStreamWriter{}
	wr := flight.NewRecordWriter(sink)

	for i := range batches {
		err := wr.Write(batches[i])
		require.NoError(t, err)
	}

	err := wr.Close()
	require.NoError(t, err)
}

// TestWriterInconsistentSchema configures a flight record writer with an
// explicit single-field schema ("unknown", Int8) and then writes the first
// "primitives" fixture record to it. Write is expected to fail with the
// "different schema" error, while the subsequent Close must still succeed.
func TestWriterInconsistentSchema(t *testing.T) {
	batches, found := arrdata.Records["primitives"]
	require.True(t, found)

	fields := []arrow.Field{
		{Name: "unknown", Type: arrow.PrimitiveTypes.Int8},
	}
	declared := arrow.NewSchema(fields, nil)

	sink := &flightStreamWriter{}
	wr := flight.NewRecordWriter(sink, ipc.WithSchema(declared))

	writeErr := wr.Write(batches[0])
	require.ErrorContains(t, writeErr, "arrow/ipc: tried to write record batch with different schema")

	closeErr := wr.Close()
	require.NoError(t, closeErr)
}

// flightStreamWriter is a stateless flight.DataStreamWriter stub for the tests
// in this file: it accepts every message and does nothing with it.
type flightStreamWriter struct{}

// Compile-time check that *flightStreamWriter satisfies flight.DataStreamWriter.
var _ flight.DataStreamWriter = (*flightStreamWriter)(nil)

// Send implements flight.DataStreamWriter. The message is ignored and the call
// always reports success.
func (*flightStreamWriter) Send(*flight.FlightData) error {
	return nil
}
8 changes: 6 additions & 2 deletions go/arrow/ipc/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,19 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
}
}()

incomingSchema := rec.Schema()

if !w.started {
if w.schema == nil {
w.schema = incomingSchema
}
err := w.start()
if err != nil {
return err
}
}

schema := rec.Schema()
if schema == nil || !schema.Equal(w.schema) {
if incomingSchema == nil || !incomingSchema.Equal(w.schema) {
return errInconsistentSchema
}

Expand Down
19 changes: 19 additions & 0 deletions go/arrow/ipc/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,22 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
}
}
}

func TestWriterInferSchema(t *testing.T) {
bldr := array.NewRecordBuilder(memory.DefaultAllocator, arrow.NewSchema([]arrow.Field{{Name: "col", Type: arrow.PrimitiveTypes.Int8}}, nil))
bldr.Field(0).(*array.Int8Builder).AppendValues([]int8{1, 2, 3, 4, 5}, nil)
rec := bldr.NewRecord()
defer rec.Release()

var buf bytes.Buffer
w := NewWriter(&buf)

require.NoError(t, w.Write(rec))
require.NoError(t, w.Close())

r, err := NewReader(&buf)
require.NoError(t, err)
defer r.Release()

require.True(t, r.Schema().Equal(rec.Schema()))
}

0 comments on commit e9f6667

Please sign in to comment.