Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add config variable to verify instead of create cassandra keyspace
Browse files Browse the repository at this point in the history
  • Loading branch information
jtlisi authored and Dieterbe committed Sep 5, 2017
1 parent 8f356d7 commit e9ad4d8
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 15 deletions.
3 changes: 2 additions & 1 deletion cmd/mt-split-metrics-by-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")

cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
Expand Down Expand Up @@ -68,7 +69,7 @@ func main() {
panic(fmt.Sprintf("Error creating directory: %s", err))
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, ttls)
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, ttls)
if err != nil {
panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err))
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
//cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have")
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")

cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
Expand Down Expand Up @@ -156,7 +157,7 @@ func main() {
}
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil)
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
cassandraReadQueueSize = globalFlags.Int("cassandra-read-queue-size", 100, "max number of outstanding reads before blocking. value doesn't matter much")
cassandraRetries = globalFlags.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = globalFlags.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = globalFlags.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")

cassandraSSL = globalFlags.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = globalFlags.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
Expand Down Expand Up @@ -148,7 +149,7 @@ func main() {
log.SetLevel(log.InfoLevel)
}

store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, nil)
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, nil)
if err != nil {
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
}
Expand Down
28 changes: 23 additions & 5 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var (
ssl bool
auth bool
hostverification bool
createKeyspace bool
keyspace string
hosts string
capath string
Expand Down Expand Up @@ -99,6 +100,7 @@ func ConfigSetup() *flag.FlagSet {
casIdx.DurationVar(&maxStale, "max-stale", 0, "clear series from the index if they have not been seen for this much time.")
casIdx.DurationVar(&pruneInterval, "prune-interval", time.Hour*3, "Interval at which the index should be checked for stale series.")
casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
casIdx.BoolVar(&createKeyspace, "create-keyspace", true, "set to create keyspaces otherwise check to ensure they exist")

casIdx.BoolVar(&ssl, "ssl", false, "enable SSL connection to cassandra")
casIdx.StringVar(&capath, "ca-path", "/etc/metrictank/ca.pem", "cassandra CA certficate path when using SSL")
Expand Down Expand Up @@ -167,12 +169,28 @@ func (c *CasIdx) InitBare() error {
return err
}

// ensure the keyspace and table exist.
err = tmpSession.Query(fmt.Sprintf(KeyspaceSchema, keyspace)).Exec()
if err != nil {
log.Error(3, "cassandra-idx failed to initialize cassandra keyspace. %s", err)
return err
// create the keyspace or ensure it exists
if createKeyspace {
err = tmpSession.Query(fmt.Sprintf(KeyspaceSchema, keyspace)).Exec()
if err != nil {
log.Error(3, "cassandra-idx failed to initialize cassandra keyspace. %s", err)
return err
}
} else {
for attempt := 1; attempt > 0; attempt++ {
_, err = tmpSession.KeyspaceMetadata(keyspace)
if err != nil {
log.Warn("cassandra-idx cassandra keyspace not found. retry attempt: %v", attempt)
if attempt > 5 {
return err
}
time.Sleep(5 * time.Second)
} else {
break
}
}
}

err = tmpSession.Query(fmt.Sprintf(TableSchema, keyspace)).Exec()
if err != nil {
log.Error(3, "cassandra-idx failed to initialize cassandra table. %s", err)
Expand Down
29 changes: 23 additions & 6 deletions mdata/store_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func GetTTLTable(ttl uint32, windowFactor int, nameFormat string) ttlTable {
}
}

func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, ttls []uint32) (*CassandraStore, error) {
func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, createKeyspace bool, ttls []uint32) (*CassandraStore, error) {

stats.NewGauge32("store.cassandra.write_queue.size").Set(writeqsize)
stats.NewGauge32("store.cassandra.num_writers").Set(writers)
Expand All @@ -198,11 +198,28 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
if err != nil {
return nil, err
}
// ensure the keyspace and table exist.
err = tmpSession.Query(fmt.Sprintf(keyspace_schema, keyspace)).Exec()
if err != nil {
return nil, err
}

// create or verify the metrictank keyspace
if createKeyspace {
err = tmpSession.Query(fmt.Sprintf(keyspace_schema, keyspace)).Exec()
if err != nil {
return nil, err
}
} else {
// five attempts to verify the keyspace exists before returning an error
for attempt := 1; attempt > 0; attempt++ {
_, err = tmpSession.KeyspaceMetadata(keyspace)
if err != nil {
log.Warn("cassandra keyspace not found; attempt: %v", attempt)
if attempt > 5 {
return nil, err
}
time.Sleep(5 * time.Second)
} else {
break
}
}
}

ttlTables := GetTTLTables(ttls, windowFactor, Table_name_format)
for _, result := range ttlTables {
Expand Down
4 changes: 4 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ cassandra-write-queue-size = 100000
cassandra-retries = 0
# CQL protocol version. cassandra 3.x needs v3 or 4.
cql-protocol-version = 4
# enable the creation of the mdata keyspace and tables, only one node needs this
cassandra-create-keyspace = true

# enable SSL connection to cassandra
cassandra-ssl = false
Expand Down Expand Up @@ -286,6 +288,8 @@ auth = false
username = cassandra
# password for authentication
password = cassandra
# sets whether to create or check for the cassandra keyspaces
create-keyspace = true

### in-memory only
[memory-idx]
Expand Down
3 changes: 2 additions & 1 deletion metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ var (
cassandraWindowFactor = flag.Int("cassandra-window-factor", 20, "size of compaction window relative to TTL")
cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")

cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
Expand Down Expand Up @@ -265,7 +266,7 @@ func main() {
/***********************************
Initialize our backendStore
***********************************/
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, mdata.TTLs())
store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, mdata.TTLs())
if err != nil {
log.Fatal(4, "failed to initialize cassandra. %s", err)
}
Expand Down

0 comments on commit e9ad4d8

Please sign in to comment.