From c493ad17e8200b6408d0a5b3e74d108a6ef257c3 Mon Sep 17 00:00:00 2001 From: "Alan D. Cabrera" Date: Sun, 27 Oct 2024 16:48:51 -0700 Subject: [PATCH] Remove blobReader It was an unnecessary struct. --- decoder_reader.go => blob.go | 18 +++++------------- decoder.go | 13 ++++++------- 2 files changed, 11 insertions(+), 20 deletions(-) rename decoder_reader.go => blob.go (78%) diff --git a/decoder_reader.go b/blob.go similarity index 78% rename from decoder_reader.go rename to blob.go index 414426e..05faf04 100644 --- a/decoder_reader.go +++ b/blob.go @@ -23,27 +23,19 @@ import ( "m4o.io/pbf/protobuf" ) -type blobReader struct { - r io.Reader -} - -func newBlobReader(r io.Reader) blobReader { - return blobReader{r: r} -} - // readBlobHeader unmarshals a header from an array of protobuf encoded bytes. // The header is used when decoding blobs into OSM elements. -func (b blobReader) readBlobHeader(buffer *bytes.Buffer) (header *protobuf.BlobHeader, err error) { +func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) { var size uint32 - err = binary.Read(b.r, binary.BigEndian, &size) + err = binary.Read(rdr, binary.BigEndian, &size) if err != nil { return nil, err } buffer.Reset() - if _, err := io.CopyN(buffer, b.r, int64(size)); err != nil { + if _, err := io.CopyN(buffer, rdr, int64(size)); err != nil { return nil, err } @@ -58,12 +50,12 @@ func (b blobReader) readBlobHeader(buffer *bytes.Buffer) (header *protobuf.BlobH // readBlob unmarshals a blob from an array of protobuf encoded bytes. The // blob still needs to be decoded into OSM elements using decode(). -func (b blobReader) readBlob(buffer *bytes.Buffer, header *protobuf.BlobHeader) (*protobuf.Blob, error) { +func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) { size := header.GetDatasize() buffer.Reset() - if _, err := io.CopyN(buffer, b.r, int64(size)); err != nil { + if _, err := io.CopyN(buffer, rdr, int64(size)); err != nil { return nil, err } diff --git a/decoder.go b/decoder.go index 2f3201b..7be0d4a 100644 --- a/decoder.go +++ b/decoder.go @@ -143,15 +143,14 @@ func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (* ctx, d.cancel = context.WithCancel(ctx) - r := newBlobReader(reader) buf := bytes.NewBuffer(make([]byte, 0, c.protoBufferSize)) - h, err := r.readBlobHeader(buf) + h, err := readBlobHeader(buf, reader) if err != nil { return nil, err } - b, err := r.readBlob(buf, h) + b, err := readBlob(buf, reader, h) if err != nil { return nil, err } @@ -171,7 +170,7 @@ func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (* // create decoding pipelines var outputs []chan decoded - for _, input := range read(ctx, r, c) { + for _, input := range read(ctx, reader, c) { outputs = append(outputs, decode(input, c)) } @@ -200,7 +199,7 @@ func (d *Decoder) Close() { // read obtains OSM blobs and sends them down, in a round-robin manner, a list // of channels to be decoded. -func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan encoded) { +func read(ctx context.Context, rdr io.Reader, cfg decoderOptions) (inputs []chan encoded) { n := cfg.nCPU for i := uint16(0); i < n; i++ { inputs = append(inputs, make(chan encoded, cfg.inputChannelLength)) @@ -221,7 +220,7 @@ func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan input := inputs[i] i = (i + 1) % n - h, err := b.readBlobHeader(buffer) + h, err := readBlobHeader(buffer, rdr) if err == io.EOF { return } else if err != nil { @@ -230,7 +229,7 @@ func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan return } - b, err := b.readBlob(buffer, h) + b, err := readBlob(buffer, rdr, h) if err != nil { input <- encoded{err: err}