diff --git a/blob.go b/blob.go index 1668a6a..84b704a 100644 --- a/blob.go +++ b/blob.go @@ -28,6 +28,7 @@ import ( "github.com/destel/rill" "google.golang.org/protobuf/proto" + "m4o.io/pbf/internal/core" "m4o.io/pbf/protobuf" ) @@ -38,7 +39,8 @@ type blob struct { func generate(ctx context.Context, reader io.Reader) func(yield func(enc blob, err error) bool) { return func(yield func(enc blob, err error) bool) { - buffer := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) + buffer := core.NewPooledBuffer() + defer buffer.Close() for { select { @@ -81,10 +83,8 @@ func decode(array []blob) (out <-chan rill.Try[[]Object]) { go func() { defer close(ch) - buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) - for _, enc := range array { - elements, err := extract(enc.header, enc.blob, buf) + elements, err := extract(enc.header, enc.blob) if err != nil { slog.Error(err.Error()) ch <- rill.Try[[]Object]{Error: err} @@ -93,8 +93,6 @@ func decode(array []blob) (out <-chan rill.Try[[]Object]) { } ch <- rill.Try[[]Object]{Value: elements} - - buf.Reset() } }() @@ -103,7 +101,7 @@ func decode(array []blob) (out <-chan rill.Try[[]Object]) { // readBlobHeader unmarshals a header from an array of protobuf encoded bytes. // The header is used when decoding blobs into OSM elements. -func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) { +func readBlobHeader(buffer *core.PooledBuffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) { var size uint32 err = binary.Read(rdr, binary.BigEndian, &size) @@ -128,7 +126,7 @@ func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobH // readBlob unmarshals a blob from an array of protobuf encoded bytes. The // blob still needs to be decoded into OSM elements using decode(). 
-func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) { +func readBlob(buffer *core.PooledBuffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) { size := header.GetDatasize() buffer.Reset() @@ -149,7 +147,7 @@ func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) // elements unmarshals an array of OSM elements from an array of protobuf encoded // bytes. The bytes could possibly be compressed; zlibBuf is used to facilitate // decompression. -func extract(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Buffer) ([]Object, error) { +func extract(header *protobuf.BlobHeader, blob *protobuf.Blob) ([]Object, error) { var buf []byte switch { @@ -157,6 +155,9 @@ func extract(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Bu buf = blob.GetRaw() case blob.ZlibData != nil: + zlibBuf := core.NewPooledBuffer() + defer zlibBuf.Close() + r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData())) if err != nil { return nil, err diff --git a/decoder.go b/decoder.go index 9ccee84..3aa05d2 100644 --- a/decoder.go +++ b/decoder.go @@ -15,19 +15,19 @@ package pbf import ( - "bytes" "context" "fmt" "io" "reflect" "runtime" + "time" "github.com/destel/rill" + + "m4o.io/pbf/internal/core" ) const ( - bufferSize = 1024 - // DefaultBufferSize is the default buffer size for protobuf un-marshaling. 
DefaultBufferSize = 1024 * 1024 @@ -106,7 +106,7 @@ func NewDecoder(ctx context.Context, rdr io.Reader, opts ...DecoderOption) (*Dec blobs := rill.FromSeq2(generate(ctx, rdr)) - batches := rill.Batch(blobs, cfg.protoBatchSize, -1) + batches := rill.Batch(blobs, cfg.protoBatchSize, time.Second) objects := rill.FlatMap(batches, int(cfg.nCPU), decode) @@ -136,7 +136,8 @@ func (d *Decoder) Close() { } func (d *Decoder) loadHeader(reader io.Reader) error { - buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) + buf := core.NewPooledBuffer() + defer buf.Close() h, err := readBlobHeader(buf, reader) if err != nil { @@ -148,7 +149,7 @@ func (d *Decoder) loadHeader(reader io.Reader) error { return err } - e, err := extract(h, b, bytes.NewBuffer(make([]byte, 0, bufferSize))) + e, err := extract(h, b) if err != nil { return err } diff --git a/internal/core/buffer.go b/internal/core/buffer.go new file mode 100644 index 0000000..9c611a0 --- /dev/null +++ b/internal/core/buffer.go @@ -0,0 +1,40 @@ +// Copyright 2017-24 the original author or authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
const (
	// initialBufferCapacity is the capacity, in bytes, of a buffer that the
	// pool allocates when it has no recycled buffer to hand out.
	initialBufferCapacity = 1024

	// maxPooledCapacity is the largest capacity, in bytes, that a buffer may
	// have and still be returned to the pool. Buffers that grew beyond this
	// are left to the garbage collector so that one unusually large payload
	// does not keep its memory pinned for the life of the process.
	maxPooledCapacity = 32 << 20
)

// bufferPool recycles byte buffers between callers of NewPooledBuffer.
// Every buffer stored in the pool is empty and has a capacity of at most
// maxPooledCapacity.
var bufferPool = sync.Pool{
	New: func() any {
		return bytes.NewBuffer(make([]byte, 0, initialBufferCapacity))
	},
}

// PooledBuffer is a bytes.Buffer borrowed from a shared pool. The embedded
// buffer is owned by the PooledBuffer until Close is called; after Close the
// embedded Buffer is nil and the PooledBuffer must not be used again.
type PooledBuffer struct {
	*bytes.Buffer
}

// NewPooledBuffer returns an empty buffer taken from the pool, allocating a
// new one if the pool has none available. The caller must call Close when it
// is finished with the buffer so that the storage can be reused.
func NewPooledBuffer() *PooledBuffer {
	return &PooledBuffer{Buffer: bufferPool.Get().(*bytes.Buffer)}
}

// Close releases the underlying buffer back to the pool and always returns
// nil. It is safe to call Close more than once, and on a nil receiver; only
// the first call has any effect.
func (b *PooledBuffer) Close() error {
	if b == nil || b.Buffer == nil {
		return nil
	}

	// Detach the buffer before recycling it. Without this, a second Close
	// would put the same buffer into the pool twice and two later callers
	// of NewPooledBuffer could end up writing into shared storage.
	buf := b.Buffer
	b.Buffer = nil

	// Drop outliers instead of pooling them.
	if buf.Cap() > maxPooledCapacity {
		return nil
	}

	buf.Reset()
	bufferPool.Put(buf)

	return nil
}