Skip to content

Commit

Permalink
Merge pull request #2886 from redpanda-data/couchbase
Browse files Browse the repository at this point in the history
couchbase: add output
  • Loading branch information
rockwotj authored Sep 24, 2024
2 parents 3e6168c + 86d6de6 commit 8c75b81
Show file tree
Hide file tree
Showing 8 changed files with 787 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ All notable changes to this project will be documented in this file.
- Field `metadata_max_age` added to the `kafka_franz` input. (@Scarjit)
- Field `metadata_max_age` added to the `kafka_migrator` input. (@mihaitodor)
- New experimental `cypher` output. (@rockwotj)
- New experimental `couchbase` output. (@rockwotj)

### Fixed

- Fixed a bug with the `input_resource` field for the `kafka_migrator` output where new topics weren't created as expected. (@mihaitodor)
- Fixed a bug in the `kafka_migrator` input which could lead to extra duplicate messages during a consumer group rebalance. (@mihaitodor)
- `kafka_migrator`, `kafka_migrator_offsets` and `kafka_migrator_bundle` components renamed to `redpanda_migrator`, `redpanda_migrator_offsets` and `redpanda_migrator_bundle` (@mihaitodor)

## 4.36.0 - 2024-09-11

### Added

- Fields `replication_factor` and `replication_factor_override` added to the `kafka_migrator` input and output. (@mihaitodor)
Expand Down
343 changes: 343 additions & 0 deletions docs/modules/components/pages/outputs/couchbase.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
= couchbase
:type: output
:status: experimental
:categories: ["Integration"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Performs operations against Couchbase for each message, allowing you to store or delete data.
Introduced in version 4.37.0.
[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
output:
label: ""
couchbase:
url: couchbase://localhost:11210 # No default (required)
username: "" # No default (optional)
password: "" # No default (optional)
bucket: "" # No default (required)
id: ${! json("id") } # No default (required)
content: "" # No default (optional)
operation: upsert
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
output:
label: ""
couchbase:
url: couchbase://localhost:11210 # No default (required)
username: "" # No default (optional)
password: "" # No default (optional)
bucket: "" # No default (required)
collection: _default
transcoder: legacy
timeout: 15s
id: ${! json("id") } # No default (required)
content: "" # No default (optional)
operation: upsert
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
```
--
======
When inserting, replacing or upserting documents, each must have the `content` property set.
== Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc].
== Fields
=== `url`
Couchbase connection string.
*Type*: `string`
```yml
# Examples
url: couchbase://localhost:11210
```
=== `username`
Username to connect to the cluster.
*Type*: `string`
=== `password`
Password to connect to the cluster.
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
=== `bucket`
Couchbase bucket.
*Type*: `string`
=== `collection`
Bucket collection.
*Type*: `string`
*Default*: `"_default"`
=== `transcoder`
Couchbase transcoder to use.
*Type*: `string`
*Default*: `"legacy"`
|===
| Option | Summary
| `json`
| JSONTranscoder implements the default transcoding behavior and applies JSON transcoding to all values. This will apply the following behavior to the value: binary ([]byte) -> error. default -> JSON value, JSON Flags.
| `legacy`
| LegacyTranscoder implements the behavior for a backward-compatible transcoder. This transcoder implements behavior matching that of gocb v1.This will apply the following behavior to the value: binary ([]byte) -> binary bytes, Binary expectedFlags. string -> string bytes, String expectedFlags. default -> JSON value, JSON expectedFlags.
| `raw`
| RawBinaryTranscoder implements passthrough behavior of raw binary data. This transcoder does not apply any serialization. This will apply the following behavior to the value: binary ([]byte) -> binary bytes, binary expectedFlags. default -> error.
| `rawjson`
| RawJSONTranscoder implements passthrough behavior of JSON data. This transcoder does not apply any serialization. It will forward data across the network without incurring unnecessary parsing costs. This will apply the following behavior to the value: binary ([]byte) -> JSON bytes, JSON expectedFlags. string -> JSON bytes, JSON expectedFlags. default -> error.
| `rawstring`
| RawStringTranscoder implements passthrough behavior of raw string data. This transcoder does not apply any serialization. This will apply the following behavior to the value: string -> string bytes, string expectedFlags. default -> error.
|===
=== `timeout`
Operation timeout.
*Type*: `string`
*Default*: `"15s"`
=== `id`
Document id.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
id: ${! json("id") }
```
=== `content`
Document content.
*Type*: `string`
=== `operation`
Couchbase operation to perform.
*Type*: `string`
*Default*: `"upsert"`
|===
| Option | Summary
| `insert`
| insert a new document.
| `remove`
| delete a document.
| `replace`
| replace the contents of a document.
| `upsert`
| creates a new document if it does not exist, if it does exist then it updates it.
|===
=== `max_in_flight`
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
*Type*: `int`
*Default*: `64`
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
*Type*: `object`
```yml
# Examples
batching:
byte_size: 5000
count: 0
period: 1s
batching:
count: 10
period: 1s
batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```
=== `batching.count`
A number of messages at which the batch should be flushed. If `0` disables count based batching.
*Type*: `int`
*Default*: `0`
=== `batching.byte_size`
An amount of bytes at which the batch should be flushed. If `0` disables size based batching.
*Type*: `int`
*Default*: `0`
=== `batching.period`
A period in which an incomplete batch should be flushed regardless of its size.
*Type*: `string`
*Default*: `""`
```yml
# Examples
period: 1s
period: 1m
period: 500ms
```
=== `batching.check`
A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.
*Type*: `string`
*Default*: `""`
```yml
# Examples
check: this.type == "end_of_transaction"
```
=== `batching.processors`
A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
*Type*: `array`
```yml
# Examples
processors:
- archive:
format: concatenate
processors:
- archive:
format: lines
processors:
- archive:
format: json_array
```
2 changes: 1 addition & 1 deletion internal/impl/couchbase/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Cache struct {

// NewCache returns a Couchbase cache.
func NewCache(conf *service.ParsedConfig, mgr *service.Resources) (*Cache, error) {
cl, err := getClient(conf, mgr)
cl, err := getClient(conf)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 8c75b81

Please sign in to comment.