Skip to content

Commit

Permalink
collect metrics from the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
kshvakov committed Mar 17, 2018
1 parent 572b647 commit 08a797f
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 36 deletions.
29 changes: 18 additions & 11 deletions plugins/inputs/clickhouse/README.md
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
# Telegraf Input Plugin: ClickHouse

This [ClickHouse](https://github.com/yandex/ClickHouse) plugin provides metrics for your ClickHouse server.
This plugin gathers the statistic data from [ClickHouse](https://github.com/yandex/ClickHouse) server.

### Configuration example:
### Configuration
```
# Read metrics from one or many ClickHouse servers
[[inputs.clickhouse]]
dsn = "native://localhost:9000?username=user&password=qwerty"
dsn = "native://localhost:9000?username=user&password=qwerty"
cluster = true # If a setting is "true" plugin tries to connect to all servers in the cluster (system.clusters)
ignored_clusters = ["test_shard_localhost"] ## ignored cluster names
```

### Metrics:
- clickhouse_events
- tags:
- server (ClickHouse server hostname)
- hostname (Telegraf agent hostname)
- hostname (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- all rows from system.events

- clickhouse_metrics
- tags:
- server (ClickHouse server hostname)
- hostname (Telegraf agent hostname)
- hostname (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- all rows from system.metrics

- clickhouse_asynchronous_metrics
- tags:
- server (ClickHouse server hostname)
- hostname (Telegraf agent hostname)
- hostname (ClickHouse server hostname)
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- all rows from system.asynchronous_metrics

- clickhouse_tables
- tags:
- server (ClickHouse server hostname)
- hostname (Telegraf agent hostname)
- hostname (ClickHouse server hostname)
- table
- database
- cluster (Name of the cluster [optional])
- shard_num (Shard number in the cluster [optional])
- fields:
- bytes
- parts
Expand Down
146 changes: 122 additions & 24 deletions plugins/inputs/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package clickhouse

import (
"database/sql"
"os"
"fmt"
"net"
"net/url"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -13,16 +16,23 @@ import (

const sampleConfig = `
### ClickHouse DSN
dsn = "native://localhost:9000?username=user&password=qwerty"
dsn = "native://localhost:9000?username=user&password=qwerty"
cluster = false
ignored_clusters = ["test_shard_localhost"]
`

var hostname, _ = os.Hostname()
type connect struct {
*sql.DB
cluster, shardNum, hostname string
}

// ClickHouse Telegraf Input Plugin
type ClickHouse struct {
DSN string `toml:"dsn"`
server string
connect *sql.DB
DSN string `toml:"dsn"`
Cluster bool `toml:"cluster"`
IgnoredClusters []string `toml:"ignored_clusters"`
connect *connect
clustersConn map[string]*connect
}

// SampleConfig returns the sample config
Expand All @@ -37,18 +47,37 @@ func (*ClickHouse) Description() string {

// Gather collect data from ClickHouse server
func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
if !ch.Cluster {
if err := ch.gather(ch.connect, acc); err != nil {
acc.AddError(err)
return err
}
return nil
}
conns, err := ch.conns(acc)
if err != nil {
acc.AddError(err)
return err
}
for _, conn := range conns {
if err := ch.gather(conn, acc); err != nil {
acc.AddError(err)
}
}
return nil
}

func (ch *ClickHouse) gather(conn *connect, acc telegraf.Accumulator) (err error) {
var rows *sql.Rows
for measurement, query := range measurementMap {
if rows, err = ch.connect.Query(query); err != nil {
acc.AddError(err)
if rows, err = conn.Query(query); err != nil {
return err
}
if err := ch.processRows(measurement, rows, acc); err != nil {
if err := ch.processRows(measurement, conn, rows, acc); err != nil {
return err
}
}
if rows, err = ch.connect.Query(systemParts); err != nil {
acc.AddError(err)
if rows, err = conn.Query(systemParts); err != nil {
return err
}
defer rows.Close()
Expand All @@ -58,7 +87,6 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
bytes, parts, rowsInTable uint64
)
if err := rows.Scan(&database, &table, &bytes, &parts, &rowsInTable); err != nil {
acc.AddError(err)
return err
}
acc.AddFields("clickhouse_tables",
Expand All @@ -68,16 +96,75 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) {
"rows": rowsInTable,
},
map[string]string{
"table": table,
"server": ch.server,
"hostname": hostname,
"database": database,
})
"table": table,
"server": conn.hostname,
"cluster": conn.cluster,
"database": database,
"hostname": conn.hostname,
"shard_num": conn.shardNum,
},
)
}
return nil
}

func (ch *ClickHouse) processRows(measurement string, rows *sql.Rows, acc telegraf.Accumulator) error {
func (ch *ClickHouse) conns(acc telegraf.Accumulator) ([]*connect, error) {
var (
ignore = func(cluster string) bool {
for _, ignored := range ch.IgnoredClusters {
if cluster == ignored {
return true
}
}
return false
}
rows, err = ch.connect.Query(systemClusterSQL)
)
if err != nil {
return nil, err
}
baseDSN, err := url.Parse(ch.DSN)
if err != nil {
return nil, err
}
for rows.Next() {
var (
port, shardNum int
hostname, address, cluster string
connID = fmt.Sprintf("%s_%d", address, shardNum)
)
if err := rows.Scan(&cluster, &shardNum, &hostname, &address, &port); err != nil {
acc.AddError(err)
continue
}
if _, found := ch.clustersConn[connID]; !found {
if ignore(cluster) {
continue
}
baseDSN.Host = net.JoinHostPort(address, strconv.Itoa(port))
conn, err := sql.Open("clickhouse", baseDSN.String())
if err != nil {
acc.AddError(err)
continue
}
ch.clustersConn[connID] = &connect{
DB: conn,
cluster: cluster,
shardNum: strconv.Itoa(shardNum),
hostname: hostname,
}
}
}
conns := make([]*connect, 0, len(ch.clustersConn))
for _, conn := range ch.clustersConn {
if err := conn.Ping(); err == nil {
conns = append(conns, conn)
}
}
return conns, nil
}

func (ch *ClickHouse) processRows(measurement string, conn *connect, rows *sql.Rows, acc telegraf.Accumulator) error {
defer rows.Close()
fields := make(map[string]interface{})
for rows.Next() {
Expand All @@ -86,29 +173,29 @@ func (ch *ClickHouse) processRows(measurement string, rows *sql.Rows, acc telegr
value uint64
)
if err := rows.Scan(&key, &value); err != nil {
acc.AddError(err)
return err
}
fields[key] = value
}
acc.AddFields("clickhouse_"+measurement, fields, map[string]string{
"server": ch.server,
"hostname": hostname,
"cluster": conn.cluster,
"hostname": conn.hostname,
"shard_num": conn.shardNum,
})
return nil
}

// Start ClickHouse input service
func (ch *ClickHouse) Start(telegraf.Accumulator) (err error) {
if ch.connect, err = sql.Open("clickhouse", ch.DSN); err != nil {
if ch.connect.DB, err = sql.Open("clickhouse", ch.DSN); err != nil {
return err
}
{
ch.connect.SetMaxOpenConns(2)
ch.connect.SetMaxIdleConns(1)
ch.connect.SetConnMaxLifetime(20 * time.Second)
}
return ch.connect.QueryRow("SELECT hostName()").Scan(&ch.server)
return ch.connect.QueryRow("SELECT hostName()").Scan(&ch.connect.hostname)
}

// Stop ClickHouse input service
Expand All @@ -120,7 +207,10 @@ func (ch *ClickHouse) Stop() {

func init() {
inputs.Add("clickhouse", func() telegraf.Input {
return &ClickHouse{}
return &ClickHouse{
connect: &connect{},
clustersConn: make(map[string]*connect),
}
})
}

Expand All @@ -142,6 +232,14 @@ const (
ORDER BY
database, table
`
systemClusterSQL = `
SELECT
cluster,
shard_num,
host_name,
host_address,
port
FROM system.clusters`
)

var measurementMap = map[string]string{
Expand Down
4 changes: 3 additions & 1 deletion plugins/inputs/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ func TestClickHouseGeneratesMetrics(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
ch := ClickHouse{
DSN: fmt.Sprintf("native://%s:9000", testutil.GetLocalHost()),
DSN: fmt.Sprintf("native://%s:9000", testutil.GetLocalHost()),
connect: &connect{},
clustersConn: make(map[string]*connect),
}
var acc testutil.Accumulator
{
Expand Down

0 comments on commit 08a797f

Please sign in to comment.