From 4dc14ae9d71b5cd319cace2ca6d3a7a8a59cd2c7 Mon Sep 17 00:00:00 2001 From: "Alan D. Cabrera" Date: Mon, 28 Oct 2024 19:30:21 -0700 Subject: [PATCH] Replace channel code with rill --- bench_test.go | 6 - blob.go | 175 +++++++++++++++++++ cmd/pbf/cli/pb.go | 2 +- cmd/pbf/info/info.go | 55 +++--- cmd/pbf/info/info_test.go | 2 +- decoder.go | 344 ++++++-------------------------------- decoder_elements.go | 21 +-- decoder_test.go | 8 +- go.mod | 1 + go.sum | 2 + models.go | 12 ++ 11 files changed, 289 insertions(+), 339 deletions(-) diff --git a/bench_test.go b/bench_test.go index 9d5bd47..5c6e65d 100644 --- a/bench_test.go +++ b/bench_test.go @@ -44,9 +44,6 @@ func BenchmarkLondon(b *testing.B) { } pbs, _ := strconv.Atoi(os.Getenv("PBF_PROTO_BUFFER_SIZE")) - icl, _ := strconv.Atoi(os.Getenv("PBF_INPUT_CHANNEL_LENGTH")) - ocl, _ := strconv.Atoi(os.Getenv("PBF_OUTPUT_CHANNEL_LENGTH")) - dcl, _ := strconv.Atoi(os.Getenv("PBF_DECODED_CHANNEL_LENGTH")) ncpu, _ := strconv.Atoi(os.Getenv("PBF_NCPU")) for n := 0; n < b.N; n++ { @@ -56,9 +53,6 @@ func BenchmarkLondon(b *testing.B) { if decoder, err := NewDecoder(context.Background(), in, WithProtoBufferSize(pbs), - WithInputChannelLength(icl), - WithOutputChannelLength(ocl), - WithDecodedChannelLength(dcl), WithNCpus(uint16(ncpu))); err != nil { b.Fatal(err) } else { diff --git a/blob.go b/blob.go index 05faf04..1668a6a 100644 --- a/blob.go +++ b/blob.go @@ -16,13 +16,91 @@ package pbf import ( "bytes" + "compress/zlib" + "context" "encoding/binary" + "errors" + "fmt" "io" + "log/slog" + "time" + "github.com/destel/rill" "google.golang.org/protobuf/proto" + "m4o.io/pbf/protobuf" ) +type blob struct { + header *protobuf.BlobHeader + blob *protobuf.Blob +} + +func generate(ctx context.Context, reader io.Reader) func(yield func(enc blob, err error) bool) { + return func(yield func(enc blob, err error) bool) { + buffer := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) + + for { + select { + case <-ctx.Done(): + return + default: + } + + h, err := readBlobHeader(buffer, reader) + if err != nil { + if err != io.EOF { + slog.Error(err.Error()) + yield(blob{}, err) + } + + return + } + + b, err := readBlob(buffer, reader, h) + if err != nil { + slog.Error(err.Error()) + yield(blob{}, err) + + return + } + + if !yield(blob{header: h, blob: b}, nil) { + return + } + + buffer.Reset() + } + } +} + +func decode(array []blob) (out <-chan rill.Try[[]Object]) { + ch := make(chan rill.Try[[]Object]) + out = ch + + go func() { + defer close(ch) + + buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) + + for _, enc := range array { + elements, err := extract(enc.header, enc.blob, buf) + if err != nil { + slog.Error(err.Error()) + ch <- rill.Try[[]Object]{Error: err} + + return + } + + ch <- rill.Try[[]Object]{Value: elements} + + buf.Reset() + } + }() + + return +} + // readBlobHeader unmarshals a header from an array of protobuf encoded bytes. // The header is used when decoding blobs into OSM elements. func readBlobHeader(buffer *bytes.Buffer, rdr io.Reader) (header *protobuf.BlobHeader, err error) { @@ -67,3 +145,100 @@ func readBlob(buffer *bytes.Buffer, rdr io.Reader, header *protobuf.BlobHeader) return blob, nil } + +// elements unmarshals an array of OSM elements from an array of protobuf encoded +// bytes. The bytes could possibly be compressed; zlibBuf is used to facilitate +// decompression. +func extract(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Buffer) ([]Object, error) { + var buf []byte + + switch { + case blob.Raw != nil: + buf = blob.GetRaw() + + case blob.ZlibData != nil: + r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData())) + if err != nil { + return nil, err + } + + zlibBuf.Reset() + + rawBufferSize := int(blob.GetRawSize() + bytes.MinRead) + if rawBufferSize > zlibBuf.Cap() { + zlibBuf.Grow(rawBufferSize) + } + + _, err = zlibBuf.ReadFrom(r) + if err != nil { + return nil, err + } + + if zlibBuf.Len() != int(blob.GetRawSize()) { + err = fmt.Errorf("raw blob data size %d but expected %d", zlibBuf.Len(), blob.GetRawSize()) + + return nil, err + } + + buf = zlibBuf.Bytes() + + default: + return nil, errors.New("unknown blob data type") + } + + ht := *header.Type + + switch ht { + case "OSMHeader": + { + h, err := parseOSMHeader(buf) + if err != nil { + return nil, err + } + + return []Object{h}, nil + } + case "OSMData": + return parsePrimitiveBlock(buf) + default: + return nil, fmt.Errorf("unknown header type %s", ht) + } +} + +// parseOSMHeader unmarshals the OSM header from an array of protobuf encoded bytes. +func parseOSMHeader(buffer []byte) (*Header, error) { + hb := &protobuf.HeaderBlock{} + if err := proto.Unmarshal(buffer, hb); err != nil { + return nil, err + } + + header := &Header{ + RequiredFeatures: hb.GetRequiredFeatures(), + OptionalFeatures: hb.GetOptionalFeatures(), + WritingProgram: hb.GetWritingprogram(), + Source: hb.GetSource(), + OsmosisReplicationBaseURL: hb.GetOsmosisReplicationBaseUrl(), + OsmosisReplicationSequenceNumber: hb.GetOsmosisReplicationSequenceNumber(), + } + + if hb.Bbox != nil { + header.BoundingBox = BoundingBox{ + Left: toDegrees(0, 1, hb.Bbox.GetLeft()), + Right: toDegrees(0, 1, hb.Bbox.GetRight()), + Top: toDegrees(0, 1, hb.Bbox.GetTop()), + Bottom: toDegrees(0, 1, hb.Bbox.GetBottom()), + } + } + + if hb.OsmosisReplicationTimestamp != nil { + header.OsmosisReplicationTimestamp = time.Unix(*hb.OsmosisReplicationTimestamp, 0) + } + + return header, nil +} + +// toDegrees converts a coordinate into Degrees, given the offset and +// granularity of the coordinate. +func toDegrees(offset int64, granularity int32, coordinate int64) Degrees { + return coordinatesPerDegree * Degrees(offset+(int64(granularity)*coordinate)) +} diff --git a/cmd/pbf/cli/pb.go b/cmd/pbf/cli/pb.go index dfd83bb..b94131c 100644 --- a/cmd/pbf/cli/pb.go +++ b/cmd/pbf/cli/pb.go @@ -19,7 +19,7 @@ import ( "io" "os" - pb "gopkg.in/cheggaaa/pb.v1" + "gopkg.in/cheggaaa/pb.v1" ) // progressBar is an instance of ReadCloser with an associated ProgressBar. diff --git a/cmd/pbf/info/info.go b/cmd/pbf/info/info.go index d84ff22..22ac508 100644 --- a/cmd/pbf/info/info.go +++ b/cmd/pbf/info/info.go @@ -26,6 +26,7 @@ import ( "github.com/dustin/go-humanize" "github.com/spf13/cobra" + "m4o.io/pbf" "m4o.io/pbf/cmd/pbf/cli" ) @@ -51,10 +52,7 @@ func init() { //nolint:gochecknoinits flags.BoolP("extended", "e", false, "provide extended information (scans entire file)") flags.BoolP("json", "j", false, "format information in JSON") flags.Uint32P("buffer-length", "b", pbf.DefaultBufferSize, "buffer size for protobuf un-marshaling") - flags.Uint32P("raw-length", "r", pbf.DefaultInputChannelLength, "channel length of raw blobs") - flags.Uint16P("output-length", "o", pbf.DefaultOutputChannelLength, "channel length of decoded arrays of element") - flags.Uint16P("decoded-length", "d", pbf.DefaultDecodedChannelLength, - "channel length of decoded elements coalesced from output channels") + flags.Uint32P("unprocessed-batch-size", "u", pbf.DefaultBatchSize, "batch size for unprocessed blobs") flags.Uint16P("cpu", "c", pbf.DefaultNCpu(), "number of CPUs to use for scanning") flags.BoolP("silent", "s", false, "silence progress bar") } @@ -81,18 +79,28 @@ var infoCmd = &cobra.Command{ log.Fatal(err) } } + extended, err := flags.GetBool("extended") + if err != nil { + log.Fatal(err) + } + + var opts []pbf.DecoderOption ncpu, err := flags.GetUint16("cpu") if err != nil { log.Fatal(err) } - extended, err := flags.GetBool("extended") + opts = append(opts, pbf.WithNCpus(ncpu)) + + batchSize, err := flags.GetUint32("unprocessed-batch-size") if err != nil { log.Fatal(err) } - info := runInfo(win, ncpu, extended) + opts = append(opts, pbf.WithProtoBatchSize(int(batchSize))) + + info := runInfo(win, extended, opts...) err = win.Close() if err != nil { @@ -111,11 +119,14 @@ var infoCmd = &cobra.Command{ }, } -func runInfo(in io.Reader, ncpu uint16, extended bool) *extendedHeader { - d, err := pbf.NewDecoder(context.Background(), in, pbf.WithNCpus(ncpu)) +func runInfo(in io.Reader, extended bool, opts ...pbf.DecoderOption) *extendedHeader { + ctx := context.Background() + + d, err := pbf.NewDecoder(ctx, in, opts...) if err != nil { log.Fatal(err) } + defer d.Close() info := &extendedHeader{Header: d.Header} @@ -125,25 +136,27 @@ func runInfo(in io.Reader, ncpu uint16, extended bool) *extendedHeader { if extended { done: for { - v, err := d.Decode() + objs, err := d.Decode() switch { case err == io.EOF: break done case err != nil: panic(err.Error()) default: - switch v := v.(type) { - case *pbf.Node: - // Process Node v. - nc++ - case *pbf.Way: - // Process Way v. - wc++ - case *pbf.Relation: - // Process Relation v. - rc++ - default: - panic(fmt.Sprintf("unknown type %T\n", v)) + for _, obj := range objs { + switch t := obj.(type) { + case *pbf.Node: + // Process Node obj. + nc++ + case *pbf.Way: + // Process Way obj. + wc++ + case *pbf.Relation: + // Process Relation obj. + rc++ + default: + panic(fmt.Sprintf("unknown type %T\n", t)) + } } } } diff --git a/cmd/pbf/info/info_test.go b/cmd/pbf/info/info_test.go index ef56290..921df76 100644 --- a/cmd/pbf/info/info_test.go +++ b/cmd/pbf/info/info_test.go @@ -36,7 +36,7 @@ func testRunInfoWith(t *testing.T, extended bool, node int64, way int64, relatio t.Fatalf("Unable to read data file %v", err) } - info := runInfo(f, 2, extended) + info := runInfo(f, extended) bbox := pbf.BoundingBox{Left: -0.511482, Right: 0.335437, Top: 51.69344, Bottom: 51.28554} ts, _ := time.Parse(time.RFC3339, "2014-03-24T21:55:02Z") diff --git a/decoder.go b/decoder.go index 7be0d4a..9ccee84 100644 --- a/decoder.go +++ b/decoder.go @@ -16,17 +16,13 @@ package pbf import ( "bytes" - "compress/zlib" "context" - "errors" "fmt" "io" "reflect" "runtime" - "time" - "google.golang.org/protobuf/proto" - "m4o.io/pbf/protobuf" + "github.com/destel/rill" ) const ( @@ -35,53 +31,30 @@ const ( // DefaultBufferSize is the default buffer size for protobuf un-marshaling. DefaultBufferSize = 1024 * 1024 - // DefaultInputChannelLength is the default channel length of raw blobs. - DefaultInputChannelLength = 16 - - // DefaultOutputChannelLength is the default channel length of decoded arrays of element. - DefaultOutputChannelLength = 8 - - // DefaultDecodedChannelLength is the default channel length of decoded elements coalesced from output channels. - DefaultDecodedChannelLength = 8000 + // DefaultBatchSize is the default batch size for unprocessed blobs. + DefaultBatchSize = 16 coordinatesPerDegree = 1e-9 ) // DefaultNCpu provides the default number of CPUs. func DefaultNCpu() uint16 { - return uint16(runtime.GOMAXPROCS(-1)) -} - -type encoded struct { - header *protobuf.BlobHeader - blob *protobuf.Blob - err error -} - -type decoded struct { - elements []any - err error -} - -type pair struct { - element any - err error + cpus := uint16(runtime.GOMAXPROCS(-1)) + return max(cpus-1, 1) } // Decoder reads and decodes OpenStreetMap PBF data from an input stream. type Decoder struct { - Header Header - pairs chan pair - cancel context.CancelFunc + Header Header + Objects <-chan rill.Try[[]Object] + cancel context.CancelFunc } // decoderOptions provides optional configuration parameters for Decoder construction. type decoderOptions struct { - protoBufferSize int // buffer size for protobuf un-marshaling - inputChannelLength int // channel length of raw blobs - outputChannelLength int // channel length of decoded arrays of element - decodedChannelLength int // channel length of decoded elements coalesced from output channels - nCPU uint16 // the number of CPUs to use for background processing + protoBufferSize int // buffer size for protobuf un-marshaling + protoBatchSize int // batch size for protobuf un-marshaling + nCPU uint16 // the number of CPUs to use for background processing } // DecoderOption configures how we set up the decoder. @@ -94,24 +67,10 @@ func WithProtoBufferSize(s int) DecoderOption { } } -// WithInputChannelLength lets you set the channel length of raw blobs. -func WithInputChannelLength(l int) DecoderOption { - return func(o *decoderOptions) { - o.inputChannelLength = l - } -} - -// WithOutputChannelLength lets you set the channel length of decoded arrays of element. -func WithOutputChannelLength(l int) DecoderOption { +// WithProtoBatchSize lets you set the buffer size for protobuf un-marshaling. +func WithProtoBatchSize(s int) DecoderOption { return func(o *decoderOptions) { - o.outputChannelLength = l - } -} - -// WithDecodedChannelLength lets you set the channel length of decoded elements coalesced from output channels. -func WithDecodedChannelLength(l int) DecoderOption { - return func(o *decoderOptions) { - o.decodedChannelLength = l + o.protoBatchSize = s } } @@ -124,57 +83,34 @@ func WithNCpus(n uint16) DecoderOption { // defaultDecoderConfig provides a default configuration for decoders. var defaultDecoderConfig = decoderOptions{ - protoBufferSize: DefaultBufferSize, - inputChannelLength: DefaultInputChannelLength, - outputChannelLength: DefaultOutputChannelLength, - decodedChannelLength: DefaultDecodedChannelLength, - nCPU: DefaultNCpu(), + protoBufferSize: DefaultBufferSize, + protoBatchSize: DefaultBatchSize, + nCPU: DefaultNCpu(), } // NewDecoder returns a new decoder, configured with cfg, that reads from // reader. The decoder is initialized with the OSM header. -func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (*Decoder, error) { +func NewDecoder(ctx context.Context, rdr io.Reader, opts ...DecoderOption) (*Decoder, error) { d := &Decoder{} - c := defaultDecoderConfig + cfg := defaultDecoderConfig for _, opt := range opts { - opt(&c) + opt(&cfg) } ctx, d.cancel = context.WithCancel(ctx) - buf := bytes.NewBuffer(make([]byte, 0, c.protoBufferSize)) - - h, err := readBlobHeader(buf, reader) - if err != nil { + if err := d.loadHeader(rdr); err != nil { return nil, err } - b, err := readBlob(buf, reader, h) - if err != nil { - return nil, err - } + blobs := rill.FromSeq2(generate(ctx, rdr)) - e, err := elements(h, b, bytes.NewBuffer(make([]byte, 0, bufferSize))) - if err != nil { - return nil, err - } - - if hdr, ok := e[0].(*Header); !ok { - err = fmt.Errorf("expected header data but got %v", reflect.TypeOf(e[0])) - - return nil, err - } else { - d.Header = *hdr - } + batches := rill.Batch(blobs, cfg.protoBatchSize, -1) - // create decoding pipelines - var outputs []chan decoded - for _, input := range read(ctx, reader, c) { - outputs = append(outputs, decode(input, c)) - } + objects := rill.FlatMap(batches, int(cfg.nCPU), decode) - d.pairs = coalesce(c, outputs...) + d.Objects = objects return d, nil } @@ -183,233 +119,47 @@ func NewDecoder(ctx context.Context, reader io.Reader, opts ...DecoderOption) (* // or Relation struct representing the underlying OpenStreetMap PBF data, or // error encountered. The end of the input stream is reported by an io.EOF // error. -func (d *Decoder) Decode() (any, error) { - decoded, more := <-d.pairs +func (d *Decoder) Decode() ([]Object, error) { + decoded, more := <-d.Objects if !more { return nil, io.EOF } - return decoded.element, decoded.err + return decoded.Value, decoded.Error } // Close will cancel the background decoding pipeline. func (d *Decoder) Close() { - d.cancel() -} - -// read obtains OSM blobs and sends them down, in a round-robin manner, a list -// of channels to be decoded. -func read(ctx context.Context, rdr io.Reader, cfg decoderOptions) (inputs []chan encoded) { - n := cfg.nCPU - for i := uint16(0); i < n; i++ { - inputs = append(inputs, make(chan encoded, cfg.inputChannelLength)) - } - - go func() { - defer func() { - for _, input := range inputs { - close(input) - } - }() - - buffer := bytes.NewBuffer(make([]byte, 0, cfg.protoBufferSize)) - - var i uint16 - - for { - input := inputs[i] - i = (i + 1) % n - - h, err := readBlobHeader(buffer, rdr) - if err == io.EOF { - return - } else if err != nil { - input <- encoded{err: err} - - return - } - - b, err := readBlob(buffer, rdr, h) - if err != nil { - input <- encoded{err: err} - - return - } + rill.DrainNB(d.Objects) - select { - case <-ctx.Done(): - return - case input <- encoded{header: h, blob: b}: - } - } - }() - - return inputs -} - -// decode decodes blob/header pairs into an array of OSM elements. These -// arrays are placed onto an output channel where they will be coalesced into -// their correct order. -func decode(input <-chan encoded, cfg decoderOptions) (output chan decoded) { - output = make(chan decoded, cfg.outputChannelLength) - - buf := bytes.NewBuffer(make([]byte, 0, cfg.protoBufferSize)) - - go func() { - defer close(output) - - for { - raw, more := <-input - if !more { - return - } - - if raw.err != nil { - output <- decoded{nil, raw.err} - - return - } - - elements, err := elements(raw.header, raw.blob, buf) - - output <- decoded{elements, err} - } - }() - - return -} - -// coalesce merges the list of channels in a round-robin manner and sends the -// elements in pairs down a channel of pairs. -func coalesce(cfg decoderOptions, outputs ...chan decoded) (pairs chan pair) { - pairs = make(chan pair, cfg.decodedChannelLength) - - go func() { - defer close(pairs) - - n := len(outputs) - - var i int - - for { - output := outputs[i] - i = (i + 1) % n - - decoded, more := <-output - if !more { - // Since the channels are inspected round-robin, when one channel - // is done, all subsequent channels are done. - return - } - - if decoded.err != nil { - pairs <- pair{nil, decoded.err} - - return - } - - for _, e := range decoded.elements { - pairs <- pair{e, nil} - } - } - }() - - return pairs + d.cancel() } -// elements unmarshals an array of OSM elements from an array of protobuf encoded -// bytes. The bytes could possibly be compressed; zlibBuf is used to facilitate -// decompression. -func elements(header *protobuf.BlobHeader, blob *protobuf.Blob, zlibBuf *bytes.Buffer) ([]any, error) { - var buf []byte - - switch { - case blob.Raw != nil: - buf = blob.GetRaw() +func (d *Decoder) loadHeader(reader io.Reader) error { + buf := bytes.NewBuffer(make([]byte, 0, DefaultBufferSize)) - case blob.ZlibData != nil: - r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData())) - if err != nil { - return nil, err - } - - zlibBuf.Reset() - - rawBufferSize := int(blob.GetRawSize() + bytes.MinRead) - if rawBufferSize > zlibBuf.Cap() { - zlibBuf.Grow(rawBufferSize) - } - - _, err = zlibBuf.ReadFrom(r) - if err != nil { - return nil, err - } - - if zlibBuf.Len() != int(blob.GetRawSize()) { - err = fmt.Errorf("raw blob data size %d but expected %d", zlibBuf.Len(), blob.GetRawSize()) - - return nil, err - } - - buf = zlibBuf.Bytes() - - default: - return nil, errors.New("unknown blob data type") - } - - ht := *header.Type - - switch ht { - case "OSMHeader": - { - h, err := parseOSMHeader(buf) - if err != nil { - return nil, err - } - - return []any{h}, nil - } - case "OSMData": - return parsePrimitiveBlock(buf) - default: - return nil, fmt.Errorf("unknown header type %s", ht) + h, err := readBlobHeader(buf, reader) + if err != nil { + return err } -} -// parseOSMHeader unmarshals the OSM header from an array of protobuf encoded bytes. -func parseOSMHeader(buffer []byte) (*Header, error) { - hb := &protobuf.HeaderBlock{} - if err := proto.Unmarshal(buffer, hb); err != nil { - return nil, err + b, err := readBlob(buf, reader, h) + if err != nil { + return err } - header := &Header{ - RequiredFeatures: hb.GetRequiredFeatures(), - OptionalFeatures: hb.GetOptionalFeatures(), - WritingProgram: hb.GetWritingprogram(), - Source: hb.GetSource(), - OsmosisReplicationBaseURL: hb.GetOsmosisReplicationBaseUrl(), - OsmosisReplicationSequenceNumber: hb.GetOsmosisReplicationSequenceNumber(), + e, err := extract(h, b, bytes.NewBuffer(make([]byte, 0, bufferSize))) + if err != nil { + return err } - if hb.Bbox != nil { - header.BoundingBox = BoundingBox{ - Left: toDegrees(0, 1, hb.Bbox.GetLeft()), - Right: toDegrees(0, 1, hb.Bbox.GetRight()), - Top: toDegrees(0, 1, hb.Bbox.GetTop()), - Bottom: toDegrees(0, 1, hb.Bbox.GetBottom()), - } - } + if hdr, ok := e[0].(*Header); !ok { + err = fmt.Errorf("expected header data but got %v", reflect.TypeOf(e[0])) - if hb.OsmosisReplicationTimestamp != nil { - header.OsmosisReplicationTimestamp = time.Unix(*hb.OsmosisReplicationTimestamp, 0) + return err + } else { + d.Header = *hdr } - return header, nil -} - -// toDegrees converts a coordinate into Degrees, given the offset and -// granularity of the coordinate. -func toDegrees(offset int64, granularity int32, coordinate int64) Degrees { - return coordinatesPerDegree * Degrees(offset+(int64(granularity)*coordinate)) + return nil } diff --git a/decoder_elements.go b/decoder_elements.go index 94fa0ef..f4a4694 100644 --- a/decoder_elements.go +++ b/decoder_elements.go @@ -18,10 +18,11 @@ import ( "time" "google.golang.org/protobuf/proto" + "m4o.io/pbf/protobuf" ) -func parsePrimitiveBlock(buffer []byte) ([]any, error) { +func parsePrimitiveBlock(buffer []byte) ([]Object, error) { pb := &protobuf.PrimitiveBlock{} if err := proto.Unmarshal(buffer, pb); err != nil { return nil, err @@ -29,7 +30,7 @@ func parsePrimitiveBlock(buffer []byte) ([]any, error) { c := newBlockContext(pb) - elements := make([]any, 0) + elements := make([]Object, 0) for _, pg := range pb.GetPrimitivegroup() { elements = append(elements, c.decodeNodes(pg.GetNodes())...) elements = append(elements, c.decodeDenseNodes(pg.GetDense())...) @@ -40,8 +41,8 @@ func parsePrimitiveBlock(buffer []byte) ([]any, error) { return elements, nil } -func (c *blockContext) decodeNodes(nodes []*protobuf.Node) (elements []any) { - elements = make([]any, len(nodes)) +func (c *blockContext) decodeNodes(nodes []*protobuf.Node) (elements []Object) { + elements = make([]Object, len(nodes)) for i, node := range nodes { elements[i] = &Node{ @@ -56,9 +57,9 @@ func (c *blockContext) decodeNodes(nodes []*protobuf.Node) (elements []any) { return elements } -func (c *blockContext) decodeDenseNodes(nodes *protobuf.DenseNodes) []any { +func (c *blockContext) decodeDenseNodes(nodes *protobuf.DenseNodes) []Object { ids := nodes.GetId() - elements := make([]any, len(ids)) + elements := make([]Object, len(ids)) tic := c.newTagsContext(nodes.GetKeysVals()) dic := c.newDenseInfoContext(nodes.GetDenseinfo()) @@ -83,8 +84,8 @@ func (c *blockContext) decodeDenseNodes(nodes *protobuf.DenseNodes) []any { return elements } -func (c *blockContext) decodeWays(nodes []*protobuf.Way) []any { - elements := make([]any, len(nodes)) +func (c *blockContext) decodeWays(nodes []*protobuf.Way) []Object { + elements := make([]Object, len(nodes)) for i, node := range nodes { refs := node.GetRefs() @@ -108,8 +109,8 @@ func (c *blockContext) decodeWays(nodes []*protobuf.Way) []any { return elements } -func (c *blockContext) decodeRelations(nodes []*protobuf.Relation) []any { - elements := make([]any, len(nodes)) +func (c *blockContext) decodeRelations(nodes []*protobuf.Relation) []Object { + elements := make([]Object, len(nodes)) for i, node := range nodes { elements[i] = &Relation{ diff --git a/decoder_test.go b/decoder_test.go index 1b2046b..0e67c5a 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -48,7 +48,7 @@ func publicDecodeOsmPbf(t *testing.T, file string, expectedEntries int) { var nEntries int for { - e, err := decoder.Decode() + objs, err := decoder.Decode() if err != nil { if err != io.EOF { t.Errorf("Error decoding%v", err) @@ -57,9 +57,11 @@ func publicDecodeOsmPbf(t *testing.T, file string, expectedEntries int) { } } - assert.NotEqual(t, reflect.TypeOf(Header{}), reflect.TypeOf(e)) + for _, obj := range objs { + assert.NotEqual(t, reflect.TypeOf(Header{}), reflect.TypeOf(obj)) + } - nEntries++ + nEntries = nEntries + len(objs) } assert.Equal(t, expectedEntries, nEntries, "Incorrect number of elements") diff --git a/go.mod b/go.mod index c20b08b..8fa6119 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23 toolchain go1.23.2 require ( + github.com/destel/rill v0.5.0 github.com/dustin/go-humanize v1.0.1 github.com/golang/geo v0.0.0-20230421003525-6adc56603217 github.com/spf13/cobra v1.8.1 diff --git a/go.sum b/go.sum index 08405db..10c03ae 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/destel/rill v0.5.0 h1:+1F/IqH8rjkScTEHDPiYL2SJomEtnGGczO9uIJKNuFY= +github.com/destel/rill v0.5.0/go.mod h1:srKuXzvGqINUEGYR5b/iwvW+L9/S35RxVHWGYbXNoO4= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= diff --git a/models.go b/models.go index 7d91250..30c4edc 100644 --- a/models.go +++ b/models.go @@ -145,6 +145,8 @@ type Header struct { OsmosisReplicationBaseURL string } +func (r Header) foo() {} + // Info represents information common to Node, Way, and Relation elements. type Info struct { Version int32 @@ -155,6 +157,10 @@ type Info struct { Visible bool } +type Object interface { + foo() +} + // Node represents a specific point on the earth's surface defined by its // latitude and longitude. Each node comprises at least an id number and a // pair of coordinates. @@ -166,6 +172,8 @@ type Node struct { Lon Degrees } +func (r Node) foo() {} + // Way is an ordered list of between 2 and 2,000 nodes that define a polyline. type Way struct { ID uint64 @@ -174,6 +182,8 @@ type Way struct { NodeIDs []uint64 } +func (r Way) foo() {} + // ElementType is an enumeration of relation types. type ElementType int @@ -203,3 +213,5 @@ type Relation struct { Info *Info Members []Member } + +func (r Relation) foo() {}