From 7b94da9e1890dd71c460469e0ea16ab0c193772f Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 2 Aug 2017 15:25:39 +0200 Subject: [PATCH 1/6] change the way cassandra gets initialized in whisper importer --- cmd/mt-whisper-importer-writer/main.go | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 0c6158d459..1eea464550 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -45,16 +45,6 @@ var ( "127.0.0.1:8080", "The http endpoint to listen on", ) - cassandraAddrs = flag.String( - "cassandra-addrs", - "localhost", - "cassandra host (may be given multiple times as comma-separated list)", - ) - cassandraKeyspace = flag.String( - "cassandra-keyspace", - "metrictank", - "cassandra keyspace to use for storing the metric data table", - ) ttlsStr = flag.String( "ttls", "35d", @@ -80,12 +70,30 @@ var ( 1, "Number of Partitions", ) + + cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") + cassandraKeyspace = flag.String("cassandra-keyspace", "raintank", "cassandra keyspace to use for storing the metric data table") + cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one") + cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "") + cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds") + cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.") + cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much") + cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") + cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") + + cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra") + cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL") + cassandraHostVerification = flag.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL") + + cassandraAuth = flag.Bool("cassandra-auth", false, "enable cassandra authentication") + cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication") + cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication") + GitHash = "(none)" printLock sync.Mutex ) type Server struct { - Cluster *gocql.ClusterConfig Session *gocql.Session TTLTables mdata.TTLTables Partitioner partitioner.Partitioner @@ -96,16 +104,9 @@ func main() { cassandra.ConfigSetup() flag.Parse() - cassCluster := gocql.NewCluster(strings.Split(*cassandraAddrs, ",")...) - cassCluster.Consistency = gocql.ParseConsistency("one") - cassCluster.Timeout = time.Second - cassCluster.NumConns = 2 - cassCluster.ProtoVersion = 4 - cassCluster.Keyspace = *cassandraKeyspace - - session, err := cassCluster.CreateSession() + store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil) if err != nil { - panic(fmt.Sprintf("Failed to create cassandra session: %q", err)) + panic(fmt.Sprintf("Failed to initialize cassandra: %q", err)) } splits := strings.Split(*ttlsStr, ",") @@ -121,8 +122,7 @@ func main() { } server := &Server{ - Cluster: cassCluster, - Session: session, + Session: store.Session, TTLTables: ttlTables, Partitioner: p, Index: cassandra.New(), From 7673c2975816dd381b8901d9a5a8a28abc136351 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 2 Aug 2017 17:32:59 +0200 Subject: [PATCH 2/6] initialize index --- cmd/mt-whisper-importer-writer/main.go | 69 ++++++++++++++++---------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 1eea464550..627ba0354f 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -25,69 +25,71 @@ import ( ) var ( - exitOnError = flag.Bool( + globalFlags = flag.NewFlagSet("global config flags", flag.ExitOnError) + + exitOnError = globalFlags.Bool( "exit-on-error", true, "Exit with a message when there's an error", ) - verbose = flag.Bool( + verbose = globalFlags.Bool( "verbose", false, "Write logs to terminal", ) - fakeAvgAggregates = flag.Bool( + fakeAvgAggregates = globalFlags.Bool( "fake-avg-aggregates", true, "Generate sum/cnt series out of avg series to accommodate metrictank", ) - httpEndpoint = flag.String( + httpEndpoint = globalFlags.String( "http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on", ) - ttlsStr = flag.String( + ttlsStr = globalFlags.String( "ttls", "35d", "list of ttl strings used by MT separated by ','", ) - windowFactor = flag.Int( + windowFactor = globalFlags.Int( "window-factor", 20, "the window factor be used when creating the metric table schema", ) - partitionScheme = flag.String( + partitionScheme = globalFlags.String( "partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)", ) - uriPath = flag.String( + uriPath = globalFlags.String( "uri-path", "/chunks", "the URI on which we expect chunks to get posted", ) - numPartitions = flag.Int( + numPartitions = globalFlags.Int( "num-partitions", 1, "Number of Partitions", ) - cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") - cassandraKeyspace = flag.String("cassandra-keyspace", "raintank", "cassandra keyspace to use for storing the metric data table") - cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one") - cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "") - cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds") - cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.") - cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much") - cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") - cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") + cassandraAddrs = globalFlags.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") + cassandraKeyspace = globalFlags.String("cassandra-keyspace", "raintank", "cassandra keyspace to use for storing the metric data table") + cassandraConsistency = globalFlags.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one") + cassandraHostSelectionPolicy = globalFlags.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "") + cassandraTimeout = globalFlags.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds") + cassandraReadConcurrency = globalFlags.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.") + cassandraReadQueueSize = globalFlags.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much") + cassandraRetries = globalFlags.Int("cassandra-retries", 0, "how many times to retry a query before failing it") + cqlProtocolVersion = globalFlags.Int("cql-protocol-version", 4, "cql protocol version to use") - cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra") - cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL") - cassandraHostVerification = flag.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL") + cassandraSSL = globalFlags.Bool("cassandra-ssl", false, "enable SSL connection to cassandra") + cassandraCaPath = globalFlags.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL") + cassandraHostVerification = globalFlags.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL") - cassandraAuth = flag.Bool("cassandra-auth", false, "enable cassandra authentication") - cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication") - cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication") + cassandraAuth = globalFlags.Bool("cassandra-auth", false, "enable cassandra authentication") + cassandraUsername = globalFlags.String("cassandra-username", "cassandra", "username for authentication") + cassandraPassword = globalFlags.String("cassandra-password", "cassandra", "password for authentication") GitHash = "(none)" printLock sync.Mutex @@ -101,8 +103,23 @@ type Server struct { } func main() { - cassandra.ConfigSetup() - flag.Parse() + cassFlags := cassandra.ConfigSetup() + + var cassI int + for i, v := range os.Args { + if v == "cass" { + cassI = i + } + } + if cassI == 0 { + fmt.Println("only indextype 'cass' supported") + flag.Usage() + os.Exit(1) + } + + globalFlags.Parse(os.Args[1:cassI]) + cassFlags.Parse(os.Args[cassI+1 : len(os.Args)]) + cassandra.Enabled = true store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil) if err != nil { From 9e5b8f42f9dfb56982e58b1f471a71bfa82580b5 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 2 Aug 2017 19:15:34 +0200 Subject: [PATCH 3/6] add status url --- cmd/mt-whisper-importer-writer/main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 627ba0354f..4d7fa6b1c0 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -148,6 +148,7 @@ func main() { server.Index.Init() http.HandleFunc(*uriPath, server.chunksHandler) + http.HandleFunc("/status", server.statusHandler) log(fmt.Sprintf("Listening on %q", *httpEndpoint)) err = http.ListenAndServe(*httpEndpoint, nil) @@ -175,6 +176,10 @@ func log(msg string) { } } +func (s *Server) statusHandler(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("ok")) +} + func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) { metric := &archive.Metric{} err := metric.UnmarshalCompressed(req.Body) From 3776c5f7da59c31086fb2eca8a6ce0219645dd72 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 3 Aug 2017 13:24:02 +0200 Subject: [PATCH 4/6] add auth and error handling to reader --- cmd/mt-whisper-importer-reader/main.go | 28 +++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/cmd/mt-whisper-importer-reader/main.go b/cmd/mt-whisper-importer-reader/main.go index 9dea213c66..5ef4ba883d 100644 --- a/cmd/mt-whisper-importer-reader/main.go +++ b/cmd/mt-whisper-importer-reader/main.go @@ -1,6 +1,8 @@ package main import ( + "crypto/tls" + "encoding/base64" "errors" "flag" "fmt" @@ -63,6 +65,11 @@ var ( 1, "Organization ID the data belongs to ", ) + insecureSSL = flag.Bool( + "insecure-ssl", + false, + "Disables ssl certificate verification", + ) whisperDirectory = flag.String( "whisper-directory", "/opt/graphite/storage/whisper", @@ -73,6 +80,11 @@ var ( "*", "Comma separated list of positive integers or '*' for all archives", ) + httpAuth = flag.String( + "http-auth", + "", + "The credentials used to authenticate in the format \"user:password\"", + ) chunkSpans []uint32 readArchives map[int]struct{} printLock sync.Mutex @@ -139,7 +151,10 @@ func log(msg string) { } func processFromChan(files chan string, wg *sync.WaitGroup) { - client := &http.Client{} + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: *insecureSSL}, + } + client := &http.Client{Transport: tr} for file := range files { fd, err := os.Open(file) @@ -173,11 +188,18 @@ func processFromChan(files chan string, wg *sync.WaitGroup) { req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Encoding", "gzip") - _, err = client.Do(req) + if len(*httpAuth) > 0 { + req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*httpAuth))) + } + + resp, err := client.Do(req) if err != nil { throwError(fmt.Sprintf("Error sending request to http endpoint %q: %q", *httpEndpoint, err)) continue } + if resp.StatusCode != 200 { + throwError(fmt.Sprintf("Error when submitting data: %s", resp.Status)) + } } wg.Done() } @@ -363,7 +385,7 @@ func getFileListIntoChan(fileChan chan string) { filepath.Walk( *whisperDirectory, func(path string, info os.FileInfo, err error) error { - if path[len(path)-4:] == ".wsp" { + if len(path) >= 4 && path[len(path)-4:] == ".wsp" { fileChan <- path } return nil From 07a2335957b5b38f9711ec50f5c185369bb259f8 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 3 Aug 2017 13:41:53 +0200 Subject: [PATCH 5/6] add help message --- cmd/mt-whisper-importer-writer/main.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 4d7fa6b1c0..73133bcdc2 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -105,6 +105,29 @@ type Server struct { func main() { cassFlags := cassandra.ConfigSetup() + flag.Usage = func() { + fmt.Println("mt-whisper-importer-writer") + fmt.Println() + fmt.Println("Opens an endpoint to send data to, which then gets stored in the MT internal DB(s)") + fmt.Println() + fmt.Printf("Usage:\n\n") + fmt.Printf(" mt-whisper-importer-writer [global config flags] [idx config flags] \n\n") + fmt.Printf("global config flags:\n\n") + globalFlags.PrintDefaults() + fmt.Println() + fmt.Printf("idxtype: only 'cass' supported for now\n\n") + fmt.Printf("cass config flags:\n\n") + cassFlags.PrintDefaults() + fmt.Println() + fmt.Println("EXAMPLES:") + fmt.Println("mt-whisper-importer-writer -cassandra-addrs=192.168.0.1 -cassandra-keyspace=mydata -exit-on-error=true -fake-avg-aggregates=true -http-endpoint=0.0.0.0:8080 -num-partitions=8 -partition-scheme=bySeries -ttls=8d,2y -uri-path=/chunks -verbose=true -window-factor=20 cass -hosts=192.168.0.1:9042 -keyspace=mydata") + } + + if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") { + flag.Usage() + os.Exit(0) + } + var cassI int for i, v := range os.Args { if v == "cass" { From 992ad72a795faf56aa7f52271643e5027b1aad2e Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 3 Aug 2017 14:59:22 +0200 Subject: [PATCH 6/6] reuse connections & rename /status to /healthz --- cmd/mt-whisper-importer-reader/main.go | 3 +++ cmd/mt-whisper-importer-writer/main.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/mt-whisper-importer-reader/main.go b/cmd/mt-whisper-importer-reader/main.go index 5ef4ba883d..0859d08183 100644 --- a/cmd/mt-whisper-importer-reader/main.go +++ b/cmd/mt-whisper-importer-reader/main.go @@ -7,6 +7,7 @@ import ( "flag" "fmt" "io" + "io/ioutil" "net/http" "os" "path/filepath" @@ -200,6 +201,8 @@ func processFromChan(files chan string, wg *sync.WaitGroup) { if resp.StatusCode != 200 { throwError(fmt.Sprintf("Error when submitting data: %s", resp.Status)) } + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() } wg.Done() } diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 73133bcdc2..80024a0926 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -171,7 +171,7 @@ func main() { server.Index.Init() http.HandleFunc(*uriPath, server.chunksHandler) - http.HandleFunc("/status", server.statusHandler) + http.HandleFunc("/healthz", server.healthzHandler) log(fmt.Sprintf("Listening on %q", *httpEndpoint)) err = http.ListenAndServe(*httpEndpoint, nil) @@ -199,7 +199,7 @@ func log(msg string) { } } -func (s *Server) statusHandler(w http.ResponseWriter, req *http.Request) { +func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) { w.Write([]byte("ok")) }