Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tt-6671] AWS Kinesis pump #841

Merged
merged 13 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The table below provides details on the fields within each `tyk_analytics` recor
- [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes)
- [Timestream](#timestream-config)
- [AWS SQS](#SQS-config)
- [AWS Kinesis](#Kinesis-config)

# Configuration:

Expand Down Expand Up @@ -1343,6 +1344,45 @@ TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0

```

## Kinesis Config

#### Authentication & Prerequisite

We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials).

#### Config Fields

`stream_name` - The name of your Kinesis stream in the specified AWS region

`region` - The AWS region your Kinesis stream is located - i.e. eu-west-2

`batch_size` - Optional. The maximum size of the records in a batch is 5MiB. If your records are larger in size setting this batch size paramter can guarantee you don't have failed delivery due to too large a batch. Default size if unset is 100.

###### JSON / Conf File

```json
"kinesis":{
"type": "kinesis",
"meta": {
"stream_name": "my-stream",
"region": "eu-west-2",
"batch_size": 100
}
},
```

###### Env Variables

```
#Kinesis Pump Configuration
TYK_PMP_PUMPS_KINESIS_TYPE=kinesis
TYK_PMP_PUMPS_KINESIs_META_STREAMNAME=my-stream
TYK_PMP_PUMPS_KINESIS_META_REGION=eu-west-2
TYK_PMP_PUMPS_KINESIS_META_BATCHSIZE=100
```



# Base Pump Configurations

The following configurations can be added to any Pump. Keep reading for an example.
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ require (
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.2.0
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2 v1.30.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.6.1
github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
Expand Down Expand Up @@ -57,15 +58,16 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 // indirect
github.com/aws/smithy-go v1.16.0 // indirect
github.com/aws/smithy-go v1.20.3 // indirect
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
Expand All @@ -89,6 +91,7 @@ require (
github.com/jehiah/go-strftime v0.0.0-20151206194810-2efbe75097a5 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joho/godotenv v1.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
Expand Down
24 changes: 14 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm
github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE=
github.com/aws/aws-sdk-go-v2 v1.11.0/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U=
github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c=
github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o=
github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 h1:tW1/Rkad38LA15X4UQtjXZXNKsCgkshC3EbmcUmghTg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3/go.mod h1:UbnqO+zjqk3uIt9yCACHJ9IVNhyhOCnYk8yA19SAWrM=
github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s=
github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0=
Expand All @@ -42,19 +44,21 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0 h1:OpZjuUy8Jt3CA1WgJgBC5Bz+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.0/go.mod h1:5E1J3/TTYy6z909QNR0QnXGBpfESYGDqd3O0zqONghU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0/go.mod h1:NO3Q5ZTTQtO2xIg2+xTXYDiT7knSejfeDm7WGDaOo0U=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 h1:5SAoZ4jYpGH4721ZNoS1znQrhOfZinOhc4XuTXx/nVc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13/go.mod h1:+rdA6ZLpaSeM7tSg/B0IEDinCIBJGmW8rKDFkYpP04g=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0/go.mod h1:anlUzBoEWglcUxUQwZA7HQOEVEnQALVZsizAapB2hq8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 h1:WIijqeaAO7TYFLbhsZmi2rgLEAtWOC1LhxCAVTJlSKw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13/go.mod h1:i+kbfa76PQbWw/ULoWnp51EYVWH4ENln76fLQE3lXT8=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 h1:qGZWS/WgiFY+Zgad2u0gwBHpJxz6Ne401JE7iQI1nKs=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0/go.mod h1:Mq6AEc+oEjCUlBuLiK5YwW4shSOAKCQ3tXN0sQeYoBA=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1 h1:UIEtjoWh7oqjHXdgdjOP/tinga1uKR9F//tiUNshE7w=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.29.1/go.mod h1:tqz5Yq7ohiQIQ7qrj6e2fWJbT1Owq9zEo78mZb/+eWU=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs=
Expand All @@ -67,8 +71,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc=
github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik=
github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -186,7 +190,6 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU
github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o=
github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY=
github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI=
github.com/jackc/pgconn v1.10.0 h1:4EYhlDVEMsJ30nNj0mmgwIUXoq7e9sMJrVC2ED6QlCU=
github.com/jackc/pgconn v1.10.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI=
github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w=
github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM=
Expand All @@ -207,7 +210,6 @@ github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwX
github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag=
github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
Expand All @@ -233,7 +235,9 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr
github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
Expand Down
1 change: 1 addition & 0 deletions pumps/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ func init() {
AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{}
AvailablePumps["resurfaceio"] = &ResurfacePump{}
AvailablePumps["sqs"] = &SQSPump{}
AvailablePumps["kinesis"] = &KinesisPump{}
}
189 changes: 189 additions & 0 deletions pumps/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package pumps

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/big"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

// KinesisPump is a Tyk Pump that sends analytics records to AWS Kinesis.
type KinesisPump struct {
client *kinesis.Client
kinesisConf *KinesisConf
log *logrus.Entry
CommonPumpConfig
}

// @PumpConf Kinesis
type KinesisConf struct {
// The prefix for the environment variables that will be used to override the configuration.
// Defaults to `TYK_PMP_PUMPS_KINESIS_META`
EnvPrefix string `mapstructure:"meta_env_prefix"`
// A name to identify the stream. The stream name is scoped to the AWS account used by the application
// that creates the stream. It is also scoped by AWS Region.
// That is, two streams in two different AWS accounts can have the same name.
// Two streams in the same AWS account but in two different Regions can also have the same name.
StreamName string `mapstructure:"stream_name"`
// AWS Region the Kinesis stream targets
Region string `mapstructure:"region"`
// Each PutRecords (the function used in this pump)request can support up to 500 records.
// Each record in the request can be as large as 1 MiB, up to a limit of 5 MiB for the entire request, including partition keys.
// Each shard can support writes up to 1,000 records per second, up to a maximum data write total of 1 MiB per second.
BatchSize int `mapstructure:"batch_size"`
}

var (
kinesisPrefix = "kinesis-pump"
kinesisDefaultENV = PUMPS_ENV_PREFIX + "_KINESIS" + PUMPS_ENV_META_PREFIX
)

func (p *KinesisPump) New() Pump {
newPump := KinesisPump{}
return &newPump
}

// Init initializes the pump with configuration settings.
func (p *KinesisPump) Init(config interface{}) error {
p.log = log.WithField("prefix", kinesisPrefix)

// Read configuration file
p.kinesisConf = &KinesisConf{}
err := mapstructure.Decode(config, &p.kinesisConf)
if err != nil {
p.log.Fatal("Failed to decode configuration: ", err)
}

processPumpEnvVars(p, p.log, p.kinesisConf, kinesisDefaultENV)

// Load AWS configuration
// Credentials are loaded as specified in
// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials
cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(p.kinesisConf.Region))
if err != nil {
p.log.Fatalf("unable to load Kinesis SDK config, %v", err)
}

defaultBatchSize := 100
if p.kinesisConf.BatchSize == 0 {
p.kinesisConf.BatchSize = defaultBatchSize
}

if p.kinesisConf.StreamName == "" {
p.log.Error("Stream name unset - may be unable to produce records")
}

// Create Kinesis client
p.client = kinesis.NewFromConfig(cfg)
p.log.Info(p.GetName() + " Initialized")

return nil
}

// WriteData writes the analytics records to AWS Kinesis in batches.
func (p *KinesisPump) WriteData(ctx context.Context, records []interface{}) error {
batches := splitIntoBatches(records, p.kinesisConf.BatchSize)
for _, batch := range batches {
var entries []types.PutRecordsRequestEntry
for _, record := range batch {
// Build message format
decoded, ok := record.(analytics.AnalyticsRecord)
if !ok {
p.log.WithField("record", record).Error("unable to decode record")
continue
}
//nolint:dupl
analyticsRecord := Json{
"timestamp": decoded.TimeStamp,
"method": decoded.Method,
"path": decoded.Path,
"raw_path": decoded.RawPath,
"response_code": decoded.ResponseCode,
"alias": decoded.Alias,
"api_key": decoded.APIKey,
"api_version": decoded.APIVersion,
"api_name": decoded.APIName,
"api_id": decoded.APIID,
"org_id": decoded.OrgID,
"oauth_id": decoded.OauthID,
"raw_request": decoded.RawRequest,
"request_time_ms": decoded.RequestTime,
"raw_response": decoded.RawResponse,
"ip_address": decoded.IPAddress,
"host": decoded.Host,
"content_length": decoded.ContentLength,
"user_agent": decoded.UserAgent,
"tags": decoded.Tags,
}

// Transform object to json string
json, jsonError := json.Marshal(analyticsRecord)
if jsonError != nil {
p.log.WithError(jsonError).Error("unable to marshal message")
}

n, err := rand.Int(rand.Reader, big.NewInt(1000000000))
if err != nil {
p.log.Error("failed to generate int for Partition key: ", err)
}

// Partition key uses a string representation of Int
// Should even distribute across shards as AWS uses md5 of each message partition key
entry := types.PutRecordsRequestEntry{
Data: json,
PartitionKey: aws.String(fmt.Sprint(n)),
}
entries = append(entries, entry)
}

input := &kinesis.PutRecordsInput{
StreamName: aws.String(p.kinesisConf.StreamName),
Records: entries,
}

output, err := p.client.PutRecords(ctx, input)
if err != nil {
p.log.Error("failed to put records to Kinesis: ", err)
}

// Check for failed records
if output != nil {
for _, record := range output.Records {
if record.ErrorCode != nil {
p.log.Debugf("Failed to put record: %s - %s", aws.ToString(record.ErrorCode), aws.ToString(record.ErrorMessage))
}
p.log.Debug(record)
}
p.log.Info("Purged ", len(output.Records), " records...")
}
}
return nil
}

// splitIntoBatches splits the records into batches of the specified size.
func splitIntoBatches(records []interface{}, batchSize int) [][]interface{} {
var batches [][]interface{}
for batchSize < len(records) {
records, batches = records[batchSize:], append(batches, records[0:batchSize:batchSize])
}
return append(batches, records)
}

// GetName returns the name of the pump.
func (p *KinesisPump) GetName() string {
return "Kinesis Pump"
}

func (p *KinesisPump) GetEnvPrefix() string {
return p.kinesisConf.EnvPrefix
}
Loading