From 431e8b5223865cb7421e820c4094a6bad24ed786 Mon Sep 17 00:00:00 2001 From: Steven Sklar Date: Wed, 18 Sep 2024 11:15:52 -0400 Subject: [PATCH] Add QuestDB output component (#115) --- go.mod | 5 +- go.sum | 45 +- internal/impl/questdb/integration_test.go | 93 ++++ internal/impl/questdb/output.go | 506 +++++++++++++++++++++ internal/impl/questdb/output_test.go | 269 +++++++++++ internal/impl/questdb/timestamp.go | 50 ++ public/components/all/package.go | 1 + public/components/questdb/package.go | 6 + website/docs/components/outputs/questdb.md | 481 ++++++++++++++++++++ 9 files changed, 1452 insertions(+), 4 deletions(-) create mode 100644 internal/impl/questdb/integration_test.go create mode 100644 internal/impl/questdb/output.go create mode 100644 internal/impl/questdb/output_test.go create mode 100644 internal/impl/questdb/timestamp.go create mode 100644 public/components/questdb/package.go create mode 100644 website/docs/components/outputs/questdb.md diff --git a/go.mod b/go.mod index e15b12142..9838201d4 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/itchyny/gojq v0.12.14 github.com/itchyny/timefmt-go v0.1.5 + github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v4 v4.18.2 github.com/jhump/protoreflect v1.15.6 github.com/jmespath/go-jmespath v0.4.0 @@ -103,6 +104,7 @@ require ( github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.46.0 github.com/pusher/pusher-http-go v4.0.1+incompatible + github.com/questdb/go-questdb-client/v3 v3.2.0 github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc github.com/r3labs/diff/v3 v3.0.1 github.com/rabbitmq/amqp091-go v1.9.0 @@ -166,7 +168,7 @@ require ( github.com/AthenZ/athenz v1.10.62 // indirect github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect - github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 
// indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect github.com/ClickHouse/ch-go v0.61.5 // indirect github.com/DataDog/zstd v1.5.2 // indirect @@ -265,7 +267,6 @@ require ( github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.3 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.3 // indirect diff --git a/go.sum b/go.sum index f5c1513f4..6e66c1c06 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8 github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc= github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= @@ -145,6 +145,8 @@ github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA4 github.com/Microsoft/go-winio 
v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8= +github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= @@ -340,8 +342,12 @@ github.com/cockroachdb/apd/v3 v3.2.1/go.mod h1:klXJcjp+FffLTHlhIG69tezTDvdP065na github.com/colinmarc/hdfs v1.1.3 h1:662salalXLFmp+ctD+x0aG+xOg62lnVnOJHksXYpFBw= github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463TymfZG02I= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= +github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0= +github.com/containerd/containerd v1.7.12/go.mod h1:/5OMpE1p0ylxtEUGY8kuCYkDRzJm9NO1TFMWjUpdevk= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -360,6 +366,8 @@ 
github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259 h1:2T github.com/couchbaselabs/gocaves/client v0.0.0-20230404095311-05e3ba4f0259/go.mod h1:AVekAZwIY2stsJOMWLAS/0uA/+qdp7pjO8EHnl61QkY= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 h1:lhGOw8rNG6RAadmmaJAF3PJ7MNt7rFuWG7BHCYMgnGE= github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E= +github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= +github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= @@ -381,6 +389,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= +github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= +github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= @@ -489,6 +499,8 @@ github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod 
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL98+wF9xc8zWvFonSJ8= @@ -828,6 +840,10 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= +github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a h1:N9zuLhTvBSRt0gWSiJswwQ2HqDmtX/ZCDJURnKUt1Ik= +github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= +github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= +github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -866,7 +882,13 @@ github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTS github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure 
v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= +github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= +github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= +github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= +github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -878,6 +900,8 @@ github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3P github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY/JU= github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mpvl/unique v0.0.0-20150818121801-cbe035fff7de h1:D5x39vF5KCwKQaw+OC9ZPiLVHXz3UFw2+psEX+gYcto= github.com/mpvl/unique v0.0.0-20150818121801-cbe035fff7de/go.mod h1:kJun4WP5gFuHZgRjZUWWuH1DTxCtxbHDOIJsudS8jzY= github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs= @@ -979,6 +1003,8 @@ github.com/pkg/sftp v1.13.6 
h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo= github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= +github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -1007,6 +1033,8 @@ github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0 h1:sadMIs github.com/protocolbuffers/txtpbfmt v0.0.0-20230328191034-3462fbc510c0/go.mod h1:jgxiZysxFPM+iWKwQwPR+y+Jvo54ARd4EisXxKYpB5c= github.com/pusher/pusher-http-go v4.0.1+incompatible h1:4u6tomPG1WhHaST7Wi9mw83Y+MS/j2EplR2YmDh8Xp4= github.com/pusher/pusher-http-go v4.0.1+incompatible/go.mod h1:XAv1fxRmVTI++2xsfofDhg7whapsLRG/gH/DXbF3a18= +github.com/questdb/go-questdb-client/v3 v3.2.0 h1:rFlkc3tD+vNucd4dkNv2xN5xqcFJGwqxt3F5p2H8zrg= +github.com/questdb/go-questdb-client/v3 v3.2.0/go.mod h1:kXoftTVQZlksdJ9tsHQRWfdWO5Kyl4bZuKotyyeWa3c= github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc h1:hK577yxEJ2f5s8w2iy2KimZmgrdAUZUNftE1ESmg2/Q= github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc/go.mod h1:OQt6Zo5B3Zs+C49xul8kcHo+fZ1mCLPvd0LFxiZ2DHc= github.com/r3labs/diff/v3 v3.0.1 h1:CBKqf3XmNRHXKmdU7mZP1w7TV0pDyVCis1AUHtA4Xtg= @@ -1047,6 +1075,11 @@ github.com/segmentio/encoding v0.4.0 h1:MEBYvRqiUB2nfR2criEXWqwdY6HJOUrCn5hboVOV github.com/segmentio/encoding v0.4.0/go.mod h1:/d03Cd8PoaDeceuhUUUQWjU0KhWjrmYrWPgtJHYZSnI= 
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4= +github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= +github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= @@ -1091,11 +1124,17 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/testcontainers/testcontainers-go v0.28.0 h1:1HLm9qm+J5VikzFDYhOd+Zw12NtOl+8drH2E8nTY1r8= +github.com/testcontainers/testcontainers-go v0.28.0/go.mod h1:COlDpUXbwW3owtpMkEB1zo9gwb1CoKVKlyrVPejF4AU= github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tilinna/z85 v1.0.0 h1:uqFnJBlD01dosSeo5sK1G1YGbPuwqVHqR+12OJDRjUw= github.com/tilinna/z85 v1.0.0/go.mod h1:EfpFU/DUY4ddEy6CRvk2l+UQNEzHbh+bqBQS+04Nkxs= +github.com/tklauser/go-sysconf v0.3.12 
h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/trinodb/trino-go-client v0.313.0 h1:lp8N9JKTqMuZ9LlAwLjgUtkwDnJc8fjpJmunpZ3afjk= github.com/trinodb/trino-go-client v0.313.0/go.mod h1:YpZf2WAClFhU+n0ZhdkmMbugYaMRM/mjywiQru0wpeQ= github.com/trivago/grok v1.0.0 h1:oV2ljyZT63tgXkmgEHg2U0jMqiKKuL0hkn49s6aRavQ= @@ -1149,6 +1188,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= diff --git a/internal/impl/questdb/integration_test.go b/internal/impl/questdb/integration_test.go new file mode 100644 index 000000000..4ca5b4067 --- /dev/null +++ b/internal/impl/questdb/integration_test.go @@ -0,0 +1,93 @@ +package questdb + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "testing" + "time" + + "github.com/jackc/pgconn" + qdb "github.com/questdb/go-questdb-client/v3" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/warpstreamlabs/bento/public/service/integration" +) + +func TestIntegrationQuestDB(t 
*testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Minute * 3 + resource, err := pool.Run("questdb/questdb", "8.0.0", []string{ + "JAVA_OPTS=-Xms512m -Xmx512m", + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + if err = pool.Retry(func() error { + clientConfStr := fmt.Sprintf("http::addr=localhost:%v", resource.GetPort("9000/tcp")) + sender, err := qdb.LineSenderFromConf(ctx, clientConfStr) + if err != nil { + return err + } + defer sender.Close(ctx) + err = sender.Table("ping").Int64Column("test", 42).AtNow(ctx) + if err != nil { + return err + } + return sender.Flush(ctx) + }); err != nil { + t.Fatalf("Could not connect to docker resource: %s", err) + } + + _ = resource.Expire(900) + + template := ` +output: + questdb: + address: "localhost:$PORT" + table: $ID +` + queryGetFn := func(ctx context.Context, testID, messageID string) (string, []string, error) { + pgConn, err := pgconn.Connect(ctx, fmt.Sprintf("postgresql://admin:quest@localhost:%v", resource.GetPort("8812/tcp"))) + require.NoError(t, err) + defer pgConn.Close(ctx) + + result := pgConn.ExecParams(ctx, fmt.Sprintf("SELECT content, id FROM '%v' WHERE id=%v", testID, messageID), nil, nil, nil, nil) + + result.NextRow() + id, err := strconv.Atoi(string(result.Values()[1])) + assert.NoError(t, err) + data := map[string]any{ + "content": string(result.Values()[0]), + "id": id, + } + + assert.False(t, result.NextRow()) + + outputBytes, err := json.Marshal(data) + require.NoError(t, err) + return string(outputBytes), nil, nil + } + + suite := integration.StreamTests( + integration.StreamTestOutputOnlySendSequential(10, queryGetFn), + integration.StreamTestOutputOnlySendBatch(10, queryGetFn), + ) + suite.Run( + t, template, + integration.StreamTestOptPort(resource.GetPort("9000/tcp")), + ) +} diff --git 
a/internal/impl/questdb/output.go b/internal/impl/questdb/output.go new file mode 100644 index 000000000..1dcad87c0 --- /dev/null +++ b/internal/impl/questdb/output.go @@ -0,0 +1,506 @@ +package questdb + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + qdb "github.com/questdb/go-questdb-client/v3" + "github.com/warpstreamlabs/bento/public/service" +) + +func questdbOutputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Summary("Pushes messages to a QuestDB table."). + Description(` +:::warning Important +We recommend that the dedupe feature is enabled on the QuestDB server. Please visit https://questdb.io/docs/ for more information about deploying, configuring, and using QuestDB. +:::`+service.OutputPerformanceDocs(true, true)). + Categories("Services"). + Fields( + service.NewOutputMaxInFlightField(), + service.NewBatchPolicyField("batching"), + service.NewTLSToggledField("tls"), + service.NewStringField("address"). + Description("Address of the QuestDB server's HTTP port (excluding protocol)"). + Example("localhost:9000"), + service.NewStringField("username"). + Description("Username for HTTP basic auth"). + Optional(), + service.NewStringField("password"). + Description("Password for HTTP basic auth"). + Optional(). + Secret(), + service.NewStringField("token"). + Description("Bearer token for HTTP auth (takes precedence over basic auth username & password)"). + Optional(). + Secret(), + service.NewDurationField("retry_timeout"). + Description("The time to continue retrying after a failed HTTP request. The interval between retries is an exponential "+ + "backoff starting at 10ms and doubling after each failed attempt up to a maximum of 1 second."). + Optional(). + Advanced(), + service.NewDurationField("request_timeout"). + Description("The time to wait for a response from the server. This is in addition to the calculation "+ + "derived from the request_min_throughput parameter."). + Optional(). 
+ Advanced(), + service.NewIntField("request_min_throughput"). + Description("Minimum expected throughput in bytes per second for HTTP requests. If the throughput is lower than this value, "+ + "the connection will time out. This is used to calculate an additional timeout on top of request_timeout. This is useful for large requests. "+ + "You can set this value to 0 to disable this logic."). + Optional(). + Advanced(), + service.NewStringField("table"). + Description("Destination table"). + Example("trades"), + service.NewStringField("designated_timestamp_field"). + Description("Name of the designated timestamp field"). + Optional(), + service.NewStringEnumField("designated_timestamp_unit", "nanos", "micros", "millis", "seconds", "auto"). + Description("Designated timestamp field units"). + Default("auto"). + Optional(), + service.NewStringListField("timestamp_string_fields"). + Description("String fields with textual timestamps"). + Optional(), + service.NewStringField("timestamp_string_format"). + Description("Timestamp format, used when parsing timestamp string fields. Specified in golang's time.Parse layout"). + Default(time.StampMicro+"Z0700"). + Optional(), + service.NewStringListField("symbols"). + Description("Columns that should be the SYMBOL type (string values default to STRING)"). + Optional(), + service.NewStringListField("doubles"). + Description("Columns that should be double type, (int is default)"). + Optional(), + service.NewBoolField("error_on_empty_messages"). + Description("Mark a message as errored if it is empty after field validation"). + Optional(). 
+ Default(false), + ) +} + +type questdbWriter struct { + log *service.Logger + + pool *qdb.LineSenderPool + transport *http.Transport + + address string + symbols map[string]bool + doubles map[string]bool + table string + designatedTimestampField string + designatedTimestampUnit timestampUnit + timestampStringFormat string + timestampStringFields map[string]bool + errorOnEmptyMessages bool +} + +func fromConf(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, mif int, err error) { + + if batchPol, err = conf.FieldBatchPolicy("batching"); err != nil { + return + } + + if mif, err = conf.FieldMaxInFlight(); err != nil { + return + } + + // We force the use of HTTP connections (instead of TCP) and + // disable the QuestDB LineSender[s] auto flush to force the client + // to send data over the wire only once, when a MessageBatch has been + // completely processed. + opts := []qdb.LineSenderOption{ + qdb.WithHttp(), + qdb.WithAutoFlushDisabled(), + } + + // Now, we process options for and construct the LineSenderPool + // which is used to send data to QuestDB using Influx Line Protocol + + var addr string + if addr, err = conf.FieldString("address"); err != nil { + return + } + opts = append(opts, qdb.WithAddress(addr)) + + if conf.Contains("retry_timeout") { + var retryTimeout time.Duration + if retryTimeout, err = conf.FieldDuration("retry_timeout"); err != nil { + return + } + opts = append(opts, qdb.WithRetryTimeout(retryTimeout)) + } + + if conf.Contains("request_timeout") { + var requestTimeout time.Duration + if requestTimeout, err = conf.FieldDuration("request_timeout"); err != nil { + return + } + opts = append(opts, qdb.WithRequestTimeout(requestTimeout)) + } + + if conf.Contains("request_min_throughput") { + var requestMinThroughput int + if requestMinThroughput, err = conf.FieldInt("request_min_throughput"); err != nil { + return + } + opts = append(opts, qdb.WithMinThroughput(requestMinThroughput)) + 
} + + if conf.Contains("token") { + var token string + if token, err = conf.FieldString("token"); err != nil { + return + } + opts = append(opts, qdb.WithBearerToken(token)) + } + + if conf.Contains("username") && conf.Contains("password") { + var username, password string + if username, err = conf.FieldString("username"); err != nil { + return + } + if password, err = conf.FieldString("password"); err != nil { + return + } + opts = append(opts, qdb.WithBasicAuth(username, password)) + + } + + // Use a common http transport with user-defined TLS config + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxConnsPerHost: 0, + MaxIdleConns: 64, + MaxIdleConnsPerHost: 64, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + } + + tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") + if err != nil { + return + } + + if tlsEnabled { + opts = append(opts, qdb.WithTls()) + transport.TLSClientConfig = tlsConf + } + + opts = append(opts, qdb.WithHttpTransport(transport)) + + // Allocate the QuestDBWriter which wraps the LineSenderPool + w := &questdbWriter{ + address: addr, + log: mgr.Logger(), + symbols: map[string]bool{}, + doubles: map[string]bool{}, + timestampStringFields: map[string]bool{}, + transport: transport, + } + out = w + w.pool, err = qdb.PoolFromOptions(opts...) + if err != nil { + return + } + + // Apply pool-level options + // todo: is this the correct interpretation of max-in-flight? + qdb.WithMaxSenders(mif)(w.pool) + + // Configure the questdbWriter with additional options + + if w.table, err = conf.FieldString("table"); err != nil { + return + } + + // Symbols, doubles, and timestampStringFields are stored in maps + // for fast lookup. 
+ var symbols []string + if conf.Contains("symbols") { + if symbols, err = conf.FieldStringList("symbols"); err != nil { + return + } + for _, s := range symbols { + w.symbols[s] = true + } + } + + var doubles []string + if conf.Contains("doubles") { + if doubles, err = conf.FieldStringList("doubles"); err != nil { + return + } + for _, d := range doubles { + w.doubles[d] = true + } + } + + var timestampStringFields []string + if conf.Contains("timestamp_string_fields") { + if timestampStringFields, err = conf.FieldStringList("timestamp_string_fields"); err != nil { + return + } + for _, f := range timestampStringFields { + w.timestampStringFields[f] = true + } + } + + if conf.Contains("designated_timestamp_field") { + if w.designatedTimestampField, err = conf.FieldString("designated_timestamp_field"); err != nil { + return + } + } + + var designatedTimestampUnit string + if conf.Contains("designated_timestamp_unit") { + if designatedTimestampUnit, err = conf.FieldString("designated_timestamp_unit"); err != nil { + return + } + + // perform validation on timestamp units here in case the user doesn't lint the config + w.designatedTimestampUnit = timestampUnit(designatedTimestampUnit) + if !w.designatedTimestampUnit.IsValid() { + err = fmt.Errorf("%v is not a valid timestamp unit", designatedTimestampUnit) + return + } + } + + if conf.Contains("timestamp_string_format") { + if w.timestampStringFormat, err = conf.FieldString("timestamp_string_format"); err != nil { + return + } + } + + if w.errorOnEmptyMessages, err = conf.FieldBool("error_on_empty_messages"); err != nil { + return + } + + return +} + +func (q *questdbWriter) Connect(ctx context.Context) error { + // No connections are required to initialize a LineSenderPool, + // so nothing to do here. Each LineSender has its own http client + // that will use the network only when flushing messages to the server. 
+ return nil +} + +func (q *questdbWriter) parseTimestamp(v any) (time.Time, error) { + switch val := v.(type) { + case string: + t, err := time.Parse(q.timestampStringFormat, val) + if err != nil { + q.log.Errorf("could not parse timestamp field %v", err) + } + return t, err + case json.Number: + intVal, err := val.Int64() + if err != nil { + q.log.Errorf("numerical timestamps must be int64: %v", err) + } + return q.designatedTimestampUnit.From(intVal), err + default: + err := fmt.Errorf("unsupported type %T for designated timestamp: %v", v, v) + q.log.Error(err.Error()) + return time.Time{}, err + } +} + +func (q *questdbWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) (err error) { + sender, err := q.pool.Sender(ctx) + if err != nil { + return err + } + + defer func() { + // Closing the sender also flushes it + releaseErr := sender.Close(ctx) + if releaseErr != nil { + if err != nil { + err = fmt.Errorf("%v %w", err, releaseErr) + } else { + err = releaseErr + } + } + }() + + return batch.WalkWithBatchedErrors(func(i int, m *service.Message) (err error) { + // QuestDB's LineSender constructs ILP messages using a buffer, so message + // components must be written in the correct order, otherwise the sender will + // return an error. This order is: + // 1. Table Name + // 2. Symbols (key/value pairs) + // 3. Columns (key/value pairs) + // 4. Timestamp [optional] + // + // Before writing any column, we call Table(), which is guaranteed to run once. + // hasTable flag is used for that. 
+ var hasTable bool + + q.log.Tracef("Writing message %v", i) + + jVal, err := m.AsStructured() + if err != nil { + err = fmt.Errorf("unable to parse JSON: %v", err) + m.SetError(err) + return err + } + jObj, ok := jVal.(map[string]any) + if !ok { + err = fmt.Errorf("expected JSON object, found '%T'", jVal) + m.SetError(err) + return err + } + + // Stage 1: Handle all symbols, which must be written to the buffer first + for s := range q.symbols { + v, found := jObj[s] + if found { + if !hasTable { + sender.Table(q.table) + hasTable = true + } + switch val := v.(type) { + case string: + sender.Symbol(s, val) + default: + sender.Symbol(s, fmt.Sprintf("%v", val)) + } + } + } + + // Stage 2: Handle columns + for k, v := range jObj { + // Skip designated timestamp field (will process this in the 3rd stage) + if q.designatedTimestampField == k { + continue + } + + // Skip symbols (already processed in 1st stage) + if _, isSymbol := q.symbols[k]; isSymbol { + continue + } + + // For all non-timestamp fields, process values by JSON types since we are working + // with structured messages + switch val := v.(type) { + case string: + // Check if the field is a timestamp and process accordingly + if _, isTimestampField := q.timestampStringFields[k]; isTimestampField { + timestamp, err := q.parseTimestamp(v) + if err == nil { + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.TimestampColumn(k, timestamp) + } else { + q.log.Errorf("%v", err) + } + continue + } + + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.StringColumn(k, val) + case bool: + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.BoolColumn(k, val) + case json.Number: + // For json numbers, assume int unless column is explicitly marked as a double + if _, isDouble := q.doubles[k]; isDouble { + floatVal, err := val.Float64() + if err != nil { + q.log.Errorf("could not parse %v into a double: %v", val, err) + } + + if !hasTable { + 
sender.Table(q.table) + hasTable = true + } + sender.Float64Column(k, floatVal) + } else { + intVal, err := val.Int64() + if err != nil { + q.log.Errorf("could not parse %v into an integer: %v", val, err) + } + + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.Int64Column(k, intVal) + } + case float64: + // float64 is only needed if BENTHOS_USE_NUMBER=false + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.Float64Column(k, float64(val)) + default: + q.log.Errorf("unsupported type %T for field %v", v, k) + } + } + + // Stage 3: Handle designated timestamp and finalize the buffered message + var designatedTimestamp time.Time + if q.designatedTimestampField != "" { + val, found := jObj[q.designatedTimestampField] + if found { + designatedTimestamp, err = q.parseTimestamp(val) + if err != nil { + q.log.Errorf("unable to parse designated timestamp: %v", val) + } + } + } + + if !hasTable { + if q.errorOnEmptyMessages { + err = errors.New("empty message, skipping send to QuestDB") + m.SetError(err) + return err + } + q.log.Warn("empty message, skipping send to QuestDB") + return nil + } + + if !designatedTimestamp.IsZero() { + err = sender.At(ctx, designatedTimestamp) + } else { + err = sender.AtNow(ctx) + } + + if err != nil { + m.SetError(err) + } + return err + }) + +} + +func (q *questdbWriter) Close(ctx context.Context) error { + return q.pool.Close(ctx) +} + +func init() { + if err := service.RegisterBatchOutput( + "questdb", + questdbOutputConfig(), + fromConf, + ); err != nil { + panic(err) + } +} diff --git a/internal/impl/questdb/output_test.go b/internal/impl/questdb/output_test.go new file mode 100644 index 000000000..38e49f9ab --- /dev/null +++ b/internal/impl/questdb/output_test.go @@ -0,0 +1,269 @@ +package questdb + +import ( + "bufio" + "context" + "fmt" + "math" + "net" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + 
"github.com/warpstreamlabs/bento/public/service" +) + +func TestTimestampConversions(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + value int64 + unit timestampUnit + expectedTime time.Time + }{ + { + name: "autoSecondsMin", + value: 0, + unit: auto, + expectedTime: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), + }, + { + name: "autoSecondsMax", + value: 9999999999, + unit: auto, + expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 0, time.UTC), + }, + { + name: "autoMillisMin", + value: 10000000000, + unit: auto, + expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC), + }, + { + name: "autoMillisMax", + value: 9999999999999, + unit: auto, + expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 999000000, time.UTC), + }, + { + name: "autoMicrosMin", + value: 10000000000000, + unit: auto, + expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC), + }, + { + name: "autoMicrosMax", + value: 9999999999999999, + unit: auto, + expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 999999000, time.UTC), + }, + { + name: "autoNanosMin", + value: 10000000000000000, + unit: auto, + expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC), + }, + { + name: "autoNanosMax", + value: math.MaxInt64, + unit: auto, + expectedTime: time.Date(2262, 4, 11, 23, 47, 16, 854775807, time.UTC), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedTime, tc.unit.From(tc.value)) + }) + } +} + +func TestFromConf(t *testing.T) { + t.Parallel() + + configSpec := questdbOutputConfig() + conf := ` +table: test +address: "localhost:9000" +designated_timestamp_field: myDesignatedTimestamp +designated_timestamp_unit: nanos +timestamp_string_fields: + - fieldA + - fieldB +timestamp_string_format: 2006-01-02T15:04:05Z07:00 # rfc3339 +symbols: + - mySymbolA + - mySymbolB +` + parsed, err := configSpec.ParseYAML(conf, nil) + require.NoError(t, err) + + out, _, _, err := fromConf(parsed, service.MockResources()) 
+ require.NoError(t, err) + + w, ok := out.(*questdbWriter) + require.True(t, ok) + + assert.Equal(t, "test", w.table) + assert.Equal(t, "myDesignatedTimestamp", w.designatedTimestampField) + assert.Equal(t, nanos, w.designatedTimestampUnit) + assert.Equal(t, map[string]bool{"fieldA": true, "fieldB": true}, w.timestampStringFields) + assert.Equal(t, time.RFC3339, w.timestampStringFormat) + assert.Equal(t, map[string]bool{"mySymbolA": true, "mySymbolB": true}, w.symbols) + +} + +func TestValidationErrorsFromConf(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + conf string + expectedErrContains string + }{ + { + name: "no address", + conf: "table: test", + expectedErrContains: "field 'address' is required", + }, + { + name: "no table", + conf: `address: "localhost:9000"`, + expectedErrContains: "field 'table' is required", + }, + { + name: "invalid timestamp unit", + conf: ` +address: "localhost:9000" +table: test +designated_timestamp_unit: hello`, + expectedErrContains: "is not a valid timestamp unit", + }, + } + + for _, tc := range testCases { + configSpec := questdbOutputConfig() + + t.Run(tc.name, func(t *testing.T) { + cfg, err := configSpec.ParseYAML(tc.conf, nil) + if err != nil { + assert.ErrorContains(t, err, tc.expectedErrContains) + return + } + + _, _, _, err = fromConf(cfg, service.MockResources()) + assert.ErrorContains(t, err, tc.expectedErrContains) + + }) + } +} + +func TestOptionsOnWrite(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + sentMsgs := make(chan string, 4) // Arbitrary buffer size, > max number of test messages + t.Cleanup(func() { close(sentMsgs) }) + + // Set up mock QuestDB http server + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + s := http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + scanner := bufio.NewScanner(r.Body) + for scanner.Scan() { + sentMsgs <- scanner.Text() + } + 
assert.NoError(t, scanner.Err()) + w.WriteHeader(200) + }), + } + t.Cleanup(func() { + _ = s.Shutdown(ctx) + }) + go func() { + _ = s.Serve(listener) + }() + + testCases := []struct { + name string + extraConf string + payload []string + expectedLines []string + }{ + { + name: "withSymbols", + extraConf: "symbols: ['hello']", + payload: []string{`{"hello": "world", "test": 1}`}, + expectedLines: []string{"withSymbols,hello=world test=1i"}, + }, + { + name: "withDesignatedTimestamp", + extraConf: "designated_timestamp_field: timestamp", + payload: []string{`{"hello": "world", "timestamp": 1}`}, + expectedLines: []string{ + `withDesignatedTimestamp hello="world" 1000000000`, + }, + }, + { + name: "withTimestampUnit", + extraConf: "designated_timestamp_field: timestamp\ndesignated_timestamp_unit: nanos", + payload: []string{`{"hello": "world", "timestamp": 1}`}, + expectedLines: []string{ + `withTimestampUnit hello="world" 1`, + }, + }, + { + name: "withTimestampStringFields", + extraConf: "timestamp_string_fields: ['timestamp']\ntimestamp_string_format: 2006-02-01", + payload: []string{`{"timestamp": "1970-01-02"}`}, + expectedLines: []string{ + `withTimestampStringFields timestamp=2678400000000t`, + }, + }, + { + name: "withBoolValue", + extraConf: "timestamp_string_fields: ['timestamp']\ntimestamp_string_format: 2006-02-01", + payload: []string{`{"hello": true}`}, + expectedLines: []string{ + `withBoolValue hello=t`, + }, + }, + { + name: "withDoubles", + extraConf: "doubles: ['hello']", + payload: []string{`{"hello": 1.23}`}, + expectedLines: []string{ + `withDoubles hello=1.23`, + }, + }, + } + + for _, tc := range testCases { + conf := fmt.Sprintf("address: 'localhost:%d'\n", listener.Addr().(*net.TCPAddr).Port) + conf += fmt.Sprintf("table: '%s'\n", tc.name) + conf += tc.extraConf + + configSpec := questdbOutputConfig() + + cfg, err := configSpec.ParseYAML(conf, nil) + require.NoError(t, err) + w, _, _, err := fromConf(cfg, service.MockResources()) + 
// timestampUnit names the unit in which an integer timestamp is expressed.
type timestampUnit string

const (
	nanos   timestampUnit = "nanos"
	micros  timestampUnit = "micros"
	millis  timestampUnit = "millis"
	seconds timestampUnit = "seconds"
	// auto defers the choice of unit to guessTimestampUnits.
	auto timestampUnit = "auto"
)

// guessTimestampUnits infers the unit of an integer timestamp from its
// magnitude: values closer to zero than 1e10 are seconds, than 1e13 millis,
// than 1e16 micros, and everything else is nanos.
//
// The comparison is symmetric around zero, so negative (pre-epoch) values are
// classified by magnitude exactly like positive ones instead of always being
// treated as seconds. Bounds are checked on both sides rather than via an
// absolute value so that math.MinInt64 cannot overflow.
func guessTimestampUnits(timestamp int64) timestampUnit {
	const (
		secondsLimit int64 = 10_000_000_000
		// 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros
		millisLimit int64 = 10_000_000_000_000
		microsLimit int64 = 10_000_000_000_000_000
	)

	within := func(limit int64) bool {
		return timestamp > -limit && timestamp < limit
	}

	switch {
	case within(secondsLimit):
		return seconds
	case within(millisLimit):
		return millis
	case within(microsLimit):
		return micros
	default:
		return nanos
	}
}

// IsValid reports whether t is one of the declared units, auto included.
func (t timestampUnit) IsValid() bool {
	switch t {
	case nanos, micros, millis, seconds, auto:
		return true
	default:
		return false
	}
}

// From converts value, expressed in unit t, into a UTC time.Time. For auto the
// unit is first inferred with guessTimestampUnits.
//
// From panics when t is not one of the declared units; IsValid can be used to
// check a unit beforehand.
func (t timestampUnit) From(value int64) time.Time {
	switch t {
	case nanos:
		return time.Unix(0, value).UTC()
	case micros:
		return time.UnixMicro(value).UTC()
	case millis:
		return time.UnixMilli(value).UTC()
	case seconds:
		return time.Unix(value, 0).UTC()
	case auto:
		return guessTimestampUnits(value).From(value)
	default:
		panic("unsupported timestampUnit: " + t)
	}
}
"github.com/warpstreamlabs/bento/public/components/pusher" + _ "github.com/warpstreamlabs/bento/public/components/questdb" _ "github.com/warpstreamlabs/bento/public/components/redis" _ "github.com/warpstreamlabs/bento/public/components/sentry" _ "github.com/warpstreamlabs/bento/public/components/sftp" diff --git a/public/components/questdb/package.go b/public/components/questdb/package.go new file mode 100644 index 000000000..7a62d8abe --- /dev/null +++ b/public/components/questdb/package.go @@ -0,0 +1,6 @@ +package questdb + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/questdb" +) diff --git a/website/docs/components/outputs/questdb.md b/website/docs/components/outputs/questdb.md new file mode 100644 index 000000000..6c2a853e6 --- /dev/null +++ b/website/docs/components/outputs/questdb.md @@ -0,0 +1,481 @@ +--- +title: questdb +slug: questdb +type: output +status: experimental +categories: ["Services"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: +Pushes messages to a QuestDB table. 
+ + + + + + +```yml +# Common config fields, showing default values +output: + label: "" + questdb: + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + address: localhost:9000 # No default (required) + username: "" # No default (optional) + password: "" # No default (optional) + token: "" # No default (optional) + table: trades # No default (required) + designated_timestamp_field: "" # No default (optional) + designated_timestamp_unit: auto + timestamp_string_fields: [] # No default (optional) + timestamp_string_format: Jan _2 15:04:05.000000Z0700 + symbols: [] # No default (optional) + doubles: [] # No default (optional) + error_on_empty_messages: false +``` + + + + +```yml +# All config fields, showing default values +output: + label: "" + questdb: + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) + tls: + enabled: false + skip_cert_verify: false + enable_renegotiation: false + root_cas: "" + root_cas_file: "" + client_certs: [] + address: localhost:9000 # No default (required) + username: "" # No default (optional) + password: "" # No default (optional) + token: "" # No default (optional) + retry_timeout: "" # No default (optional) + request_timeout: "" # No default (optional) + request_min_throughput: 0 # No default (optional) + table: trades # No default (required) + designated_timestamp_field: "" # No default (optional) + designated_timestamp_unit: auto + timestamp_string_fields: [] # No default (optional) + timestamp_string_format: Jan _2 15:04:05.000000Z0700 + symbols: [] # No default (optional) + doubles: [] # No default (optional) + error_on_empty_messages: false +``` + + + + +:::warning Important +We recommend that the dedupe feature is enabled on the QuestDB server. Please visit https://questdb.io/docs/ for more information about deploying, configuring, and using QuestDB. 
+::: + +## Performance + +This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`. + +This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more [in this doc](/docs/configuration/batching). + +## Fields + +### `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + + +Type: `int` +Default: `64` + +### `batching` + +Allows you to configure a [batching policy](/docs/configuration/batching). + + +Type: `object` + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +### `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +Type: `int` +Default: `0` + +### `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +Type: `int` +Default: `0` + +### `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +Type: `string` +Default: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +### `batching.check` + +A [Bloblang query](/docs/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch. + + +Type: `string` +Default: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +### `batching.processors` + +A list of [processors](/docs/components/processors/about) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. 
Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +Type: `array` + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + +### `tls` + +Custom TLS settings can be used to override system defaults. + + +Type: `object` + +### `tls.enabled` + +Whether custom TLS settings are enabled. + + +Type: `bool` +Default: `false` + +### `tls.skip_cert_verify` + +Whether to skip server side certificate verification. + + +Type: `bool` +Default: `false` + +### `tls.enable_renegotiation` + +Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`. + + +Type: `bool` +Default: `false` +Requires version 1.0.0 or newer + +### `tls.root_cas` + +An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +```yml +# Examples + +root_cas: |- + -----BEGIN CERTIFICATE----- + ... + -----END CERTIFICATE----- +``` + +### `tls.root_cas_file` + +An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. + + +Type: `string` +Default: `""` + +```yml +# Examples + +root_cas_file: ./root_cas.pem +``` + +### `tls.client_certs` + +A list of client certificates to use. 
For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both. + + +Type: `array` +Default: `[]` + +```yml +# Examples + +client_certs: + - cert: foo + key: bar + +client_certs: + - cert_file: ./example.pem + key_file: ./example.key +``` + +### `tls.client_certs[].cert` + +A plain text certificate to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].key` + +A plain text certificate key to use. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +### `tls.client_certs[].cert_file` + +The path of a certificate to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].key_file` + +The path of a certificate key to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].password` + +A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). 
+::: + + +Type: `string` +Default: `""` + +```yml +# Examples + +password: foo + +password: ${KEY_PASSWORD} +``` + +### `address` + +Address of the QuestDB server's HTTP port (excluding protocol) + + +Type: `string` + +```yml +# Examples + +address: localhost:9000 +``` + +### `username` + +Username for HTTP basic auth + + +Type: `string` + +### `password` + +Password for HTTP basic auth +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` + +### `token` + +Bearer token for HTTP auth (takes precedence over basic auth username & password) +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` + +### `retry_timeout` + +The time to continue retrying after a failed HTTP request. The interval between retries is an exponential backoff starting at 10ms and doubling after each failed attempt up to a maximum of 1 second. + + +Type: `string` + +### `request_timeout` + +The time to wait for a response from the server. This is in addition to the calculation derived from the request_min_throughput parameter. + + +Type: `string` + +### `request_min_throughput` + +Minimum expected throughput in bytes per second for HTTP requests. If the throughput is lower than this value, the connection will time out. This is used to calculate an additional timeout on top of request_timeout. This is useful for large requests. You can set this value to 0 to disable this logic. 
+ + +Type: `int` + +### `table` + +Destination table + + +Type: `string` + +```yml +# Examples + +table: trades +``` + +### `designated_timestamp_field` + +Name of the designated timestamp field + + +Type: `string` + +### `designated_timestamp_unit` + +Designated timestamp field units + + +Type: `string` +Default: `"auto"` +Options: `nanos`, `micros`, `millis`, `seconds`, `auto`. + +### `timestamp_string_fields` + +String fields with textual timestamps + + +Type: `array` + +### `timestamp_string_format` + +Timestamp format, used when parsing timestamp string fields. Specified in golang's time.Parse layout + + +Type: `string` +Default: `"Jan _2 15:04:05.000000Z0700"` + +### `symbols` + +Columns that should be the SYMBOL type (string values default to STRING) + + +Type: `array` + +### `doubles` + +Columns that should be double type, (int is default) + + +Type: `array` + +### `error_on_empty_messages` + +Mark a message as errored if it is empty after field validation + + +Type: `bool` +Default: `false` + +