Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add verification for cassandra tables
Browse files Browse the repository at this point in the history
  • Loading branch information
jtlisi authored and Dieterbe committed Sep 5, 2017
1 parent e9ad4d8 commit ac401e7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
6 changes: 3 additions & 3 deletions cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ var (
//cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.")
cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")
//cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have")
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the metrictank keyspace")

cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
Expand Down
26 changes: 18 additions & 8 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,36 @@ func (c *CasIdx) InitBare() error {
log.Error(3, "cassandra-idx failed to initialize cassandra keyspace. %s", err)
return err
}
err = tmpSession.Query(fmt.Sprintf(TableSchema, keyspace)).Exec()
if err != nil {
log.Error(3, "cassandra-idx failed to initialize cassandra table. %s", err)
return err
}
} else {
var keyspaceMetadata *gocql.KeyspaceMetadata
for attempt := 1; attempt > 0; attempt++ {
_, err = tmpSession.KeyspaceMetadata(keyspace)
keyspaceMetadata, err = tmpSession.KeyspaceMetadata(keyspace)
if err != nil {
log.Warn("cassandra-idx cassandra keyspace not found. retry attempt: %v", attempt)
if attempt > 5 {
return err
}
time.Sleep(5 * time.Second)
} else {
break
if _, ok := keyspaceMetadata.Tables["metric_idx"]; ok {
break
} else {
log.Warn("cassandra-idx cassandra table not found. retry attempt: %v", attempt)
if attempt > 5 {
return err
}
time.Sleep(5 * time.Second)
}
}
}

}

err = tmpSession.Query(fmt.Sprintf(TableSchema, keyspace)).Exec()
if err != nil {
log.Error(3, "cassandra-idx failed to initialize cassandra table. %s", err)
return err
}

tmpSession.Close()
c.cluster.Keyspace = keyspace
session, err := c.cluster.CreateSession()
Expand Down
37 changes: 25 additions & 12 deletions mdata/store_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,39 +199,52 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
return nil, err
}

ttlTables := GetTTLTables(ttls, windowFactor, Table_name_format)

// create or verify the metrictank keyspace
if createKeyspace {
err = tmpSession.Query(fmt.Sprintf(keyspace_schema, keyspace)).Exec()
if err != nil {
return nil, err
}
for _, result := range ttlTables {
err := tmpSession.Query(fmt.Sprintf(table_schema, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec()
if err != nil {
return nil, err
}
}

if err != nil {
return nil, err
}
} else {
var keyspaceMetadata *gocql.KeyspaceMetadata
// five attempts to verify the keyspace exists before returning an error
AttemptLoop:
for attempt := 1; attempt > 0; attempt++ {
_, err = tmpSession.KeyspaceMetadata(keyspace)
keyspaceMetadata, err = tmpSession.KeyspaceMetadata(keyspace)
if err != nil {
log.Warn("cassandra keyspace not found; attempt: %v", attempt)
if attempt > 5 {
return nil, err
}
time.Sleep(5 * time.Second)
} else {
for _, result := range ttlTables {
if _, ok := keyspaceMetadata.Tables[result.Table]; !ok {
log.Warn("cassandra table %s not found; attempt: %v", result.Table, attempt)
if attempt > 5 {
return nil, err
}
time.Sleep(5 * time.Second)
continue AttemptLoop
}
}
break
}
}
}

ttlTables := GetTTLTables(ttls, windowFactor, Table_name_format)
for _, result := range ttlTables {
err := tmpSession.Query(fmt.Sprintf(table_schema, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec()
if err != nil {
return nil, err
}
}

if err != nil {
return nil, err
}
tmpSession.Close()
cluster.Keyspace = keyspace
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retries}
Expand Down

0 comments on commit ac401e7

Please sign in to comment.