Skip to content

Commit

Permalink
Replace channel code with rill
Browse files Browse the repository at this point in the history
  • Loading branch information
maguro committed Nov 22, 2024
1 parent c493ad1 commit 4dc14ae
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 339 deletions.
6 changes: 0 additions & 6 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func BenchmarkLondon(b *testing.B) {
}

pbs, _ := strconv.Atoi(os.Getenv("PBF_PROTO_BUFFER_SIZE"))
icl, _ := strconv.Atoi(os.Getenv("PBF_INPUT_CHANNEL_LENGTH"))
ocl, _ := strconv.Atoi(os.Getenv("PBF_OUTPUT_CHANNEL_LENGTH"))
dcl, _ := strconv.Atoi(os.Getenv("PBF_DECODED_CHANNEL_LENGTH"))
ncpu, _ := strconv.Atoi(os.Getenv("PBF_NCPU"))

for n := 0; n < b.N; n++ {
Expand All @@ -56,9 +53,6 @@ func BenchmarkLondon(b *testing.B) {

if decoder, err := NewDecoder(context.Background(), in,
WithProtoBufferSize(pbs),
WithInputChannelLength(icl),
WithOutputChannelLength(ocl),
WithDecodedChannelLength(dcl),
WithNCpus(uint16(ncpu))); err != nil {
b.Fatal(err)
} else {
Expand Down
175 changes: 175 additions & 0 deletions blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,91 @@ package pbf

import (
"bytes"
"compress/zlib"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log/slog"
"time"

"github.com/destel/rill"
"google.golang.org/protobuf/proto"

"m4o.io/pbf/protobuf"
)

// blob pairs a protobuf BlobHeader with the Blob payload it describes, as
// read sequentially from an OSM PBF stream.
type blob struct {
	header *protobuf.BlobHeader // describes the type and size of the payload that follows
	blob   *protobuf.Blob       // raw or zlib-compressed payload bytes
}

// generate returns a push-style iterator that reads length-delimited blobs
// from reader and yields them one at a time. Iteration stops on context
// cancellation, on io.EOF (the normal end of the stream), or when yield
// returns false. Any error other than io.EOF is logged and yielded to the
// consumer before the iterator terminates.
func generate(ctx context.Context, reader io.Reader) func(yield func(enc blob, err error) bool) {
	return func(yield func(enc blob, err error) bool) {
		// Scratch buffer reused across reads to avoid per-blob allocations.
		buffer := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize))

		for {
			// Stop promptly if the caller has canceled.
			select {
			case <-ctx.Done():
				return
			default:
			}

			h, err := readBlobHeader(buffer, reader)
			if err != nil {
				// io.EOF marks the normal end of the stream; anything else is
				// a real failure the consumer must see. errors.Is is used so
				// wrapped EOFs are also recognized.
				if !errors.Is(err, io.EOF) {
					slog.Error(err.Error())
					yield(blob{}, err)
				}

				return
			}

			b, err := readBlob(buffer, reader, h)
			if err != nil {
				slog.Error(err.Error())
				yield(blob{}, err)

				return
			}

			if !yield(blob{header: h, blob: b}, nil) {
				return
			}

			buffer.Reset()
		}
	}
}

// decode converts a batch of encoded blobs into OSM objects, returning a
// channel that delivers one rill.Try per blob — either a slice of decoded
// objects or an error. Decoding stops at the first failing blob, and the
// channel is closed once the batch has been processed.
func decode(array []blob) <-chan rill.Try[[]Object] {
	ch := make(chan rill.Try[[]Object])

	go func() {
		defer close(ch)

		// Scratch buffer shared across blobs for zlib decompression.
		buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize))

		for _, enc := range array {
			elements, err := extract(enc.header, enc.blob, buf)
			if err != nil {
				slog.Error(err.Error())
				ch <- rill.Try[[]Object]{Error: err}

				return
			}

			ch <- rill.Try[[]Object]{Value: elements}

			buf.Reset()
		}
	}()

	return ch
}

// readBlobHeader unmarshals a header from an array of protobuf encoded bytes.
// The header is used when decoding blobs into OSM elements.
func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) {
Expand Down Expand Up @@ -67,3 +145,100 @@ func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader)

return blob, nil
}

// extract unmarshals an array of OSM elements from a blob's protobuf encoded
// bytes. The bytes may be zlib-compressed; zlibBuf is used as scratch space
// for decompression so callers can reuse its capacity across blobs.
func extract(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Buffer) ([]Object, error) {
	var buf []byte

	switch {
	case blob.Raw != nil:
		buf = blob.GetRaw()

	case blob.ZlibData != nil:
		r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData()))
		if err != nil {
			return nil, err
		}

		zlibBuf.Reset()

		// Grow the scratch buffer up front so ReadFrom does not have to
		// reallocate while decompressing.
		rawBufferSize := int(blob.GetRawSize() + bytes.MinRead)
		if rawBufferSize > zlibBuf.Cap() {
			zlibBuf.Grow(rawBufferSize)
		}

		_, err = zlibBuf.ReadFrom(r)
		if err != nil {
			return nil, err
		}

		// Close releases the decompressor's state; the stream has been fully
		// consumed by ReadFrom above.
		if err = r.Close(); err != nil {
			return nil, err
		}

		if zlibBuf.Len() != int(blob.GetRawSize()) {
			return nil, fmt.Errorf("raw blob data size %d but expected %d", zlibBuf.Len(), blob.GetRawSize())
		}

		buf = zlibBuf.Bytes()

	default:
		return nil, errors.New("unknown blob data type")
	}

	// GetType is nil-safe, unlike dereferencing header.Type directly.
	switch ht := header.GetType(); ht {
	case "OSMHeader":
		h, err := parseOSMHeader(buf)
		if err != nil {
			return nil, err
		}

		return []Object{h}, nil
	case "OSMData":
		return parsePrimitiveBlock(buf)
	default:
		return nil, fmt.Errorf("unknown header type %s", ht)
	}
}

// parseOSMHeader unmarshals the OSM header from an array of protobuf encoded
// bytes, converting the protobuf HeaderBlock into the package's Header type.
func parseOSMHeader(buffer []byte) (*Header, error) {
	hb := &protobuf.HeaderBlock{}
	if err := proto.Unmarshal(buffer, hb); err != nil {
		return nil, err
	}

	header := &Header{
		RequiredFeatures:                 hb.GetRequiredFeatures(),
		OptionalFeatures:                 hb.GetOptionalFeatures(),
		WritingProgram:                   hb.GetWritingprogram(),
		Source:                           hb.GetSource(),
		OsmosisReplicationBaseURL:        hb.GetOsmosisReplicationBaseUrl(),
		OsmosisReplicationSequenceNumber: hb.GetOsmosisReplicationSequenceNumber(),
	}

	// Use the nil-safe generated getters rather than dereferencing the
	// optional fields directly.
	if bbox := hb.GetBbox(); bbox != nil {
		header.BoundingBox = BoundingBox{
			Left:   toDegrees(0, 1, bbox.GetLeft()),
			Right:  toDegrees(0, 1, bbox.GetRight()),
			Top:    toDegrees(0, 1, bbox.GetTop()),
			Bottom: toDegrees(0, 1, bbox.GetBottom()),
		}
	}

	if hb.OsmosisReplicationTimestamp != nil {
		header.OsmosisReplicationTimestamp = time.Unix(hb.GetOsmosisReplicationTimestamp(), 0)
	}

	return header, nil
}

// toDegrees converts a coordinate into Degrees, given the offset and
// granularity of the coordinate.
func toDegrees(offset int64, granularity int32, coordinate int64) Degrees {
	// Scale the coordinate by its granularity, apply the offset, then convert
	// the fixed-point value to Degrees.
	scaled := offset + int64(granularity)*coordinate

	return Degrees(scaled) * coordinatesPerDegree
}
2 changes: 1 addition & 1 deletion cmd/pbf/cli/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"io"
"os"

pb "gopkg.in/cheggaaa/pb.v1"
"gopkg.in/cheggaaa/pb.v1"
)

// progressBar is an instance of ReadCloser with an associated ProgressBar.
Expand Down
55 changes: 34 additions & 21 deletions cmd/pbf/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/spf13/cobra"

"m4o.io/pbf"
"m4o.io/pbf/cmd/pbf/cli"
)
Expand All @@ -51,10 +52,7 @@ func init() { //nolint:gochecknoinits
flags.BoolP("extended", "e", false, "provide extended information (scans entire file)")
flags.BoolP("json", "j", false, "format information in JSON")
flags.Uint32P("buffer-length", "b", pbf.DefaultBufferSize, "buffer size for protobuf un-marshaling")
flags.Uint32P("raw-length", "r", pbf.DefaultInputChannelLength, "channel length of raw blobs")
flags.Uint16P("output-length", "o", pbf.DefaultOutputChannelLength, "channel length of decoded arrays of element")
flags.Uint16P("decoded-length", "d", pbf.DefaultDecodedChannelLength,
"channel length of decoded elements coalesced from output channels")
flags.Uint32P("unprocessed-batch-size", "u", pbf.DefaultBatchSize, "batch size for unprocessed blobs")
flags.Uint16P("cpu", "c", pbf.DefaultNCpu(), "number of CPUs to use for scanning")
flags.BoolP("silent", "s", false, "silence progress bar")
}
Expand All @@ -81,18 +79,28 @@ var infoCmd = &cobra.Command{
log.Fatal(err)
}
}
extended, err := flags.GetBool("extended")
if err != nil {
log.Fatal(err)
}

var opts []pbf.DecoderOption

ncpu, err := flags.GetUint16("cpu")
if err != nil {
log.Fatal(err)
}

extended, err := flags.GetBool("extended")
opts = append(opts, pbf.WithNCpus(ncpu))

batchSize, err := flags.GetUint32("unprocessed-batch-size")
if err != nil {
log.Fatal(err)
}

info := runInfo(win, ncpu, extended)
opts = append(opts, pbf.WithProtoBatchSize(int(batchSize)))

info := runInfo(win, extended, opts...)

err = win.Close()
if err != nil {
Expand All @@ -111,11 +119,14 @@ var infoCmd = &cobra.Command{
},
}

func runInfo(in io.Reader, ncpu uint16, extended bool) *extendedHeader {
d, err := pbf.NewDecoder(context.Background(), in, pbf.WithNCpus(ncpu))
func runInfo(in io.Reader, extended bool, opts ...pbf.DecoderOption) *extendedHeader {
ctx := context.Background()

d, err := pbf.NewDecoder(ctx, in, opts...)
if err != nil {
log.Fatal(err)
}

defer d.Close()

info := &extendedHeader{Header: d.Header}
Expand All @@ -125,25 +136,27 @@ func runInfo(in io.Reader, ncpu uint16, extended bool) *extendedHeader {
if extended {
done:
for {
v, err := d.Decode()
objs, err := d.Decode()
switch {
case err == io.EOF:
break done
case err != nil:
panic(err.Error())
default:
switch v := v.(type) {
case *pbf.Node:
// Process Node v.
nc++
case *pbf.Way:
// Process Way v.
wc++
case *pbf.Relation:
// Process Relation v.
rc++
default:
panic(fmt.Sprintf("unknown type %T\n", v))
for _, obj := range objs {
switch t := obj.(type) {
case *pbf.Node:
// Process Node obj.
nc++
case *pbf.Way:
// Process Way obj.
wc++
case *pbf.Relation:
// Process Relation obj.
rc++
default:
panic(fmt.Sprintf("unknown type %T\n", t))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/pbf/info/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func testRunInfoWith(t *testing.T, extended bool, node int64, way int64, relatio
t.Fatalf("Unable to read data file %v", err)
}

info := runInfo(f, 2, extended)
info := runInfo(f, extended)
bbox := pbf.BoundingBox{Left: -0.511482, Right: 0.335437, Top: 51.69344, Bottom: 51.28554}
ts, _ := time.Parse(time.RFC3339, "2014-03-24T21:55:02Z")

Expand Down
Loading

0 comments on commit 4dc14ae

Please sign in to comment.