Skip to content

Commit

Permalink
Remove blobReader
Browse files Browse the repository at this point in the history
It was an unnecessary struct.
  • Loading branch information
maguro committed Nov 22, 2024
1 parent 4278b22 commit c493ad1
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
18 changes: 5 additions & 13 deletions decoder_reader.go → blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,19 @@ import (
"m4o.io/pbf/protobuf"
)

type blobReader struct {
r io.Reader
}

func newBlobReader(r io.Reader) blobReader {
return blobReader{r: r}
}

// readBlobHeader unmarshals a header from an array of protobuf encoded bytes.
// The header is used when decoding blobs into OSM elements.
func (b blobReader) readBlobHeader(buffer *bytes.Buffer) (header *protobuf.BlobHeader, err error) {
func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) {
var size uint32

err = binary.Read(b.r, binary.BigEndian, &size)
err = binary.Read(rdr, binary.BigEndian, &size)
if err != nil {
return nil, err
}

buffer.Reset()

if _, err := io.CopyN(buffer, b.r, int64(size)); err != nil {
if _, err := io.CopyN(buffer, rdr, int64(size)); err != nil {
return nil, err
}

Expand All @@ -58,12 +50,12 @@ func (b blobReader) readBlobHeader(buffer *bytes.Buffer) (header *protobuf.BlobH

// readBlob unmarshals a blob from an array of protobuf encoded bytes. The
// blob still needs to be decoded into OSM elements using decode().
func (b blobReader) readBlob(buffer *bytes.Buffer, header *protobuf.BlobHeader) (*protobuf.Blob, error) {
func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) {
size := header.GetDatasize()

buffer.Reset()

if _, err := io.CopyN(buffer, b.r, int64(size)); err != nil {
if _, err := io.CopyN(buffer, rdr, int64(size)); err != nil {
return nil, err
}

Expand Down
13 changes: 6 additions & 7 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,14 @@ func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (*

ctx, d.cancel = context.WithCancel(ctx)

r := newBlobReader(reader)
buf := bytes.NewBuffer(make([]byte, 0, c.protoBufferSize))

h, err := r.readBlobHeader(buf)
h, err := readBlobHeader(buf, reader)
if err != nil {
return nil, err
}

b, err := r.readBlob(buf, h)
b, err := readBlob(buf, reader, h)
if err != nil {
return nil, err
}
Expand All @@ -171,7 +170,7 @@ func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (*

// create decoding pipelines
var outputs []chan decoded
for _, input := range read(ctx, r, c) {
for _, input := range read(ctx, reader, c) {
outputs = append(outputs, decode(input, c))
}

Expand Down Expand Up @@ -200,7 +199,7 @@ func (d *Decoder) Close() {

// read obtains OSM blobs and sends them down, in a round-robin manner, a list
// of channels to be decoded.
func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan encoded) {
func read(ctx context.Context, rdr io.Reader, cfg decoderOptions) (inputs []chan encoded) {
n := cfg.nCPU
for i := uint16(0); i < n; i++ {
inputs = append(inputs, make(chan encoded, cfg.inputChannelLength))
Expand All @@ -221,7 +220,7 @@ func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan
input := inputs[i]
i = (i + 1) % n

h, err := b.readBlobHeader(buffer)
h, err := readBlobHeader(buffer, rdr)
if err == io.EOF {
return
} else if err != nil {
Expand All @@ -230,7 +229,7 @@ func read(ctx context.Context, b blobReader, cfg decoderOptions) (inputs []chan
return
}

b, err := b.readBlob(buffer, h)
b, err := readBlob(buffer, rdr, h)
if err != nil {
input <- encoded{err: err}

Expand Down

0 comments on commit c493ad1

Please sign in to comment.