From 2eb2bdbb039f1cc8346237d79d5d430e5191518b Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Sun, 5 Aug 2018 22:02:32 +0300 Subject: [PATCH] instrument api executor --- Makefile | 8 +-- README.md | 1 + docs/content/deploy/packages.md | 2 +- docs/content/libraries/client.md | 10 ++-- docs/content/server/channels.md | 1 + internal/api/api.go | 40 ++++++++++++-- internal/api/grpc.go | 2 +- internal/api/handler.go | 9 +--- internal/api/metrics.go | 93 ++------------------------------ 9 files changed, 52 insertions(+), 114 deletions(-) diff --git a/Makefile b/Makefile index 4f89331ce4..5feb394348 100644 --- a/Makefile +++ b/Makefile @@ -7,17 +7,13 @@ DOCKER_RUN_DOC_PORT := 8000 DOCKER_RUN_DOC_MOUNT := -v $(CURDIR)/docs:/mkdocs DOCKER_RUN_DOC_OPTS := --rm $(DOCKER_RUN_DOC_MOUNT) -p $(DOCKER_RUN_DOC_PORT):8000 -all: release - -release: - @read -p "Enter new release version: " version; \ - ./misc/scripts/release.sh $$version +all: test prepare: go get github.com/mitchellh/gox test: - go test $(TESTFOLDERS) -cover + go test $(TESTFOLDERS) -cover -race web: ./misc/scripts/update_web.sh diff --git a/README.md b/README.md index fc12f29800..a3602e3ccb 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Try our [demo instance](https://centrifugo.herokuapp.com/) on Heroku (password ` ### Highlights * Fast server capable to serve thousands of simultaneous connections +* Simple to install and cross platform – works on Linux, MacOS and Windows * Easily integrates with existing application – no need to rewrite your code base to introduce real-time events * HTTP and GRPC API to communicate from your application backend (publish messages in channels etc) * JSON and binary Protobuf Websocket protocol diff --git a/docs/content/deploy/packages.md b/docs/content/deploy/packages.md index 79415eed30..243d695ab3 100644 --- a/docs/content/deploy/packages.md +++ b/docs/content/deploy/packages.md @@ -14,7 +14,7 @@ Currently we support versions of the following distributions: See [full list of available packages](https://packagecloud.io/FZambia/centrifugo) and [installation instructions](https://packagecloud.io/FZambia/centrifugo/install). -Also note that if your linux distro is not in list you can ask us to package +Also note that if your Linux distro is not in list you can ask us to package for it or just download appropriate package from packagecloud that fits your distribution. diff --git a/docs/content/libraries/client.md b/docs/content/libraries/client.md index 6eb93d9a1d..969b89be84 100644 --- a/docs/content/libraries/client.md +++ b/docs/content/libraries/client.md @@ -1,9 +1,9 @@ -# Client connection libraries +# Client libraries -In progress: +These libraries allow your users to connect to Centrifugo from application frontend. -* [centrifuge-js](https://github.com/centrifugal/centrifuge-js/tree/c2) – for browser, NodeJS and React Native. -* [centrifuge-go](https://github.com/centrifugal/centrifuge-go/tree/c2) - for Go language. -* [centrifuge-mobile](https://github.com/centrifugal/centrifuge-mobile/c2) - for iOS and Android using `centrifuge-go` as basis and `gomobile` project to create bindings. +* [centrifuge-js](https://github.com/centrifugal/centrifuge-js) – for browser, NodeJS and React Native. +* [centrifuge-go](https://github.com/centrifugal/centrifuge-go) - for Go language. +* [centrifuge-mobile](https://github.com/centrifugal/centrifuge-mobile) - for iOS and Android using `centrifuge-go` as basis and `gomobile` project to create bindings. There are no native mobile clients at moment but hopefully this will change soon with open-source community help. diff --git a/docs/content/server/channels.md b/docs/content/server/channels.md index d7663253ee..430fde4500 100644 --- a/docs/content/server/channels.md +++ b/docs/content/server/channels.md @@ -17,6 +17,7 @@ Several symbols in channel names are reserved for Centrifugo internal needs: * `#` – for user channel boundary (see below) * `*` – for future Centrifugo needs * `&` – for future Centrifugo needs +* `/` – for future Centrifugo needs ### namespace channel boundary (:) diff --git a/internal/api/api.go b/internal/api/api.go index 3b1c16a906..401e07d09e 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -2,23 +2,30 @@ package api import ( "context" + "time" "github.com/centrifugal/centrifuge" ) // apiExecutor can run API methods. type apiExecutor struct { - node *centrifuge.Node + node *centrifuge.Node + protocol string } -func newAPIExecutor(n *centrifuge.Node) *apiExecutor { +func newAPIExecutor(n *centrifuge.Node, protocol string) *apiExecutor { return &apiExecutor{ - node: n, + node: n, + protocol: protocol, } } // Publish publishes data into channel. func (h *apiExecutor) Publish(ctx context.Context, cmd *PublishRequest) *PublishResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "publish").Observe(time.Since(started).Seconds()) + }(time.Now()) + ch := cmd.Channel data := cmd.Data @@ -60,6 +67,9 @@ func (h *apiExecutor) Publish(ctx context.Context, cmd *PublishRequest) *Publish // Broadcast publishes the same data into many channels. func (h *apiExecutor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *BroadcastResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "broadcast").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &BroadcastResponse{} @@ -125,6 +135,9 @@ func (h *apiExecutor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Bro // Unsubscribe unsubscribes user from channel and sends unsubscribe // control message to other nodes so they could also unsubscribe user. func (h *apiExecutor) Unsubscribe(ctx context.Context, cmd *UnsubscribeRequest) *UnsubscribeResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "unsibscribe").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &UnsubscribeResponse{} @@ -157,6 +170,9 @@ func (h *apiExecutor) Unsubscribe(ctx context.Context, cmd *UnsubscribeRequest) // Disconnect disconnects user by its ID and sends disconnect // control message to other nodes so they could also disconnect user. func (h *apiExecutor) Disconnect(ctx context.Context, cmd *DisconnectRequest) *DisconnectResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "disconnect").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &DisconnectResponse{} @@ -178,6 +194,9 @@ func (h *apiExecutor) Disconnect(ctx context.Context, cmd *DisconnectRequest) *D // Presence returns response with presence information for channel. func (h *apiExecutor) Presence(ctx context.Context, cmd *PresenceRequest) *PresenceResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "presence").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &PresenceResponse{} @@ -224,6 +243,9 @@ func (h *apiExecutor) Presence(ctx context.Context, cmd *PresenceRequest) *Prese // PresenceStats returns response with presence stats information for channel. func (h *apiExecutor) PresenceStats(ctx context.Context, cmd *PresenceStatsRequest) *PresenceStatsResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "presence_stats").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &PresenceStatsResponse{} @@ -262,6 +284,9 @@ func (h *apiExecutor) PresenceStats(ctx context.Context, cmd *PresenceStatsReque // History returns response with history information for channel. func (h *apiExecutor) History(ctx context.Context, cmd *HistoryRequest) *HistoryResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "history").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &HistoryResponse{} @@ -316,6 +341,9 @@ func (h *apiExecutor) History(ctx context.Context, cmd *HistoryRequest) *History // HistoryRemove removes all history information for channel. func (h *apiExecutor) HistoryRemove(ctx context.Context, cmd *HistoryRemoveRequest) *HistoryRemoveResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "history_remove").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &HistoryRemoveResponse{} @@ -349,6 +377,9 @@ func (h *apiExecutor) HistoryRemove(ctx context.Context, cmd *HistoryRemoveReque // Channels returns active channels. func (h *apiExecutor) Channels(ctx context.Context, cmd *ChannelsRequest) *ChannelsResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "channels").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &ChannelsResponse{} @@ -367,6 +398,9 @@ func (h *apiExecutor) Channels(ctx context.Context, cmd *ChannelsRequest) *Chann // Info returns information about running nodes. func (h *apiExecutor) Info(ctx context.Context, cmd *InfoRequest) *InfoResponse { + defer func(started time.Time) { + apiCommandDurationSummary.WithLabelValues(h.protocol, "info").Observe(time.Since(started).Seconds()) + }(time.Now()) resp := &InfoResponse{} diff --git a/internal/api/grpc.go b/internal/api/grpc.go index 060b17ef9f..e5d155de88 100644 --- a/internal/api/grpc.go +++ b/internal/api/grpc.go @@ -26,7 +26,7 @@ type grpcAPIService struct { func newGRPCAPIService(n *centrifuge.Node, c GRPCAPIServiceConfig) *grpcAPIService { return &grpcAPIService{ config: c, - api: newAPIExecutor(n), + api: newAPIExecutor(n, "grpc"), } } diff --git a/internal/api/handler.go b/internal/api/handler.go index f8777e52df..14bd0ea145 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "net/http" "strings" - "time" "github.com/centrifugal/centrifuge" ) @@ -26,7 +25,7 @@ func NewHandler(n *centrifuge.Node, c Config) *Handler { return &Handler{ node: n, config: c, - api: newAPIExecutor(n), + api: newAPIExecutor(n, "http"), } } @@ -38,10 +37,6 @@ func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { default: } - defer func(started time.Time) { - apiHandlerDurationSummary.Observe(time.Since(started).Seconds()) - }(time.Now()) - var data []byte var err error @@ -83,9 +78,7 @@ func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "Bad Request", http.StatusBadRequest) return } - now := time.Now() rep, err := s.handleAPICommand(r.Context(), enc, command) - apiCommandDurationSummary.WithLabelValues(strings.ToLower(MethodType_name[int32(command.Method)])).Observe(time.Since(now).Seconds()) if err != nil { s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling API command", map[string]interface{}{"error": err.Error()})) http.Error(w, "Internal Server Error", http.StatusInternalServerError) diff --git a/internal/api/metrics.go b/internal/api/metrics.go index d5377ba168..0b389c806e 100644 --- a/internal/api/metrics.go +++ b/internal/api/metrics.go @@ -7,102 +7,15 @@ import ( var metricsNamespace = "centrifugo" var ( - messagesSentCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "messages_sent_count", - Help: "Number of messages sent.", - }, []string{"type"}) - - messagesReceivedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "messages_received_count", - Help: "Number of messages received.", - }, []string{"type"}) - - actionCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "action_count", - Help: "Number of node actions called.", - }, []string{"action"}) - - numClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "num_clients", - Help: "Number of clients connected.", - }) - - numUsersGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "num_users", - Help: "Number of unique users connected.", - }) - - buildInfoGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "build", - Help: "Node build info.", - }, []string{"version"}) - - numChannelsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Subsystem: "node", - Name: "num_channels", - Help: "Number of channels with one or more subscribers.", - }) - - replyErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "client", - Name: "num_reply_errors", - Help: "Number of errors in replies sent to clients.", - }, []string{"method", "code"}) - - commandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: "client", - Name: "command_duration_seconds", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - Help: "Client command duration summary.", - }, []string{"method"}) - - apiHandlerDurationSummary = prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: metricsNamespace, - Subsystem: "http", - Name: "api_request_duration_seconds", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - Help: "Duration of API handler in general.", - }) - apiCommandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: metricsNamespace, - Subsystem: "http", - Name: "api_request_command_duration_seconds", + Subsystem: "api", + Name: "command_duration_seconds", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, Help: "Duration of API per command.", - }, []string{"method"}) - - transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "transport", - Name: "connect_count", - Help: "Number of connections to specific transport.", - }, []string{"transport"}) - - transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: metricsNamespace, - Subsystem: "transport", - Name: "messages_sent", - Help: "Number of messages sent over specific transport.", - }, []string{"transport"}) + }, []string{"protocol", "method"}) ) func init() { - prometheus.MustRegister(apiHandlerDurationSummary) prometheus.MustRegister(apiCommandDurationSummary) }