
Opensearch output fails when deleting non-existing document #162

Closed
arnitolog opened this issue Nov 13, 2024 · 0 comments · Fixed by #163


@arnitolog (Contributor)

I'm building a pipeline that reads data from a Kafka topic and indexes or deletes the corresponding documents in OpenSearch. Everything works except when Bento tries to delete a document that doesn't exist. In that case it panics with the error below:

```
time="2024-11-13T23:04:09Z" level=info msg="Running main config from specified file" bento_version=v1.3.0 path=/bento.yaml service=bento
time="2024-11-13T23:04:09Z" level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" service=bento
time="2024-11-13T23:04:09Z" level=info msg="Launching a Bento instance, use CTRL+C to close" service=bento
time="2024-11-13T23:04:09Z" level=info msg="Output type stdout is now active" label="" path=root.output.fallback.1 service=bento
time="2024-11-13T23:04:09Z" level=info msg="Output type opensearch is now active" label="" path=root.output.fallback.0.reject_errored service=bento
time="2024-11-13T23:04:09Z" level=info msg="Input type kafka is now active" label=kafka_topic path=root.input service=bento
time="2024-11-13T23:04:09Z" level=debug msg="Starting consumer group" label=kafka_topic path=root.input service=bento
time="2024-11-13T23:04:12Z" level=debug msg="Consuming messages from topic 'ap-test' partition '0'" label=kafka_topic path=root.input service=bento
time="2024-11-13T23:04:13Z" level=debug msg="Successfully dispatched [%!s(uint64=2)] documents in 334ms (%!s(int64=5) docs/sec)" label="" path=root.output.fallback.0.reject_errored service=bento
time="2024-11-13T23:06:51Z" level=debug msg="Successfully dispatched [%!s(uint64=1)] documents in 25ms (%!s(int64=40) docs/sec)" label="" path=root.output.fallback.0.reject_errored service=bento
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x1b11fdc]

goroutine 105 [running]:
github.com/warpstreamlabs/bento/internal/impl/opensearch.(*Output).buildBulkableRequest.func1({_, _}, {{0xc002410420, 0x11}, {0x59a3ca0, 0x6}, {0xc001bbaa28, 0x3}, 0x0, 0x0, ...}, ...)
	/go/src/github.com/warpstreamlabs/bento/internal/impl/opensearch/output.go:386 +0x3c
github.com/opensearch-project/opensearch-go/v3/opensearchutil.(*worker).flush(0xc001b89800, {0x665de40?, 0xc00215a960?})
	/go/pkg/mod/github.com/opensearch-project/opensearch-go/v3@v3.0.0/opensearchutil/bulk_indexer.go:531 +0x98d
github.com/opensearch-project/opensearch-go/v3/opensearchutil.(*bulkIndexer).Close(0xc002a9e840, {0x665de40, 0xc00215a960})
	/go/pkg/mod/github.com/opensearch-project/opensearch-go/v3@v3.0.0/opensearchutil/bulk_indexer.go:237 +0x190
github.com/warpstreamlabs/bento/internal/impl/opensearch.(*Output).WriteBatch(0xc001b21320, {0x665de40, 0xc00215a960}, {0xc0019af010?, 0x1, 0x1})
	/go/src/github.com/warpstreamlabs/bento/internal/impl/opensearch/output.go:315 +0x767
github.com/warpstreamlabs/bento/public/service.(*airGapBatchWriter).WriteBatch(0xc0026a0ae0, {0x665de40, 0xc00215a960}, {0xc0019aeff0, 0x1, 0x1})
	/go/src/github.com/warpstreamlabs/bento/public/service/output.go:117 +0xc5
github.com/warpstreamlabs/bento/internal/component/output.(*AsyncWriter).latencyMeasuringWrite(0xc000b0ba00, {0x665de40, 0xc00215a960}, {0xc0019aeff0, 0x1, 0x1})
	/go/src/github.com/warpstreamlabs/bento/internal/component/output/async_writer.go:81 +0x89
github.com/warpstreamlabs/bento/internal/component/output.(*AsyncWriter).loop.func4()
	/go/src/github.com/warpstreamlabs/bento/internal/component/output/async_writer.go:209 +0x38d
created by github.com/warpstreamlabs/bento/internal/component/output.(*AsyncWriter).loop in goroutine 54
	/go/src/github.com/warpstreamlabs/bento/internal/component/output/async_writer.go:247 +0x725
```
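The trace shows the panic inside the failure callback built by `buildBulkableRequest` (output.go:386), which opensearch-go's bulk indexer calls from `flush`. One plausible cause, not confirmed in this issue: when a bulk `delete` targets a missing ID, OpenSearch reports that item with `"result": "not_found"` and status 404 but no `error` object, so the item is treated as failed while its error field is nil, and any callback that reads that field unconditionally dereferences a nil pointer. The Go sketch below shows a nil-safe check under that assumption. `bulkItem`, `bulkItemError` and `describeFailure` are hypothetical stand-ins, not the opensearch-go or Bento types, and treating `not_found` on delete as success is only one possible policy.

```go
package main

import (
	"fmt"
	"net/http"
)

// bulkItemError is a hypothetical stand-in for the optional "error" object
// that OpenSearch may attach to a bulk response item.
type bulkItemError struct {
	Type   string
	Reason string
}

// bulkItem is a hypothetical, simplified bulk response item. Error is a
// pointer because the "error" object can be absent (assumed here to be the
// case for a delete that returns "not_found").
type bulkItem struct {
	ID     string
	Result string
	Status int
	Error  *bulkItemError
}

// describeFailure returns nil when the item should count as a success and
// otherwise builds an error without ever dereferencing a nil Error.
func describeFailure(action string, item bulkItem) error {
	// Hypothetical policy: deleting an already-missing document is not a failure.
	if action == "delete" && item.Status == http.StatusNotFound && item.Result == "not_found" {
		return nil
	}
	if item.Status <= 201 && item.Error == nil {
		return nil
	}
	if item.Error == nil {
		// No error object to read: report the HTTP status instead of panicking.
		return fmt.Errorf("bulk %s of %q failed with status %d", action, item.ID, item.Status)
	}
	return fmt.Errorf("bulk %s of %q failed: %s: %s", action, item.ID, item.Error.Type, item.Error.Reason)
}

func main() {
	fmt.Println(describeFailure("delete", bulkItem{ID: "doc-1", Result: "not_found", Status: 404})) // <nil>
	fmt.Println(describeFailure("index", bulkItem{ID: "doc-2", Status: 500}))                       // bulk index of "doc-2" failed with status 500
	fmt.Println(describeFailure("index", bulkItem{ID: "doc-3", Status: 400,
		Error: &bulkItemError{Type: "mapper_parsing_exception", Reason: "failed to parse"}})) // bulk index of "doc-3" failed: mapper_parsing_exception: failed to parse
}
```

Whether a missing document on delete should be acknowledged or surfaced as an error (and so reach the `stdout` fallback in this pipeline) is a design decision. The essential point of the sketch is only that the error object is checked for nil before its fields are read.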

Here is the config:

```yaml
input:
  label: "kafka_topic"
  kafka:
    addresses:
      - kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9093
    topics:
      - ap-test
    target_version: 3.8.0
    tls:
      enabled: true
      skip_cert_verify: true
    consumer_group: "bento-opensearch-sink"
    start_from_oldest: true
    checkpoint_limit: 1024
    auto_replay_nacks: true
    commit_period: 1s
    max_processing_period: 100ms
    #extract_tracing_map: root = @ # No default (optional)

pipeline:
  processors:
    - mapping: |
        root.tmp.key = metadata("kafka_key").not_null()
        root.tmp.key = metadata("kafka_key").not_empty()
        root.tmp.header = metadata("docid").not_null()
        root.tmp.header = metadata("docid").not_empty()
        root.tmp = deleted()
        meta action = match {
          metadata("delete") == "true" => "delete",
          _ => "index",
        }
        root.doc = this

output:
  label: "opensearch_index"
  fallback:
    - reject_errored:
        opensearch:
          urls:
            - https://time-cluster.elasticsearch.svc.cluster.local:9200
          index: time-ap-test-${! metadata("kafka_key") }
          action: ${! metadata("action") }
          id: ${! metadata("docid") }
          tls:
            enabled: true
            skip_cert_verify: true
          max_in_flight: 64
          basic_auth:
            enabled: true
            username: "test-user"
            password: "xxxxxxxxxx"
          batching:
            count: 1000
            byte_size: 0
            period: "1s"
            #jitter: 0.1
    - stdout:
        codec: lines

logger:
  level: DEBUG
  format: logfmt
  level_name: level
  message_name: msg
  timestamp_name: time
  add_timestamp: true
  static_fields:
    'service': bento

metrics:
  prometheus: {}
  mapping: ""
```
@arnitolog changed the title from "Opensearch output fails on delete non-existing document" to "Opensearch output fails when deleting non-existing document" on Nov 13, 2024.