diff --git a/go.mod b/go.mod index 44853877f..69f32d143 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/warpstreamlabs/bento replace github.com/99designs/keyring => github.com/Jeffail/keyring v1.2.3 require ( - cloud.google.com/go/bigquery v1.59.0 + cloud.google.com/go/bigquery v1.59.1 cloud.google.com/go/pubsub v1.36.1 cloud.google.com/go/storage v1.37.0 cuelang.org/go v0.7.0 @@ -49,7 +49,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/clbanning/mxj/v2 v2.7.0 github.com/colinmarc/hdfs v1.1.3 - github.com/couchbase/gocb/v2 v2.8.0 + github.com/couchbase/gocb/v2 v2.9.1 github.com/denisenkom/go-mssqldb v0.12.3 github.com/dgraph-io/ristretto v0.1.1 github.com/dop251/goja v0.0.0-20231014103939-873a1496dc8e @@ -154,10 +154,10 @@ require ( require ( cloud.google.com/go v0.112.0 // indirect - cloud.google.com/go/compute v1.23.3 // indirect + cloud.google.com/go/compute v1.24.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.6 // indirect - cloud.google.com/go/trace v1.10.4 // indirect + cloud.google.com/go/trace v1.10.5 // indirect dario.cat/mergo v1.0.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect @@ -207,10 +207,10 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/apd/v3 v3.2.1 // indirect github.com/containerd/continuity v0.3.0 // indirect - github.com/couchbase/gocbcore/v10 v10.4.0 // indirect - github.com/couchbase/gocbcoreps v0.1.2 // indirect + github.com/couchbase/gocbcore/v10 v10.5.1 // indirect + github.com/couchbase/gocbcoreps v0.1.3 // indirect github.com/couchbase/goprotostellar v1.0.2 // indirect - github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131 // indirect + github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/danieljoos/wincred v1.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -320,7 +320,7 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect @@ -333,10 +333,10 @@ require ( golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect - google.golang.org/grpc v1.62.1 // indirect + google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.63.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect diff --git a/go.sum b/go.sum index 694122df4..64126e666 100644 --- a/go.sum +++ b/go.sum @@ -13,10 +13,10 @@ cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnP cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= -cloud.google.com/go/bigquery v1.59.0 h1:0NVDUJ9gRrPCZY6pkigoIUpDmYRWd+Dvp77cuAM4BMU= -cloud.google.com/go/bigquery v1.59.0/go.mod h1:VP1UJYgevyTwsV7desjzNzDND5p6hZB+Z8gZJN1GQUc= -cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= -cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/bigquery v1.59.1 h1:CpT+/njKuKT3CEmswm6IbhNu9u35zt5dO4yPDLW+nG4= +cloud.google.com/go/bigquery v1.59.1/go.mod h1:VP1UJYgevyTwsV7desjzNzDND5p6hZB+Z8gZJN1GQUc= +cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= +cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datacatalog v1.19.3 h1:A0vKYCQdxQuV4Pi0LL9p39Vwvg4jH5yYveMv50gU5Tw= @@ -25,14 +25,14 @@ cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7 cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= -cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= -cloud.google.com/go/kms v1.15.5/go.mod h1:cU2H5jnp6G2TDpUGZyqTCoy1n16fbubHZjmVXSMtwDI= +cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM= +cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI= cloud.google.com/go/logging v1.9.0 h1:iEIOXFO9EmSiTjDmfpbRjOxECO7R8C7b8IXUGOj7xZw= cloud.google.com/go/logging v1.9.0/go.mod h1:1Io0vnZv4onoUnsVUQY3HZ3Igb1nBchky0A0y7BBBhE= cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= -cloud.google.com/go/monitoring v1.17.0 h1:blrdvF0MkPPivSO041ihul7rFMhXdVp8Uq7F59DKXTU= -cloud.google.com/go/monitoring v1.17.0/go.mod h1:KwSsX5+8PnXv5NJnICZzW2R8pWTis8ypC4zmdRD63Tw= +cloud.google.com/go/monitoring v1.18.0 h1:NfkDLQDG2UR3WYZVQE8kwSbUIEyIqJUPl+aOQdFH1T4= +cloud.google.com/go/monitoring v1.18.0/go.mod h1:c92vVBCeq/OB4Ioyo+NbN2U7tlg5ZH41PZcdvfc+Lcg= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -43,8 +43,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.37.0 h1:WI8CsaFO8Q9KjPVtsZ5Cmi0dXV25zMoX0FklT7c3Jm4= cloud.google.com/go/storage v1.37.0/go.mod h1:i34TiT2IhiNDmcj65PqwCjcoUX7Z5pLzS8DEmoiFq1k= -cloud.google.com/go/trace v1.10.4 h1:2qOAuAzNezwW3QN+t41BtkDJOG42HywL73q8x/f6fnM= -cloud.google.com/go/trace v1.10.4/go.mod h1:Nso99EDIK8Mj5/zmB+iGr9dosS/bzWCJ8wGmE6TXNWY= +cloud.google.com/go/trace v1.10.5 h1:0pr4lIKJ5XZFYD9GtxXEWr0KkVeigc3wlGpZco0X1oA= +cloud.google.com/go/trace v1.10.5/go.mod h1:9hjCV1nGBCtXbAE4YK7OqJ8pmPYSxPA0I67JwRd5s3M= cuelabs.dev/go/oci/ociregistry v0.0.0-20231103182354-93e78c079a13 h1:zkiIe8AxZ/kDjqQN+mDKc5BxoVJOqioSdqApjc+eB1I= cuelabs.dev/go/oci/ociregistry v0.0.0-20231103182354-93e78c079a13/go.mod h1:XGKYSMtsJWfqQYPwq51ZygxAPqpEUj/9bdg16iDPTAA= cuelang.org/go v0.7.0 h1:gMztinxuKfJwMIxtboFsNc6s8AxwJGgsJV+3CuLffHI= @@ -312,8 +312,6 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= -github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/apd/v3 v3.2.1 h1:U+8j7t0axsIgvQUqthuNm82HIrYXodOV2iWLWtEaIwg= @@ -325,19 +323,18 @@ github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvA github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/couchbase/gocb/v2 v2.8.0 h1:KoG44zWrP4QgK724D7D2rXHgRlztwkAPFQVApJCJaB4= -github.com/couchbase/gocb/v2 v2.8.0/go.mod h1:GL6M8F4eB5ZuoTYh2RzwCUheVVi4EADdCQ3yc52kqUI= -github.com/couchbase/gocbcore/v10 v10.4.0 h1:ItBAQdxl5I9CBkt/XqlRB/Ni4Ej2k2OK1ClB2HHipVE= -github.com/couchbase/gocbcore/v10 v10.4.0/go.mod h1:lYQIIk+tzoMcwtwU5GzPbDdqEkwkH3isI2rkSpfL0oM= -github.com/couchbase/gocbcoreps v0.1.2 h1:wlGyyMnkWpCNOlTtfy8UG+8XZsFtqTJtPXz63+QKC58= -github.com/couchbase/gocbcoreps v0.1.2/go.mod h1:33hSdOKnrUVaBqw4+RiqW+2JoD8ylkbvqm89Wg81uXk= +github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ= +github.com/couchbase/gocb/v2 v2.9.1/go.mod h1:TMAeK34yUdcASdV4mGcYuwtkAWckRBYN5uvMCEgPfXo= +github.com/couchbase/gocbcore/v10 v10.5.1 h1:bwlV/zv/fSQLuO14M9k49K7yWgcWfjSgMyfRGhW1AyU= +github.com/couchbase/gocbcore/v10 v10.5.1/go.mod h1:rulbgUK70EuyRUiLQ0LhQAfSI/Rl+jWws8tTbHzvB6M= +github.com/couchbase/gocbcoreps v0.1.3 h1:fILaKGCjxFIeCgAUG8FGmRDSpdrRggohOMKEgO9CUpg= +github.com/couchbase/gocbcoreps v0.1.3/go.mod h1:hBFpDNPnRno6HH5cRXExhqXYRmTsFJlFHQx7vztcXPk= github.com/couchbase/goprotostellar v1.0.2 h1:yoPbAL9sCtcyZ5e/DcU5PRMOEFaJrF9awXYu3VPfGls= github.com/couchbase/goprotostellar v1.0.2/go.mod h1:5/yqVnZlW2/NSbAWu1hPJCFBEwjxgpe0PFFOlRixnp4= -github.com/couchbaselabs/gocaves/client v0.0.0-20230307083111-cc3960c624b1/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259 h1:2TXy68EGEzIMHOx9UvczR5ApVecwCfQZ0LjkmwMI6g4= github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= -github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131 h1:2EAfFswAfgYn3a05DVcegiw6DgMgn1Mv5eGz6IHt1Cw= -github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20230515165046-68b522a21131/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= +github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 h1:lhGOw8rNG6RAadmmaJAF3PJ7MNt7rFuWG7BHCYMgnGE= +github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= @@ -404,8 +401,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= @@ -1086,8 +1081,8 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= @@ -1463,12 +1458,12 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe h1:USL2DhxfgRchafRvt/wYyyQNzwgL7ZiURcozOE/Pkvo= -google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= -google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 h1:x9PwdEgd11LgK+orcck69WVRo7DezSO4VUMPI4xpc8A= -google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014/go.mod h1:rbHMSEDyoYX62nRVLOCc4Qt1HbsdytAYoVwgjiOhF3I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1484,8 +1479,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/impl/couchbase/cache_test.go b/internal/impl/couchbase/cache_test.go index 5b4450fde..e47e7eca1 100644 --- a/internal/impl/couchbase/cache_test.go +++ b/internal/impl/couchbase/cache_test.go @@ -48,13 +48,17 @@ cache_resources: ) } -func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ +func getCluster(ctx context.Context, tb testing.TB, port string) (*gocb.Cluster, error) { + return gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ Authenticator: gocb.PasswordAuthenticator{ Username: username, Password: password, }, }) +} + +func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error { + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } @@ -65,12 +69,7 @@ func removeBucket(ctx context.Context, tb testing.TB, port, bucket string) error } func createBucket(ctx context.Context, tb testing.TB, port, bucket string) error { - cluster, err := gocb.Connect(fmt.Sprintf("couchbase://localhost:%v", port), gocb.ClusterOptions{ - Authenticator: gocb.PasswordAuthenticator{ - Username: username, - Password: password, - }, - }) + cluster, err := getCluster(ctx, tb, port) if err != nil { return err } diff --git a/internal/impl/couchbase/couchbase.go b/internal/impl/couchbase/couchbase.go index 91bf60a5a..9ede4885a 100644 --- a/internal/impl/couchbase/couchbase.go +++ b/internal/impl/couchbase/couchbase.go @@ -6,56 +6,72 @@ import ( "github.com/couchbase/gocb/v2" ) -func valueFromOp(op gocb.BulkOp) (out any, err error) { +func valueFromOp(op gocb.BulkOp) (out any, cas gocb.Cas, err error) { switch o := op.(type) { case *gocb.GetOp: if o.Err != nil { - return nil, o.Err + return nil, gocb.Cas(0), o.Err } err := o.Result.Content(&out) - return out, err + + return out, o.Result.Cas(), err case *gocb.InsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.RemoveOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.ReplaceOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err case *gocb.UpsertOp: - return nil, o.Err + if o.Result != nil { + return nil, o.Result.Cas(), o.Err + } + return nil, gocb.Cas(0), o.Err } - return nil, errors.New("type not supported") + return nil, gocb.Cas(0), errors.New("type not supported") } -func get(key string, _ []byte) gocb.BulkOp { +func get(key string, _ []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.GetOp{ ID: key, } } -func insert(key string, data []byte) gocb.BulkOp { +func insert(key string, data []byte, _ gocb.Cas) gocb.BulkOp { return &gocb.InsertOp{ ID: key, Value: data, } } -func remove(key string, _ []byte) gocb.BulkOp { +func remove(key string, _ []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.RemoveOp{ - ID: key, + ID: key, + Cas: cas, } } -func replace(key string, data []byte) gocb.BulkOp { +func replace(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.ReplaceOp{ ID: key, Value: data, + Cas: cas, } } -func upsert(key string, data []byte) gocb.BulkOp { +func upsert(key string, data []byte, cas gocb.Cas) gocb.BulkOp { return &gocb.UpsertOp{ ID: key, Value: data, + Cas: cas, } } diff --git a/internal/impl/couchbase/integration_test.go b/internal/impl/couchbase/integration_test.go index 64f82f840..6d21d939c 100644 --- a/internal/impl/couchbase/integration_test.go +++ b/internal/impl/couchbase/integration_test.go @@ -2,15 +2,20 @@ package couchbase_test import ( "bytes" + "context" "fmt" "os" "sync" "testing" "time" + "github.com/go-faker/faker/v4" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/public/service" + "github.com/warpstreamlabs/bento/public/service/integration" ) var ( @@ -106,3 +111,260 @@ func setupCouchbase(tb testing.TB) (*dockertest.Pool, *dockertest.Resource, erro return pool, resource, nil } + +func TestIntegrationCouchbaseProcessor(t *testing.T) { + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + + bucket := fmt.Sprintf("testing-processor-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + uid := faker.UUIDHyphenated() + payload := fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + + t.Run("Insert", func(t *testing.T) { + testCouchbaseProcessorInsert(uid, payload, bucket, servicePort, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) + }) + t.Run("Remove", func(t *testing.T) { + testCouchbaseProcessorRemove(uid, bucket, servicePort, t) + }) + t.Run("GetMissing", func(t *testing.T) { + testCouchbaseProcessorGetMissing(uid, bucket, servicePort, t) + }) + + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Upsert", func(t *testing.T) { + testCouchbaseProcessorUpsert(uid, payload, bucket, servicePort, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) + }) + + payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) + t.Run("Replace", func(t *testing.T) { + testCouchbaseProcessorReplace(uid, payload, bucket, servicePort, t) + }) + t.Run("Get", func(t *testing.T) { + testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) + }) +} + +func TestIntegrationCouchbaseStream(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + for _, clearCAS := range []bool{true, false} { + t.Run(fmt.Sprintf("%t", clearCAS), func(t *testing.T) { + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + cas_enabled: true + id: '${! json("key") }' + content: 'root = this' + operation: 'insert' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + + // replace + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + cas_enabled: true + id: '${! json("key") }' + content: 'root = this' + operation: 'replace' +`, servicePort, bucket, username, password))) + + if clearCAS { // ignore cas check + require.NoError(t, streamOutBuilder.AddProcessorYAML(` +mapping: | + meta couchbase_cas = deleted() +`)) + } + // remove + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + cas_enabled: true + id: '${! json("key") }' + operation: 'remove' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch processing should be fine and contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.NoError(t, outBatches[0][0].GetError()) + }) + } +} + +func TestIntegrationCouchbaseStreamError(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + + servicePort := requireCouchbase(t) + bucket := fmt.Sprintf("testing-stream-error-%d", time.Now().Unix()) + require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) + t.Cleanup(func() { + require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) + }) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + + inFn, err := streamOutBuilder.AddBatchProducerFunc() + require.NoError(t, err) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(c context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb) + outBatchMut.Unlock() + return nil + })) + + // insert + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +couchbase: + url: 'couchbase://localhost:%s' + bucket: %s + username: %s + password: %s + cas_enabled: true + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'insert' +`, servicePort, bucket, username, password))) + + // upsert and remove in parallel + require.NoError(t, streamOutBuilder.AddProcessorYAML(fmt.Sprintf(` +workflow: + meta_path: "" + branches: + write: + processors: + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + cas_enabled: true + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'replace' + remove: + processors: + - sleep: + duration: "1s" + - couchbase: + url: 'couchbase://localhost:%[1]s' + bucket: %[2]s + username: %[3]s + password: %[4]s + cas_enabled: true + id: '${! json("key") }' + content: | + root = this + root.at = timestamp_unix_micro() + operation: 'replace' +`, servicePort, bucket, username, password))) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + go func() { + err = streamOut.Run(context.Background()) + require.NoError(t, err) + }() + + require.NoError(t, inFn(ctx, service.MessageBatch{ + service.NewMessage([]byte(`{"key":"hello","value":"word"}`)), + })) + require.NoError(t, streamOut.StopWithin(time.Second*15)) + + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + return len(outBatches) == 1 + }, time.Second*5, time.Millisecond*100) + + // batch contain one message. + assert.NoError(t, err) + assert.Len(t, outBatches, 1) + assert.Len(t, outBatches[0], 1) + + // message should contain an error. + assert.Error(t, outBatches[0][0].GetError()) +} diff --git a/internal/impl/couchbase/processor.go b/internal/impl/couchbase/processor.go index 8a60aac56..5a6a46cff 100644 --- a/internal/impl/couchbase/processor.go +++ b/internal/impl/couchbase/processor.go @@ -12,6 +12,11 @@ import ( "github.com/warpstreamlabs/bento/public/service" ) +const ( + // MetaCASKey holds the CAS value of an entry. + MetaCASKey = "couchbase_cas" +) + var ( // ErrInvalidOperation specified operation is not supported. ErrInvalidOperation = errors.New("invalid operation") @@ -26,7 +31,7 @@ func ProcessorConfig() *service.ConfigSpec { Version("1.0.0"). Categories("Integration"). Summary("Performs operations against Couchbase for each message, allowing you to store or retrieve data within message payloads."). - Description("When inserting, replacing or upserting documents, each must have the `content` property set."). + Description("When inserting, replacing or upserting documents, each must have the `content` property set.\n\n### Concurrent Document Mutations\nTo prevent read/write conflicts, Couchbase returns a [_Compare And Swap_ (CAS)](https://docs.couchbase.com/go-sdk/current/howtos/concurrent-document-mutations.html) value with each accessed document. Bento stores these as key/value pairs in metadata with the `couchbase_cas` field. Note: CAS checks are disabled by default. You can configure this by changing the value of `cas_enabled: true`. Future versions will see this enabled by default."). Field(service.NewInterpolatedStringField("id").Description("Document id.").Example(`${! json("id") }`)). Field(service.NewBloblangField("content").Description("Document content.").Optional()). Field(service.NewStringAnnotatedEnumField("operation", map[string]string{ @@ -36,6 +41,7 @@ func ProcessorConfig() *service.ConfigSpec { string(client.OperationReplace): "replace the contents of a document.", string(client.OperationUpsert): "creates a new document if it does not exist, if it does exist then it updates it.", }).Description("Couchbase operation to perform.").Default(string(client.OperationGet))). + Field(service.NewBoolField("cas_enabled").Description("Enable CAS validation.").Default(false)). // TODO: Enable by default in next release LintRule(`root = if ((this.operation == "insert" || this.operation == "replace" || this.operation == "upsert") && !this.exists("content")) { [ "content must be set for insert, replace and upsert operations." ] }`) } @@ -56,9 +62,10 @@ func init() { // batch. type Processor struct { *couchbaseClient - id *service.InterpolatedString - content *bloblang.Executor - op func(key string, data []byte) gocb.BulkOp + id *service.InterpolatedString + content *bloblang.Executor + op func(key string, data []byte, cas gocb.Cas) gocb.BulkOp + casEnabled bool } // NewProcessor returns a Couchbase processor. @@ -81,6 +88,11 @@ func NewProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*Processo } } + p.casEnabled, err = conf.FieldBool("cas_enabled") + if err != nil { + return nil, err + } + op, err := conf.FieldString("operation") if err != nil { return nil, err @@ -123,7 +135,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } // generate query - for index := range newMsg { + for index, msg := range newMsg { // generate id k, err := inBatch.TryInterpolatedString(index, p.id) if err != nil { @@ -143,7 +155,16 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } } - ops[index] = p.op(k, content) + var cas gocb.Cas // retrieve cas if set and enabled + if p.casEnabled { + if val, ok := msg.MetaGetMut(MetaCASKey); ok { + if v, ok := val.(gocb.Cas); ok { + cas = v + } + } + } + + ops[index] = p.op(k, content, cas) } // execute @@ -154,7 +175,7 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat // set results for index, part := range newMsg { - out, err := valueFromOp(ops[index]) + out, cas, err := valueFromOp(ops[index]) if err != nil { part.SetError(fmt.Errorf("couchbase operator failed: %w", err)) } @@ -164,6 +185,8 @@ func (p *Processor) ProcessBatch(ctx context.Context, inBatch service.MessageBat } else if out != nil { part.SetStructured(out) } + + part.MetaSetMut(MetaCASKey, cas) } return []service.MessageBatch{newMsg}, nil diff --git a/internal/impl/couchbase/processor_test.go b/internal/impl/couchbase/processor_test.go index ba9469303..74918d997 100644 --- a/internal/impl/couchbase/processor_test.go +++ b/internal/impl/couchbase/processor_test.go @@ -4,15 +4,14 @@ import ( "context" "fmt" "testing" - "time" - "github.com/go-faker/faker/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/warpstreamlabs/bento/internal/impl/couchbase" "github.com/warpstreamlabs/bento/public/service" - "github.com/warpstreamlabs/bento/public/service/integration" + + _ "github.com/warpstreamlabs/bento/public/components/pure" ) func TestProcessorConfigLinting(t *testing.T) { @@ -102,50 +101,6 @@ couchbase: } } -func TestIntegrationCouchbaseProcessor(t *testing.T) { - integration.CheckSkip(t) - - servicePort := requireCouchbase(t) - - bucket := fmt.Sprintf("testing-processor-%d", time.Now().Unix()) - require.NoError(t, createBucket(context.Background(), t, servicePort, bucket)) - t.Cleanup(func() { - require.NoError(t, removeBucket(context.Background(), t, servicePort, bucket)) - }) - - uid := faker.UUIDHyphenated() - payload := fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) - - t.Run("Insert", func(t *testing.T) { - testCouchbaseProcessorInsert(uid, payload, bucket, servicePort, t) - }) - t.Run("Get", func(t *testing.T) { - testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) - }) - t.Run("Remove", func(t *testing.T) { - testCouchbaseProcessorRemove(uid, bucket, servicePort, t) - }) - t.Run("GetMissing", func(t *testing.T) { - testCouchbaseProcessorGetMissing(uid, bucket, servicePort, t) - }) - - payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) - t.Run("Upsert", func(t *testing.T) { - testCouchbaseProcessorUpsert(uid, payload, bucket, servicePort, t) - }) - t.Run("Get", func(t *testing.T) { - testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) - }) - - payload = fmt.Sprintf(`{"id": %q, "data": %q}`, uid, faker.Sentence()) - t.Run("Replace", func(t *testing.T) { - testCouchbaseProcessorReplace(uid, payload, bucket, servicePort, t) - }) - t.Run("Get", func(t *testing.T) { - testCouchbaseProcessorGet(uid, payload, bucket, servicePort, t) - }) -} - func getProc(tb testing.TB, config string) *couchbase.Processor { tb.Helper() @@ -181,6 +136,11 @@ operation: 'insert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -207,6 +167,11 @@ operation: 'upsert' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -233,6 +198,11 @@ operation: 'replace' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -258,6 +228,11 @@ operation: 'get' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message should contain expected payload. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -283,6 +258,11 @@ operation: 'remove' assert.Len(t, msgOut, 1) assert.Len(t, msgOut[0], 1) + // check CAS + cas, ok := msgOut[0][0].MetaGetMut(couchbase.MetaCASKey) + assert.True(t, ok) + assert.NotEmpty(t, cas) + // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() assert.NoError(t, err) @@ -309,7 +289,7 @@ operation: 'get' assert.Len(t, msgOut[0], 1) // message should contain an error. - assert.Error(t, msgOut[0][0].GetError(), "TODO") + assert.Error(t, msgOut[0][0].GetError()) // message content should stay the same. dataOut, err := msgOut[0][0].AsBytes() diff --git a/website/docs/components/processors/couchbase.md b/website/docs/components/processors/couchbase.md index 1b9a388e4..33c265ef5 100644 --- a/website/docs/components/processors/couchbase.md +++ b/website/docs/components/processors/couchbase.md @@ -41,6 +41,7 @@ couchbase: id: ${! json("id") } # No default (required) content: "" # No default (optional) operation: get + cas_enabled: false ``` @@ -60,6 +61,7 @@ couchbase: id: ${! json("id") } # No default (required) content: "" # No default (optional) operation: get + cas_enabled: false ``` @@ -67,6 +69,9 @@ couchbase: When inserting, replacing or upserting documents, each must have the `content` property set. +### Concurrent Document Mutations +To prevent read/write conflicts, Couchbase returns a [_Compare And Swap_ (CAS)](https://docs.couchbase.com/go-sdk/current/howtos/concurrent-document-mutations.html) value with each accessed document. Bento stores these as key/value pairs in metadata with the `couchbase_cas` field. Note: CAS checks are disabled by default. You can configure this by changing the value of `cas_enabled: true`. Future versions will see this enabled by default. + ## Fields ### `url` @@ -177,4 +182,12 @@ Default: `"get"` | `upsert` | creates a new document if it does not exist, if it does exist then it updates it. | +### `cas_enabled` + +Enable CAS validation. + + +Type: `bool` +Default: `false` +