Skip to content

Commit

Permalink
Update internal dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
  • Loading branch information
alexellis committed Jan 17, 2023
1 parent 10ae9e9 commit 3615ccb
Show file tree
Hide file tree
Showing 51 changed files with 6,853 additions and 5,011 deletions.
26 changes: 21 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
module github.com/openfaas/nats-queue-worker

go 1.16
go 1.18

require (
github.com/nats-io/stan.go v0.10.4
github.com/openfaas/faas-provider v0.19.1
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.3.2 // indirect
github.com/nats-io/nats-streaming-server v0.22.0 // indirect
github.com/nats-io/stan.go v0.9.0
github.com/openfaas/faas-provider v0.18.6
github.com/gorilla/mux v1.8.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/nats-io/nats.go v1.22.1 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
)
411 changes: 411 additions & 0 deletions go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

const sharedQueue = "faas-request"

// CreateNATSQueue ready for asynchronous processing
// CreateNATSQueue ready for asynchronous message processing of paylods of
// up to a maximum of 256KB in size.
func CreateNATSQueue(address string, port int, clusterName, channel string, clientConfig NATSConfig) (*NATSQueue, error) {
var err error
natsURL := fmt.Sprintf("nats://%s:%d", address, port)
Expand Down
5 changes: 5 additions & 0 deletions handler/nats_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"encoding/json"
"fmt"
"log"
"sync"
"time"
Expand Down Expand Up @@ -37,6 +38,10 @@ func (q *NATSQueue) Queue(req *ftypes.QueueRequest) error {
if v := req.Header.Get("X-Call-Id"); len(v) > 0 {
callId = v
}
max := 256 * 1000
if len(req.Body) > max {
return fmt.Errorf("request body too large for OpenFaaS CE (%d bytes), maximum: %d bytes", len(req.Body), 256*1000)
}

log.Printf("[%s] Queueing (%d) bytes for: %s.\n", callId, len(req.Body), req.Function)

Expand Down
24 changes: 12 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -40,6 +39,8 @@ func main() {
log.Printf("Starting queue-worker (Community Edition). Concurrency: %d\tChannel: %s\tVersion: %s\tGit Commit: %s",
config.MaxInflight, sharedQueue, release, sha)

log.Printf("[Warning] NATS Streaming is deprecated and will be removed in a future release. See: https://www.openfaas.com/blog/jetstream-for-openfaas/")

client := makeClient()

counter := uint64(0)
Expand Down Expand Up @@ -99,7 +100,7 @@ func main() {
timeTaken := time.Since(started).Seconds()

if req.CallbackURL != nil {
resultStatusCode, resultErr := postResult(&client,
resultStatusCode, err := postResult(&client,
res,
functionResult,
req.CallbackURL.String(),
Expand All @@ -108,8 +109,8 @@ func main() {
req.Function,
timeTaken)

if resultErr != nil {
log.Printf("[#%d] Posted callback to: %s - status %d, error: %s\n", i, req.CallbackURL.String(), http.StatusServiceUnavailable, resultErr.Error())
if err != nil {
log.Printf("[#%d] Posted callback to: %s - status %d, error: %s\n", i, req.CallbackURL.String(), http.StatusServiceUnavailable, err.Error())
} else {
log.Printf("[#%d] Posted result to %s - status: %d", i, req.CallbackURL.String(), resultStatusCode)
}
Expand All @@ -121,7 +122,7 @@ func main() {
if res.Body != nil {
defer res.Body.Close()

resData, err := ioutil.ReadAll(res.Body)
resData, err := io.ReadAll(res.Body)
functionResult = resData

if err != nil {
Expand All @@ -140,7 +141,7 @@ func main() {
if req.CallbackURL != nil {
log.Printf("[#%d] Callback to: %s\n", i, req.CallbackURL.String())

resultStatusCode, resultErr := postResult(&client,
resultStatusCode, err := postResult(&client,
res,
functionResult,
req.CallbackURL.String(),
Expand All @@ -149,8 +150,8 @@ func main() {
req.Function,
timeTaken)

if resultErr != nil {
log.Printf("[#%d] Error posting to callback-url: %s\n", i, resultErr)
if err != nil {
log.Printf("[#%d] Error posting to callback-url: %s\n", i, err)
} else {
log.Printf("[#%d] Posted result for %s to callback-url: %s, status: %d", i, req.Function, req.CallbackURL.String(), resultStatusCode)
}
Expand All @@ -177,15 +178,16 @@ func main() {
ackWait: config.AckWait,
}

if initErr := natsQueue.connect(); initErr != nil {
log.Panic(initErr)
if err := natsQueue.connect(); err != nil {
log.Panic(err)
}

// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
// Run cleanup when signal is received
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
<-signalChan

fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
if err := natsQueue.closeConnection(); err != nil {
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
Expand All @@ -202,7 +204,6 @@ func makeClient() http.Client {
Timeout: 30 * time.Second,
KeepAlive: 0,
}).DialContext,

MaxIdleConns: 1,
DisableKeepAlives: true,
IdleConnTimeout: 120 * time.Millisecond,
Expand All @@ -225,7 +226,6 @@ func postResult(client *http.Client, functionRes *http.Response, result []byte,
}

request, err := http.NewRequest(http.MethodPost, callbackURL, reader)

if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to post result, error: %s", err.Error())
}
Expand Down
5 changes: 4 additions & 1 deletion vendor/github.com/nats-io/nats.go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions vendor/github.com/nats-io/nats.go/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 106 additions & 0 deletions vendor/github.com/nats-io/nats.go/.words

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions vendor/github.com/nats-io/nats.go/.words.readme

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions vendor/github.com/nats-io/nats.go/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3615ccb

Please sign in to comment.