diff --git a/cmd/mt-store-cat/main.go b/cmd/mt-store-cat/main.go index a1dccb416f..594b3a62d0 100644 --- a/cmd/mt-store-cat/main.go +++ b/cmd/mt-store-cat/main.go @@ -34,9 +34,9 @@ var ( //cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.") cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.") //cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have") - cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") - cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") - cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace") + cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") + cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") + cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace") cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra") cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL") diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index ee8f277aad..1dba476ed8 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -176,9 +176,15 @@ func (c *CasIdx) InitBare() error { log.Error(3, "cassandra-idx failed to initialize cassandra keyspace. %s", err) return err } + err = tmpSession.Query(fmt.Sprintf(TableSchema, keyspace)).Exec() + if err != nil { + log.Error(3, "cassandra-idx failed to initialize cassandra table. %s", err) + return err + } } else { + var keyspaceMetadata *gocql.KeyspaceMetadata for attempt := 1; attempt > 0; attempt++ { - _, err = tmpSession.KeyspaceMetadata(keyspace) + keyspaceMetadata, err = tmpSession.KeyspaceMetadata(keyspace) if err != nil { log.Warn("cassandra-idx cassandra keyspace not found. retry attempt: %v", attempt) if attempt > 5 { @@ -186,16 +192,20 @@ func (c *CasIdx) InitBare() error { } time.Sleep(5 * time.Second) } else { - break + if _, ok := keyspaceMetadata.Tables["metric_idx"]; ok { + break + } else { + log.Warn("cassandra-idx cassandra table not found. retry attempt: %v", attempt) + if attempt > 5 { + return err + } + time.Sleep(5 * time.Second) + } } } + } - - err = tmpSession.Query(fmt.Sprintf(TableSchema, keyspace)).Exec() - if err != nil { - log.Error(3, "cassandra-idx failed to initialize cassandra table. %s", err) - return err - } + tmpSession.Close() c.cluster.Keyspace = keyspace session, err := c.cluster.CreateSession() diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index bc7e6492e0..b2f06fbdd9 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -199,16 +199,30 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, return nil, err } + ttlTables := GetTTLTables(ttls, windowFactor, Table_name_format) + // create or verify the metrictank keyspace if createKeyspace { err = tmpSession.Query(fmt.Sprintf(keyspace_schema, keyspace)).Exec() if err != nil { return nil, err } + for _, result := range ttlTables { + err := tmpSession.Query(fmt.Sprintf(table_schema, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec() + if err != nil { + return nil, err + } + } + + if err != nil { + return nil, err + } } else { + var keyspaceMetadata *gocql.KeyspaceMetadata // five attempts to verify the keyspace exists before returning an error + AttemptLoop: for attempt := 1; attempt > 0; attempt++ { - _, err = tmpSession.KeyspaceMetadata(keyspace) + keyspaceMetadata, err = tmpSession.KeyspaceMetadata(keyspace) if err != nil { log.Warn("cassandra keyspace not found; attempt: %v", attempt) if attempt > 5 { @@ -216,22 +230,21 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, } time.Sleep(5 * time.Second) } else { + for _, result := range ttlTables { + if _, ok := keyspaceMetadata.Tables[result.Table]; !ok { + log.Warn("cassandra table %s not found; attempt: %v", result.Table, attempt) + if attempt > 5 { + return nil, err + } + time.Sleep(5 * time.Second) + continue AttemptLoop + } + } break } } - } - - ttlTables := GetTTLTables(ttls, windowFactor, Table_name_format) - for _, result := range ttlTables { - err := tmpSession.Query(fmt.Sprintf(table_schema, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec() - if err != nil { - return nil, err - } } - if err != nil { - return nil, err - } tmpSession.Close() cluster.Keyspace = keyspace cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retries}