From 08a797f5589568b680856b7e8fff78c0c3da3669 Mon Sep 17 00:00:00 2001 From: kshvakov Date: Sat, 17 Mar 2018 08:04:17 +0200 Subject: [PATCH] collect metrics from the cluster --- plugins/inputs/clickhouse/README.md | 29 ++-- plugins/inputs/clickhouse/clickhouse.go | 146 ++++++++++++++++--- plugins/inputs/clickhouse/clickhouse_test.go | 4 +- 3 files changed, 143 insertions(+), 36 deletions(-) diff --git a/plugins/inputs/clickhouse/README.md b/plugins/inputs/clickhouse/README.md index 35fb268b5c578..6c20fc29857af 100644 --- a/plugins/inputs/clickhouse/README.md +++ b/plugins/inputs/clickhouse/README.md @@ -1,41 +1,48 @@ # Telegraf Input Plugin: ClickHouse -This [ClickHouse](https://github.com/yandex/ClickHouse) plugin provides metrics for your ClickHouse server. +This plugin gathers statistics from a [ClickHouse](https://github.com/yandex/ClickHouse) server. -### Configuration example: +### Configuration ``` +# Read metrics from one or more ClickHouse servers [[inputs.clickhouse]] - dsn = "native://localhost:9000?username=user&password=qwerty" + dsn = "native://localhost:9000?username=user&password=qwerty" + cluster = true # If set to "true", the plugin tries to connect to all servers in the cluster (system.clusters) + ignored_clusters = ["test_shard_localhost"] ## ignored cluster names ``` ### Metrics: - clickhouse_events - tags: - - server (ClickHouse server hostname) - - hostname (Telegraf agent hostname) + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) - fields: - all rows from system.events - clickhouse_metrics - tags: - - server (ClickHouse server hostname) - - hostname (Telegraf agent hostname) + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) - fields: - all rows from system.metrics - clickhouse_asynchronous_metrics - tags: - - server (ClickHouse server hostname) - - 
hostname (Telegraf agent hostname) + - hostname (ClickHouse server hostname) + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) - fields: - all rows from system.asynchronous_metrics - clickhouse_tables - tags: - - server (ClickHouse server hostname) - - hostname (Telegraf agent hostname) + - hostname (ClickHouse server hostname) - table - database + - cluster (Name of the cluster [optional]) + - shard_num (Shard number in the cluster [optional]) - fields: - bytes - parts diff --git a/plugins/inputs/clickhouse/clickhouse.go b/plugins/inputs/clickhouse/clickhouse.go index 627d0d13f42be..f4a7435b93ab0 100644 --- a/plugins/inputs/clickhouse/clickhouse.go +++ b/plugins/inputs/clickhouse/clickhouse.go @@ -2,7 +2,10 @@ package clickhouse import ( "database/sql" - "os" + "fmt" + "net" + "net/url" + "strconv" "time" "github.com/influxdata/telegraf" @@ -13,16 +16,23 @@ import ( const sampleConfig = ` ### ClickHouse DSN -dsn = "native://localhost:9000?username=user&password=qwerty" +dsn = "native://localhost:9000?username=user&password=qwerty" +cluster = false +ignored_clusters = ["test_shard_localhost"] ` -var hostname, _ = os.Hostname() +type connect struct { + *sql.DB + cluster, shardNum, hostname string +} // ClickHouse Telegraf Input Plugin type ClickHouse struct { - DSN string `toml:"dsn"` - server string - connect *sql.DB + DSN string `toml:"dsn"` + Cluster bool `toml:"cluster"` + IgnoredClusters []string `toml:"ignored_clusters"` + connect *connect + clustersConn map[string]*connect } // SampleConfig returns the sample config @@ -37,18 +47,37 @@ func (*ClickHouse) Description() string { // Gather collect data from ClickHouse server func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { + if !ch.Cluster { + if err := ch.gather(ch.connect, acc); err != nil { + acc.AddError(err) + return err + } + return nil + } + conns, err := ch.conns(acc) + if err != nil { + acc.AddError(err) + return err + } + for _, conn 
:= range conns { + if err := ch.gather(conn, acc); err != nil { + acc.AddError(err) + } + } + return nil +} + +func (ch *ClickHouse) gather(conn *connect, acc telegraf.Accumulator) (err error) { var rows *sql.Rows for measurement, query := range measurementMap { - if rows, err = ch.connect.Query(query); err != nil { - acc.AddError(err) + if rows, err = conn.Query(query); err != nil { return err } - if err := ch.processRows(measurement, rows, acc); err != nil { + if err := ch.processRows(measurement, conn, rows, acc); err != nil { return err } } - if rows, err = ch.connect.Query(systemParts); err != nil { - acc.AddError(err) + if rows, err = conn.Query(systemParts); err != nil { return err } defer rows.Close() @@ -58,7 +87,6 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { bytes, parts, rowsInTable uint64 ) if err := rows.Scan(&database, &table, &bytes, &parts, &rowsInTable); err != nil { - acc.AddError(err) return err } acc.AddFields("clickhouse_tables", @@ -68,16 +96,75 @@ func (ch *ClickHouse) Gather(acc telegraf.Accumulator) (err error) { "rows": rowsInTable, }, map[string]string{ - "table": table, - "server": ch.server, - "hostname": hostname, - "database": database, - }) + "table": table, + "server": conn.hostname, + "cluster": conn.cluster, + "database": database, + "hostname": conn.hostname, + "shard_num": conn.shardNum, + }, + ) } return nil } -func (ch *ClickHouse) processRows(measurement string, rows *sql.Rows, acc telegraf.Accumulator) error { +func (ch *ClickHouse) conns(acc telegraf.Accumulator) ([]*connect, error) { + var ( + ignore = func(cluster string) bool { + for _, ignored := range ch.IgnoredClusters { + if cluster == ignored { + return true + } + } + return false + } + rows, err = ch.connect.Query(systemClusterSQL) + ) + if err != nil { + return nil, err + } + baseDSN, err := url.Parse(ch.DSN) + if err != nil { + return nil, err + } + for rows.Next() { + var ( + port, shardNum int + hostname, address, cluster string + 
) + if err := rows.Scan(&cluster, &shardNum, &hostname, &address, &port); err != nil { + acc.AddError(err) + continue + } + connID := fmt.Sprintf("%s_%d", address, shardNum) // built after Scan so address/shardNum hold the row's values, giving one cache key per host/shard + if _, found := ch.clustersConn[connID]; !found { + if ignore(cluster) { + continue + } + baseDSN.Host = net.JoinHostPort(address, strconv.Itoa(port)) + conn, err := sql.Open("clickhouse", baseDSN.String()) + if err != nil { + acc.AddError(err) + continue + } + ch.clustersConn[connID] = &connect{ + DB: conn, + cluster: cluster, + shardNum: strconv.Itoa(shardNum), + hostname: hostname, + } + } + } + conns := make([]*connect, 0, len(ch.clustersConn)) + for _, conn := range ch.clustersConn { + if err := conn.Ping(); err == nil { + conns = append(conns, conn) + } + } + return conns, nil +} + +func (ch *ClickHouse) processRows(measurement string, conn *connect, rows *sql.Rows, acc telegraf.Accumulator) error { defer rows.Close() fields := make(map[string]interface{}) for rows.Next() { @@ -86,21 +173,21 @@ func (ch *ClickHouse) processRows(measurement string, rows *sql.Rows, acc telegr value uint64 ) if err := rows.Scan(&key, &value); err != nil { - acc.AddError(err) return err } fields[key] = value } acc.AddFields("clickhouse_"+measurement, fields, map[string]string{ - "server": ch.server, - "hostname": hostname, + "cluster": conn.cluster, + "hostname": conn.hostname, + "shard_num": conn.shardNum, }) return nil } // Start ClickHouse input service func (ch *ClickHouse) Start(telegraf.Accumulator) (err error) { - if ch.connect, err = sql.Open("clickhouse", ch.DSN); err != nil { + if ch.connect.DB, err = sql.Open("clickhouse", ch.DSN); err != nil { return err } { @@ -108,7 +195,7 @@ func (ch *ClickHouse) Start(telegraf.Accumulator) (err error) { ch.connect.SetMaxIdleConns(1) ch.connect.SetConnMaxLifetime(20 * time.Second) } - return ch.connect.QueryRow("SELECT hostName()").Scan(&ch.server) + return ch.connect.QueryRow("SELECT hostName()").Scan(&ch.connect.hostname) } // Stop ClickHouse input service @@ 
-120,7 +207,10 @@ func (ch *ClickHouse) Stop() { func init() { inputs.Add("clickhouse", func() telegraf.Input { - return &ClickHouse{} + return &ClickHouse{ + connect: &connect{}, + clustersConn: make(map[string]*connect), + } }) } @@ -142,6 +232,14 @@ const ( ORDER BY database, table ` + systemClusterSQL = ` + SELECT + cluster, + shard_num, + host_name, + host_address, + port + FROM system.clusters` ) var measurementMap = map[string]string{ diff --git a/plugins/inputs/clickhouse/clickhouse_test.go b/plugins/inputs/clickhouse/clickhouse_test.go index c2d7686d2cd12..36d0d44fe08f1 100644 --- a/plugins/inputs/clickhouse/clickhouse_test.go +++ b/plugins/inputs/clickhouse/clickhouse_test.go @@ -14,7 +14,9 @@ func TestClickHouseGeneratesMetrics(t *testing.T) { t.Skip("Skipping integration test in short mode") } ch := ClickHouse{ - DSN: fmt.Sprintf("native://%s:9000", testutil.GetLocalHost()), + DSN: fmt.Sprintf("native://%s:9000", testutil.GetLocalHost()), + connect: &connect{}, + clustersConn: make(map[string]*connect), } var acc testutil.Accumulator {