Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Improvements on whisper importer #704

Merged
merged 6 commits into from
Aug 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions cmd/mt-whisper-importer-reader/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package main

import (
"crypto/tls"
"encoding/base64"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -63,6 +66,11 @@ var (
1,
"Organization ID the data belongs to ",
)
insecureSSL = flag.Bool(
"insecure-ssl",
false,
"Disables ssl certificate verification",
)
whisperDirectory = flag.String(
"whisper-directory",
"/opt/graphite/storage/whisper",
Expand All @@ -73,6 +81,11 @@ var (
"*",
"Comma separated list of positive integers or '*' for all archives",
)
httpAuth = flag.String(
"http-auth",
"",
"The credentials used to authenticate in the format \"user:password\"",
)
chunkSpans []uint32
readArchives map[int]struct{}
printLock sync.Mutex
Expand Down Expand Up @@ -139,7 +152,10 @@ func log(msg string) {
}

func processFromChan(files chan string, wg *sync.WaitGroup) {
client := &http.Client{}
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: *insecureSSL},
}
client := &http.Client{Transport: tr}

for file := range files {
fd, err := os.Open(file)
Expand Down Expand Up @@ -173,11 +189,20 @@ func processFromChan(files chan string, wg *sync.WaitGroup) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")

_, err = client.Do(req)
if len(*httpAuth) > 0 {
req.Header.Add("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(*httpAuth)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than manually adding the header, you can call

req.URL.User = url.UserPassword(user, pass)

But as you have the user/pass in a single string, what you are doing is probably easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thx, i'll leave it then

}

resp, err := client.Do(req)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to drain the resp.Body in order for the connection to be re-used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if err != nil {
throwError(fmt.Sprintf("Error sending request to http endpoint %q: %q", *httpEndpoint, err))
continue
}
if resp.StatusCode != 200 {
throwError(fmt.Sprintf("Error when submitting data: %s", resp.Status))
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
wg.Done()
}
Expand Down Expand Up @@ -363,7 +388,7 @@ func getFileListIntoChan(fileChan chan string) {
filepath.Walk(
*whisperDirectory,
func(path string, info os.FileInfo, err error) error {
if path[len(path)-4:] == ".wsp" {
if len(path) >= 4 && path[len(path)-4:] == ".wsp" {
fileChan <- path
}
return nil
Expand Down
109 changes: 77 additions & 32 deletions cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,87 +25,128 @@ import (
)

var (
exitOnError = flag.Bool(
globalFlags = flag.NewFlagSet("global config flags", flag.ExitOnError)

exitOnError = globalFlags.Bool(
"exit-on-error",
true,
"Exit with a message when there's an error",
)
verbose = flag.Bool(
verbose = globalFlags.Bool(
"verbose",
false,
"Write logs to terminal",
)
fakeAvgAggregates = flag.Bool(
fakeAvgAggregates = globalFlags.Bool(
"fake-avg-aggregates",
true,
"Generate sum/cnt series out of avg series to accommodate metrictank",
)
httpEndpoint = flag.String(
httpEndpoint = globalFlags.String(
"http-endpoint",
"127.0.0.1:8080",
"The http endpoint to listen on",
)
cassandraAddrs = flag.String(
"cassandra-addrs",
"localhost",
"cassandra host (may be given multiple times as comma-separated list)",
)
cassandraKeyspace = flag.String(
"cassandra-keyspace",
"metrictank",
"cassandra keyspace to use for storing the metric data table",
)
ttlsStr = flag.String(
ttlsStr = globalFlags.String(
"ttls",
"35d",
"list of ttl strings used by MT separated by ','",
)
windowFactor = flag.Int(
windowFactor = globalFlags.Int(
"window-factor",
20,
"the window factor be used when creating the metric table schema",
)
partitionScheme = flag.String(
partitionScheme = globalFlags.String(
"partition-scheme",
"bySeries",
"method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)",
)
uriPath = flag.String(
uriPath = globalFlags.String(
"uri-path",
"/chunks",
"the URI on which we expect chunks to get posted",
)
numPartitions = flag.Int(
numPartitions = globalFlags.Int(
"num-partitions",
1,
"Number of Partitions",
)

cassandraAddrs = globalFlags.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)")
cassandraKeyspace = globalFlags.String("cassandra-keyspace", "raintank", "cassandra keyspace to use for storing the metric data table")
cassandraConsistency = globalFlags.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
cassandraHostSelectionPolicy = globalFlags.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
cassandraTimeout = globalFlags.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
cassandraReadConcurrency = globalFlags.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.")
cassandraReadQueueSize = globalFlags.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much")
cassandraRetries = globalFlags.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = globalFlags.Int("cql-protocol-version", 4, "cql protocol version to use")

cassandraSSL = globalFlags.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = globalFlags.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
cassandraHostVerification = globalFlags.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL")

cassandraAuth = globalFlags.Bool("cassandra-auth", false, "enable cassandra authentication")
cassandraUsername = globalFlags.String("cassandra-username", "cassandra", "username for authentication")
cassandraPassword = globalFlags.String("cassandra-password", "cassandra", "password for authentication")

GitHash = "(none)"
printLock sync.Mutex
)

type Server struct {
Cluster *gocql.ClusterConfig
Session *gocql.Session
TTLTables mdata.TTLTables
Partitioner partitioner.Partitioner
Index idx.MetricIndex
}

func main() {
cassandra.ConfigSetup()
flag.Parse()
cassFlags := cassandra.ConfigSetup()

flag.Usage = func() {
fmt.Println("mt-whisper-importer-writer")
fmt.Println()
fmt.Println("Opens an endpoint to send data to, which then gets stored in the MT internal DB(s)")
fmt.Println()
fmt.Printf("Usage:\n\n")
fmt.Printf(" mt-whisper-importer-writer [global config flags] <idxtype> [idx config flags] \n\n")
fmt.Printf("global config flags:\n\n")
globalFlags.PrintDefaults()
fmt.Println()
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
fmt.Printf("cass config flags:\n\n")
cassFlags.PrintDefaults()
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("mt-whisper-importer-writer -cassandra-addrs=192.168.0.1 -cassandra-keyspace=mydata -exit-on-error=true -fake-avg-aggregates=true -http-endpoint=0.0.0.0:8080 -num-partitions=8 -partition-scheme=bySeries -ttls=8d,2y -uri-path=/chunks -verbose=true -window-factor=20 cass -hosts=192.168.0.1:9042 -keyspace=mydata")
}

cassCluster := gocql.NewCluster(strings.Split(*cassandraAddrs, ",")...)
cassCluster.Consistency = gocql.ParseConsistency("one")
cassCluster.Timeout = time.Second
cassCluster.NumConns = 2
cassCluster.ProtoVersion = 4
cassCluster.Keyspace = *cassandraKeyspace
if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
flag.Usage()
os.Exit(0)
}

var cassI int
for i, v := range os.Args {
if v == "cass" {
cassI = i
}
}
if cassI == 0 {
fmt.Println("only indextype 'cass' supported")
flag.Usage()
os.Exit(1)
}

globalFlags.Parse(os.Args[1:cassI])
cassFlags.Parse(os.Args[cassI+1 : len(os.Args)])
cassandra.Enabled = true

session, err := cassCluster.CreateSession()
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil)
if err != nil {
panic(fmt.Sprintf("Failed to create cassandra session: %q", err))
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
}

splits := strings.Split(*ttlsStr, ",")
Expand All @@ -121,8 +162,7 @@ func main() {
}

server := &Server{
Cluster: cassCluster,
Session: session,
Session: store.Session,
TTLTables: ttlTables,
Partitioner: p,
Index: cassandra.New(),
Expand All @@ -131,6 +171,7 @@ func main() {
server.Index.Init()

http.HandleFunc(*uriPath, server.chunksHandler)
http.HandleFunc("/healthz", server.healthzHandler)

log(fmt.Sprintf("Listening on %q", *httpEndpoint))
err = http.ListenAndServe(*httpEndpoint, nil)
Expand Down Expand Up @@ -158,6 +199,10 @@ func log(msg string) {
}
}

func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("ok"))
}

func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
metric := &archive.Metric{}
err := metric.UnmarshalCompressed(req.Body)
Expand Down