Skip to content

Commit

Permalink
Removing unwanted commits
Browse files Browse the repository at this point in the history
  • Loading branch information
blackfly19 committed Jul 30, 2021
1 parent d7e49b6 commit 870ee15
Show file tree
Hide file tree
Showing 32 changed files with 352 additions and 185 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Lint

on:
push:
branches:
- master
pull_request:
branches:
- master
workflow_dispatch:

jobs:
lint:
runs-on: ubuntu-latest

steps:
- name: Set up Go 1.15
uses: actions/setup-go@v2
with:
go-version: 1.15

- name: Check out code
uses: actions/checkout@v2

- name: Verify dependencies
run: |
LINT_VERSION=1.41.1
curl -fsSL https://github.com/golangci/golangci-lint/releases/download/v${LINT_VERSION}/golangci-lint-${LINT_VERSION}-linux-amd64.tar.gz | \
tar xz --strip-components 1 --wildcards \*/golangci-lint
sudo mv golangci-lint /usr/local/bin/
- name: Run checks
run: |
make lint
36 changes: 36 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
linters:
enable:
- deadcode
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
# Additional linters
- gofmt
- goimports
- misspell
- nakedret
- unconvert
- bodyclose
- dogsled
- dupl
# - gosec
- ifshort
- nilerr
- prealloc
# - revive
- unparam
# - wrapcheck
# - gocritic
linters-settings:
errcheck:
ignore: go.uber.org/zap:Sync
goimports:
# put imports beginning with prefix after 3rd-party packages;
# it's a comma-separated list of prefixes
local-prefixes: github.com/fission/fission,github.com/fission/keda-connectors
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

lint:
./hack/lint.sh

4 changes: 2 additions & 2 deletions aws-kinesis-http-connector/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.13-alpine as builder
FROM golang:1.15-alpine as builder

RUN apk add bash ca-certificates git gcc g++ libc-dev

Expand All @@ -14,7 +14,7 @@ RUN go mod download
COPY . .

RUN go build -a -o /go/bin/main
FROM alpine:3.12 as base
FROM alpine:3.13 as base
RUN apk add --update ca-certificates
COPY --from=builder /go/bin/main /

Expand Down
24 changes: 13 additions & 11 deletions aws-kinesis-http-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@ The job of the connector is to read messages from the stream, call an HTTP endpo
- `MAX_RETRIES`: Maximum number of times an http endpoint will be retried upon failure.
- `CONTENT_TYPE`: Content type used while creating post request.

#### Ways to connect to AWS
- `AWS_REGION`: Region is mandatory for any aws connection.

1) Through AWS endpoint
- `AWS_ENDPOINT` : Kinesis endpoint on which it is running, for local it can be http://localhost:4568.
## Ways to connect to AWS

2) Through AWS aws key and secret
- `AWS_ACCESS_KEY_ID`: aws access key of your account.
- `AWS_SECRET_ACCESS_KEY`: aws secret key got from your account.
`AWS_REGION`: Region is mandatory for any aws connection.

3) Through AWS credentials
- `AWS_CRED_PATH`: Path where aws credentials are present, ex ~/.aws/credentials.
- `AWS_CRED_PROFILE`: Profile With which to connect to AWS, present in ~/.aws/credentials file.
1. Through AWS endpoint
- `AWS_ENDPOINT` : Kinesis endpoint on which it is running, for local it can be [http://localhost:4568](http://localhost:4568)

2. Through AWS aws key and secret

- `AWS_ACCESS_KEY_ID`: aws access key of your account.
- `AWS_SECRET_ACCESS_KEY`: aws secret key got from your account.

3. Through AWS credentials

- `AWS_CRED_PATH`: Path where aws credentials are present, ex ~/.aws/credentials.
- `AWS_CRED_PROFILE`: Profile With which to connect to AWS, present in ~/.aws/credentials file.

More information about the above parameters and how to define it scaledobject refer [AWS SQS scaler doc](https://keda.sh/docs/1.5/scalers/aws-sqs/).
2 changes: 1 addition & 1 deletion aws-kinesis-http-connector/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/fission/keda-connectors/aws-kinesis-http-connector

go 1.13
go 1.15

require (
github.com/aws/aws-sdk-go v1.34.25
Expand Down
3 changes: 0 additions & 3 deletions aws-kinesis-http-connector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/aws/aws-sdk-go v1.34.25/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fission/keda-connectors/common v0.0.0-20200915102844-c68eb4e4d582 h1:dBEd5TkrxnImIE+rXJG+vymYX2ZqtbRfOAu+ak929cA=
github.com/fission/keda-connectors/common v0.0.0-20200915102844-c68eb4e4d582/go.mod h1:WGr9mV3DArFdiSCg6PKXaeQ5XPNzZY4aWAj/oL63+qo=
github.com/fission/keda-connectors/common v0.0.0-20210204070438-3b74480a5e70 h1:PI6+71zxuvIrypARTHpYEJqOcNzJAL5S1Yi9yvlevP0=
github.com/fission/keda-connectors/common v0.0.0-20210204070438-3b74480a5e70/go.mod h1:eJ7ViC/moBvbXCIxlbT4JLfMi8dg00SnjyNyYvD0vg8=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -36,7 +34,6 @@ go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
74 changes: 39 additions & 35 deletions aws-kinesis-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"

"github.com/fission/keda-connectors/common"

"go.uber.org/zap"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (conn *awsKinesisConnector) listShards() ([]*kinesis.Shard, error) {

//findNewShards sends shards, it only sends newly added shards
func (conn *awsKinesisConnector) findNewShards() {
shards := make(map[string]*kinesis.Shard)
var shards sync.Map
var ticker = time.NewTicker(30 * time.Second)
for {
select {
Expand All @@ -63,15 +64,13 @@ func (conn *awsKinesisConnector) findNewShards() {

for _, s := range shardList {
//send only new shards
if _, ok := shards[*s.ShardId]; ok {
continue
_, loaded := shards.LoadOrStore(*s.ShardId, s)
if !loaded {
conn.shardc <- s
}
shards[*s.ShardId] = s
conn.shardc <- s
}
}
}

}

//getIterator get's the iterator either from start or from where we left
Expand All @@ -85,15 +84,15 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
//Start from, where we left
params.StartingSequenceNumber = aws.String(checkpoint)
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(aws.Context(conn.ctx), params)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
if err != nil {
return nil, err
}
return iteratorOutput, err
}
//Start from, oldest record in the shard
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(aws.Context(conn.ctx), params)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -122,12 +121,12 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
//scan each shards for any new records, when found call the passed func
func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
//checkpoints to identify how much read has happened
checkpoints := make(map[string]string)
var checkpoints sync.Map
var wg sync.WaitGroup
//get called when any new shards are added
for s := range conn.shardc {
//Start fresh
checkpoints[*s.ShardId] = ""
checkpoints.Store(*s.ShardId, "")
wg.Add(1)
go func(shardID string) {
defer wg.Done()
Expand All @@ -136,10 +135,12 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
defer scanTicker.Stop()
for {
//do noting if shard got deleted
if _, found := checkpoints[shardID]; !found {
checkpoint, found := checkpoints.Load(shardID)
if !found {
conn.logger.Info("shard not found", zap.String("shardID", shardID))
return
}
iteratorOutput, err := conn.getIterator(shardID, checkpoints[shardID])
iteratorOutput, err := conn.getIterator(shardID, checkpoint.(string))
if err != nil {
conn.logger.Error("error in iterator",
zap.String("shardID", shardID),
Expand All @@ -159,7 +160,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
for _, r := range resp.Records {
//send records
err := fn(&record{r, shardID, resp.MillisBehindLatest})
checkpoints[shardID] = *r.SequenceNumber
checkpoints.Store(shardID, *r.SequenceNumber)
if err != nil {
conn.logger.Error("error in processing records",
zap.String("shardID", shardID),
Expand All @@ -168,8 +169,8 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
}
if isShardClosed(resp.NextShardIterator, iterator) {
//when shards got deleted, remove it from checkpoints
if _, found := checkpoints[shardID]; found {
delete(checkpoints, shardID)
if _, found := checkpoints.Load(shardID); found {
checkpoints.Delete(shardID)
return
}
}
Expand Down Expand Up @@ -198,43 +199,47 @@ func (conn *awsKinesisConnector) consumeMessage(r *record) {

resp, err := common.HandleHTTPRequest(string(r.Data), headers, conn.connectordata, conn.logger)
if err != nil {
conn.logger.Error("error processing message",
zap.String("shardID", r.shardID),
zap.Error(err))
conn.errorHandler(r, err.Error())
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
conn.logger.Error("error processing message",
zap.String("shardID", r.shardID),
zap.Error(err))
conn.errorHandler(r, err.Error())
} else {
if success := conn.responseHandler(r, string(body)); success {
conn.logger.Info("done processing message",
zap.String("shardID", r.shardID),
zap.String("message", string(body)))
if err := conn.responseHandler(r, string(body)); err != nil {
conn.logger.Error("failed to publish response body from http request to topic",
zap.Error(err),
zap.String("topic", conn.connectordata.ResponseTopic),
zap.String("source", conn.connectordata.SourceName),
zap.String("http endpoint", conn.connectordata.HTTPEndpoint))
}
conn.logger.Info("done processing message",
zap.String("shardID", r.shardID),
zap.String("message", string(body)))
}
}
}

func (conn *awsKinesisConnector) responseHandler(r *record, response string) bool {
func (conn *awsKinesisConnector) responseHandler(r *record, response string) error {
if len(conn.connectordata.ResponseTopic) > 0 {
params := &kinesis.PutRecordInput{
Data: []byte(response), // Required
PartitionKey: aws.String(*r.PartitionKey), // Required
StreamName: aws.String(conn.connectordata.ResponseTopic), // Required
SequenceNumberForOrdering: aws.String(*r.SequenceNumber),
}

_, err := conn.client.PutRecord(params)
if err != nil {
conn.logger.Error("failed to publish response body from http request to topic",
zap.Error(err),
zap.String("topic", conn.connectordata.ResponseTopic),
zap.String("source", conn.connectordata.SourceName),
zap.String("http endpoint", conn.connectordata.HTTPEndpoint),
)
return false
return err
}
}
return true
return nil
}

func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) {
Expand Down Expand Up @@ -280,19 +285,18 @@ func main() {
}

s, err := session.NewSession(config)
kc := kinesis.New(s)

if err != nil {
logger.Error("not able to create the session")
logger.Error("not able to create the session", zap.Error(err))
return
}
kc := kinesis.New(s)
connectordata, err := common.ParseConnectorMetadata()
if err != nil {
logger.Error("error while parsing metadata")
logger.Error("error while parsing metadata", zap.Error(err))
return
}
if err := kc.WaitUntilStreamExists(&kinesis.DescribeStreamInput{StreamName: &connectordata.Topic}); err != nil {
logger.Error("not able to connect to kinesis stream")
logger.Error("not able to connect to kinesis stream", zap.Error(err))
return
}

Expand All @@ -313,7 +317,7 @@ func main() {
shardc: shardc,
maxRecords: 10, //Read maximum 10 records
}

logger.Info("Starting aws kinesis connector")
//Get the shards in shardc chan
go func() {
conn.findNewShards()
Expand Down
2 changes: 1 addition & 1 deletion aws-kinesis-http-connector/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.6
v0.7
2 changes: 1 addition & 1 deletion aws-sqs-http-connector/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/fission/keda-connectors/aws-sqs-http-connector

go 1.12
go 1.15

require (
github.com/aws/aws-sdk-go v1.34.25
Expand Down
10 changes: 8 additions & 2 deletions aws-sqs-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"

"github.com/fission/keda-connectors/common"
)

Expand Down Expand Up @@ -79,7 +80,6 @@ func (conn awsSQSConnector) consumeMessage() {
if err != nil {
conn.errorHandler(errorQueueURL, err)
} else {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
conn.errorHandler(errorQueueURL, err)
Expand All @@ -98,6 +98,10 @@ func (conn awsSQSConnector) consumeMessage() {
conn.deleteMessage(*message.ReceiptHandle, consQueueURL)
}
}
err = resp.Body.Close()
if err != nil {
conn.logger.Error("failed to close response body", zap.Error(err))
}
}
}
}
Expand Down Expand Up @@ -171,7 +175,9 @@ func main() {
defer logger.Sync()

connectordata, err := common.ParseConnectorMetadata()

if err != nil {
logger.Fatal("failed to parse connector metadata", zap.Error(err))
}
config, err := common.GetAwsConfig()
if err != nil {
logger.Error("failed to fetch aws config", zap.Error(err))
Expand Down
Loading

0 comments on commit 870ee15

Please sign in to comment.