Skip to content

Commit

Permalink
instrument api executor
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Aug 5, 2018
1 parent fd302a1 commit 2eb2bdb
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 114 deletions.
8 changes: 2 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@ DOCKER_RUN_DOC_PORT := 8000
DOCKER_RUN_DOC_MOUNT := -v $(CURDIR)/docs:/mkdocs
DOCKER_RUN_DOC_OPTS := --rm $(DOCKER_RUN_DOC_MOUNT) -p $(DOCKER_RUN_DOC_PORT):8000

all: release

release:
@read -p "Enter new release version: " version; \
./misc/scripts/release.sh $$version
all: test

prepare:
go get github.com/mitchellh/gox

test:
go test $(TESTFOLDERS) -cover
go test $(TESTFOLDERS) -cover -race

web:
./misc/scripts/update_web.sh
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Try our [demo instance](https://centrifugo.herokuapp.com/) on Heroku (password `
### Highlights

* Fast server capable to serve thousands of simultaneous connections
* Simple to install and cross platform – works on Linux, MacOS and Windows
* Easily integrates with existing application – no need to rewrite your code base to introduce real-time events
* HTTP and GRPC API to communicate from your application backend (publish messages in channels etc)
* JSON and binary Protobuf Websocket protocol
Expand Down
2 changes: 1 addition & 1 deletion docs/content/deploy/packages.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Currently we support versions of the following distributions:
See [full list of available packages](https://packagecloud.io/FZambia/centrifugo) and
[installation instructions](https://packagecloud.io/FZambia/centrifugo/install).

Also note that if your linux distro is not in list you can ask us to package
Also note that if your Linux distro is not in list you can ask us to package
for it or just download appropriate package from packagecloud that fits your
distribution.

Expand Down
10 changes: 5 additions & 5 deletions docs/content/libraries/client.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Client connection libraries
# Client libraries

In progress:
These libraries allow your users to connect to Centrifugo from application frontend.

* [centrifuge-js](https://github.com/centrifugal/centrifuge-js/tree/c2) – for browser, NodeJS and React Native.
* [centrifuge-go](https://github.com/centrifugal/centrifuge-go/tree/c2) - for Go language.
* [centrifuge-mobile](https://github.com/centrifugal/centrifuge-mobile/c2) - for iOS and Android using `centrifuge-go` as basis and `gomobile` project to create bindings.
* [centrifuge-js](https://github.com/centrifugal/centrifuge-js) – for browser, NodeJS and React Native.
* [centrifuge-go](https://github.com/centrifugal/centrifuge-go) - for Go language.
* [centrifuge-mobile](https://github.com/centrifugal/centrifuge-mobile) - for iOS and Android using `centrifuge-go` as basis and `gomobile` project to create bindings.

There are no native mobile clients at moment but hopefully this will change soon with open-source community help.
1 change: 1 addition & 0 deletions docs/content/server/channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Several symbols in channel names are reserved for Centrifugo internal needs:
* `#` – for user channel boundary (see below)
* `*` – for future Centrifugo needs
* `&` – for future Centrifugo needs
* `/` – for future Centrifugo needs

### namespace channel boundary (:)

Expand Down
40 changes: 37 additions & 3 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package api

import (
"context"
"time"

"github.com/centrifugal/centrifuge"
)

// apiExecutor can run API methods.
type apiExecutor struct {
node *centrifuge.Node
node *centrifuge.Node
protocol string
}

func newAPIExecutor(n *centrifuge.Node) *apiExecutor {
func newAPIExecutor(n *centrifuge.Node, protocol string) *apiExecutor {
return &apiExecutor{
node: n,
node: n,
protocol: protocol,
}
}

// Publish publishes data into channel.
func (h *apiExecutor) Publish(ctx context.Context, cmd *PublishRequest) *PublishResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "publish").Observe(time.Since(started).Seconds())
}(time.Now())

ch := cmd.Channel
data := cmd.Data

Expand Down Expand Up @@ -60,6 +67,9 @@ func (h *apiExecutor) Publish(ctx context.Context, cmd *PublishRequest) *Publish

// Broadcast publishes the same data into many channels.
func (h *apiExecutor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *BroadcastResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "broadcast").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &BroadcastResponse{}

Expand Down Expand Up @@ -125,6 +135,9 @@ func (h *apiExecutor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Bro
// Unsubscribe unsubscribes user from channel and sends unsubscribe
// control message to other nodes so they could also unsubscribe user.
func (h *apiExecutor) Unsubscribe(ctx context.Context, cmd *UnsubscribeRequest) *UnsubscribeResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "unsibscribe").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &UnsubscribeResponse{}

Expand Down Expand Up @@ -157,6 +170,9 @@ func (h *apiExecutor) Unsubscribe(ctx context.Context, cmd *UnsubscribeRequest)
// Disconnect disconnects user by its ID and sends disconnect
// control message to other nodes so they could also disconnect user.
func (h *apiExecutor) Disconnect(ctx context.Context, cmd *DisconnectRequest) *DisconnectResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "disconnect").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &DisconnectResponse{}

Expand All @@ -178,6 +194,9 @@ func (h *apiExecutor) Disconnect(ctx context.Context, cmd *DisconnectRequest) *D

// Presence returns response with presence information for channel.
func (h *apiExecutor) Presence(ctx context.Context, cmd *PresenceRequest) *PresenceResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "presence").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &PresenceResponse{}

Expand Down Expand Up @@ -224,6 +243,9 @@ func (h *apiExecutor) Presence(ctx context.Context, cmd *PresenceRequest) *Prese

// PresenceStats returns response with presence stats information for channel.
func (h *apiExecutor) PresenceStats(ctx context.Context, cmd *PresenceStatsRequest) *PresenceStatsResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "presence_stats").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &PresenceStatsResponse{}

Expand Down Expand Up @@ -262,6 +284,9 @@ func (h *apiExecutor) PresenceStats(ctx context.Context, cmd *PresenceStatsReque

// History returns response with history information for channel.
func (h *apiExecutor) History(ctx context.Context, cmd *HistoryRequest) *HistoryResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "history").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &HistoryResponse{}

Expand Down Expand Up @@ -316,6 +341,9 @@ func (h *apiExecutor) History(ctx context.Context, cmd *HistoryRequest) *History

// HistoryRemove removes all history information for channel.
func (h *apiExecutor) HistoryRemove(ctx context.Context, cmd *HistoryRemoveRequest) *HistoryRemoveResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "history_remove").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &HistoryRemoveResponse{}

Expand Down Expand Up @@ -349,6 +377,9 @@ func (h *apiExecutor) HistoryRemove(ctx context.Context, cmd *HistoryRemoveReque

// Channels returns active channels.
func (h *apiExecutor) Channels(ctx context.Context, cmd *ChannelsRequest) *ChannelsResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "channels").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &ChannelsResponse{}

Expand All @@ -367,6 +398,9 @@ func (h *apiExecutor) Channels(ctx context.Context, cmd *ChannelsRequest) *Chann

// Info returns information about running nodes.
func (h *apiExecutor) Info(ctx context.Context, cmd *InfoRequest) *InfoResponse {
defer func(started time.Time) {
apiCommandDurationSummary.WithLabelValues(h.protocol, "info").Observe(time.Since(started).Seconds())
}(time.Now())

resp := &InfoResponse{}

Expand Down
2 changes: 1 addition & 1 deletion internal/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type grpcAPIService struct {
func newGRPCAPIService(n *centrifuge.Node, c GRPCAPIServiceConfig) *grpcAPIService {
return &grpcAPIService{
config: c,
api: newAPIExecutor(n),
api: newAPIExecutor(n, "grpc"),
}
}

Expand Down
9 changes: 1 addition & 8 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/centrifugal/centrifuge"
)
Expand All @@ -26,7 +25,7 @@ func NewHandler(n *centrifuge.Node, c Config) *Handler {
return &Handler{
node: n,
config: c,
api: newAPIExecutor(n),
api: newAPIExecutor(n, "http"),
}
}

Expand All @@ -38,10 +37,6 @@ func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
default:
}

defer func(started time.Time) {
apiHandlerDurationSummary.Observe(time.Since(started).Seconds())
}(time.Now())

var data []byte
var err error

Expand Down Expand Up @@ -83,9 +78,7 @@ func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
now := time.Now()
rep, err := s.handleAPICommand(r.Context(), enc, command)
apiCommandDurationSummary.WithLabelValues(strings.ToLower(MethodType_name[int32(command.Method)])).Observe(time.Since(now).Seconds())
if err != nil {
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling API command", map[string]interface{}{"error": err.Error()}))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
Expand Down
93 changes: 3 additions & 90 deletions internal/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,102 +7,15 @@ import (
var metricsNamespace = "centrifugo"

var (
messagesSentCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "messages_sent_count",
Help: "Number of messages sent.",
}, []string{"type"})

messagesReceivedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "messages_received_count",
Help: "Number of messages received.",
}, []string{"type"})

actionCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "action_count",
Help: "Number of node actions called.",
}, []string{"action"})

numClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "num_clients",
Help: "Number of clients connected.",
})

numUsersGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "num_users",
Help: "Number of unique users connected.",
})

buildInfoGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "build",
Help: "Node build info.",
}, []string{"version"})

numChannelsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: "node",
Name: "num_channels",
Help: "Number of channels with one or more subscribers.",
})

replyErrorCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "client",
Name: "num_reply_errors",
Help: "Number of errors in replies sent to clients.",
}, []string{"method", "code"})

commandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: metricsNamespace,
Subsystem: "client",
Name: "command_duration_seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
Help: "Client command duration summary.",
}, []string{"method"})

apiHandlerDurationSummary = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: metricsNamespace,
Subsystem: "http",
Name: "api_request_duration_seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
Help: "Duration of API handler in general.",
})

apiCommandDurationSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: metricsNamespace,
Subsystem: "http",
Name: "api_request_command_duration_seconds",
Subsystem: "api",
Name: "command_duration_seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
Help: "Duration of API per command.",
}, []string{"method"})

transportConnectCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "connect_count",
Help: "Number of connections to specific transport.",
}, []string{"transport"})

transportMessagesSent = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "transport",
Name: "messages_sent",
Help: "Number of messages sent over specific transport.",
}, []string{"transport"})
}, []string{"protocol", "method"})
)

func init() {
prometheus.MustRegister(apiHandlerDurationSummary)
prometheus.MustRegister(apiCommandDurationSummary)
}

0 comments on commit 2eb2bdb

Please sign in to comment.