Add pooled memory buffers
maguro committed Nov 22, 2024
1 parent 4dc14ae commit ddc4381
Showing 3 changed files with 57 additions and 15 deletions.
19 changes: 10 additions & 9 deletions blob.go
@@ -28,6 +28,7 @@ import (
"github.com/destel/rill"
"google.golang.org/protobuf/proto"

"m4o.io/pbf/internal/core"
"m4o.io/pbf/protobuf"
)

@@ -38,7 +39,8 @@ type blob struct {

func generate(ctx context.Context, reader io.Reader) func(yield func(enc blob, err error) bool) {
return func(yield func(enc blob, err error) bool) {
buffer := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize))
buffer := core.NewPooledBuffer()
defer buffer.Close()

for {
select {
@@ -81,10 +83,8 @@ func decode(array []blob) (out <-chan rill.Try[[]Object]) {
go func() {
defer close(ch)

buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize))

for _, enc := range array {
elements, err := extract(enc.header, enc.blob, buf)
elements, err := extract(enc.header, enc.blob)
if err != nil {
slog.Error(err.Error())
ch <- rill.Try[[]Object]{Error: err}
@@ -93,8 +93,6 @@
}

ch <- rill.Try[[]Object]{Value: elements}

buf.Reset()
}
}()

@@ -103,7 +101,7 @@ func decode(array []blob) (out <-chan rill.Try[[]Object]) {

// readBlobHeader unmarshals a header from an array of protobuf encoded bytes.
// The header is used when decoding blobs into OSM elements.
func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) {
func readBlobHeader(buffer *core.PooledBuffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) {
var size uint32

err = binary.Read(rdr, binary.BigEndian, &size)
@@ -128,7 +126,7 @@ func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobH

// readBlob unmarshals a blob from an array of protobuf encoded bytes. The
// blob still needs to be decoded into OSM elements using decode().
func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) {
func readBlob(buffer *core.PooledBuffer, rdr io.Reader, header *protobuf.BlobHeader) (*protobuf.Blob, error) {
size := header.GetDatasize()

buffer.Reset()
@@ -149,14 +147,17 @@ func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader)
// elements unmarshals an array of OSM elements from an array of protobuf encoded
// bytes. The bytes could possibly be compressed; zlibBuf is used to facilitate
// decompression.
func extract(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Buffer) ([]Object, error) {
func extract(header *protobuf.BlobHeader, blob *protobuf.Blob) ([]Object, error) {
var buf []byte

switch {
case blob.Raw != nil:
buf = blob.GetRaw()

case blob.ZlibData != nil:
zlibBuf := core.NewPooledBuffer()
defer zlibBuf.Close()

r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData()))
if err != nil {
return nil, err
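The blob.go changes above replace per-call bytes.Buffer allocations with buffers borrowed from a pool and released via defer Close(), most visibly around zlib decompression in extract(). A minimal, self-contained sketch of that pattern follows; the pool, pooledBuffer type, and inflate helper are local stand-ins written for illustration, not the m4o.io/pbf API:

```go
// Sketch only: mirrors how extract() borrows a pooled buffer for zlib
// decompression and returns it with a deferred Close(). Local stand-ins,
// not m4o.io/pbf/internal/core.
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
	"sync"
)

var pool = sync.Pool{
	New: func() any { return bytes.NewBuffer(make([]byte, 0, 1024)) },
}

type pooledBuffer struct{ *bytes.Buffer }

func newPooledBuffer() *pooledBuffer { return &pooledBuffer{pool.Get().(*bytes.Buffer)} }

func (b *pooledBuffer) Close() error {
	b.Reset()
	pool.Put(b.Buffer)
	return nil
}

// inflate decompresses zlib-encoded data into a borrowed buffer and copies
// the result out, so the buffer can safely go back to the pool.
func inflate(compressed []byte) ([]byte, error) {
	buf := newPooledBuffer()
	defer buf.Close()

	r, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer r.Close()

	if _, err := io.Copy(buf.Buffer, r); err != nil {
		return nil, err
	}

	// Copy out: the pooled buffer's backing array is reused after Close.
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())

	return out, nil
}

func main() {
	var enc bytes.Buffer
	w := zlib.NewWriter(&enc)
	_, _ = w.Write([]byte("hello, pooled buffers"))
	_ = w.Close()

	dec, err := inflate(enc.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(dec))
}
```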
13 changes: 7 additions & 6 deletions decoder.go
@@ -15,19 +15,19 @@
package pbf

import (
"bytes"
"context"
"fmt"
"io"
"reflect"
"runtime"
"time"

"github.com/destel/rill"

"m4o.io/pbf/internal/core"
)

const (
bufferSize = 1024

// DefaultBufferSize is the default buffer size for protobuf un-marshaling.
DefaultBufferSize = 1024 * 1024

@@ -106,7 +106,7 @@ func NewDecoder(ctx context.Context, rdr io.Reader, opts ...DecoderOption) (*Dec

blobs := rill.FromSeq2(generate(ctx, rdr))

batches := rill.Batch(blobs, cfg.protoBatchSize, -1)
batches := rill.Batch(blobs, cfg.protoBatchSize, time.Second)

objects := rill.FlatMap(batches, int(cfg.nCPU), decode)

@@ -136,7 +136,7 @@ func (d *Decoder) Close() {
}

func (d *Decoder) loadHeader(reader io.Reader) error {
buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize))
buf := core.NewPooledBuffer()
defer buf.Close()

h, err := readBlobHeader(buf, reader)
if err != nil {
@@ -148,7 +149,7 @@ func (d *Decoder) loadHeader(reader io.Reader) error {
return err
}

e, err := extract(h, b, bytes.NewBuffer(make([]byte, 0, bufferSize)))
e, err := extract(h, b)
if err != nil {
return err
}
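Besides switching loadHeader to a pooled buffer, decoder.go changes the rill.Batch timeout argument from -1 to time.Second, presumably so that a trailing partial batch of blobs is flushed after at most a second rather than waiting indefinitely to fill. A hand-rolled sketch of that size-plus-timeout batching idea is below; it is not rill's implementation, and the sizes and durations are illustrative:

```go
// Sketch only: why a batching timeout matters. A size-only batcher can
// stall on a trailing partial batch; a timer bounds how long items wait.
package main

import (
	"fmt"
	"time"
)

// batch groups items from in into slices of up to size; a periodic timer
// flushes whatever has accumulated so downstream stages are never starved.
func batch[T any](in <-chan T, size int, timeout time.Duration) <-chan []T {
	out := make(chan []T)

	go func() {
		defer close(out)

		var cur []T
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		flush := func() {
			if len(cur) > 0 {
				out <- cur
				cur = nil
			}
		}

		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush()
					return
				}
				cur = append(cur, v)
				if len(cur) >= size {
					flush()
				}
			case <-timer.C:
				flush()
				timer.Reset(timeout)
			}
		}
	}()

	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < 5; i++ { // five items never fill two batches of four
			in <- i
			time.Sleep(50 * time.Millisecond)
		}
	}()

	for b := range batch(in, 4, 200*time.Millisecond) {
		fmt.Println("batch:", b)
	}
}
```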
40 changes: 40 additions & 0 deletions internal/core/buffer.go
@@ -0,0 +1,40 @@
// Copyright 2017-24 the original author or authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"bytes"
"sync"
)

var bufferPool = sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}

type PooledBuffer struct {
*bytes.Buffer
}

func NewPooledBuffer() *PooledBuffer {
return &PooledBuffer{Buffer: bufferPool.Get().(*bytes.Buffer)}
}

func (b *PooledBuffer) Close() error {
b.Reset()
bufferPool.Put(b.Buffer)
return nil
}
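The new core.PooledBuffer wraps a *bytes.Buffer taken from a sync.Pool; Close resets the buffer and returns it to the pool, so callers get the usual Buffer API plus a defer-friendly release. One trade-off worth noting: Reset keeps capacity, so buffers grow to the largest blob seen and stay that large while pooled. A usage sketch written as a test follows; the test file itself is hypothetical, but it only relies on the names shown above:

```go
// Hypothetical test sketching the expected caller pattern for PooledBuffer.
package core_test

import (
	"testing"

	"m4o.io/pbf/internal/core"
)

func TestPooledBufferReuse(t *testing.T) {
	buf := core.NewPooledBuffer()

	// The embedded *bytes.Buffer API is available directly.
	if _, err := buf.WriteString("osm blob bytes"); err != nil {
		t.Fatal(err)
	}
	if got := buf.Len(); got != len("osm blob bytes") {
		t.Fatalf("unexpected length: %d", got)
	}

	// Close resets the buffer and returns it to the pool; a later
	// NewPooledBuffer call may hand the same backing array back out.
	if err := buf.Close(); err != nil {
		t.Fatal(err)
	}

	again := core.NewPooledBuffer()
	defer again.Close()

	if again.Len() != 0 {
		t.Fatalf("pooled buffer not reset, len=%d", again.Len())
	}
}
```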
