From adf5747f9bb00f7f7781779e63c4640958573158 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Tue, 14 Dec 2021 13:42:33 -0800 Subject: [PATCH] Use persistence config consistency for schema version check (#2291) * Use persistence config consistency for Cassandra schema version checking * Update CQL schema tools accordingly --- common/persistence/cassandra/schema_version_reader.go | 3 --- tools/cassandra/cqlclient.go | 9 ++++++--- tools/cassandra/handler.go | 1 + tools/common/schema/types.go | 2 ++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/common/persistence/cassandra/schema_version_reader.go b/common/persistence/cassandra/schema_version_reader.go index a1777d792cb..793276fb2fb 100644 --- a/common/persistence/cassandra/schema_version_reader.go +++ b/common/persistence/cassandra/schema_version_reader.go @@ -53,9 +53,6 @@ func NewSchemaVersionReader(session gocql.Session) *SchemaVersionReader { // ReadSchemaVersion returns the current schema version for the Keyspace func (svr *SchemaVersionReader) ReadSchemaVersion(keyspace string) (string, error) { query := svr.session.Query(readSchemaVersionCQL, keyspace) - // when querying the DB schema version, override to local quorum - // in case Cassandra node down (compared to using ALL) - query.Consistency(gocql.LocalQuorum) iter := query.Iter() var version string diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index 107ed12ae65..e616087c4f1 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -58,6 +58,7 @@ type ( Timeout int numReplicas int Datacenter string + Consistency string TLS *auth.TLS } ) @@ -135,6 +136,11 @@ func (cfg *CQLClientConfig) toCassandraConfig() *config.Cassandra { Keyspace: cfg.Keyspace, TLS: cfg.TLS, Datacenter: cfg.Datacenter, + Consistency: &config.CassandraStoreConsistency{ + Default: &config.CassandraConsistencySettings{ + Consistency: cfg.Consistency, + }, + }, } return &cassandraConfig @@ -178,9 +184,6 @@ func (client *cqlClient) CreateSchemaVersionTables() error { // ReadSchemaVersion returns the current schema version for the Keyspace func (client *cqlClient) ReadSchemaVersion() (string, error) { query := client.session.Query(readSchemaVersionCQL, client.keyspace) - // when querying the DB schema version, override to local quorum - // in case Cassandra node down (compared to using ALL) - query.Consistency(gocql.LocalQuorum) iter := query.Iter() var version string diff --git a/tools/cassandra/handler.go b/tools/cassandra/handler.go index 0aeee8b456c..66bb6d80577 100644 --- a/tools/cassandra/handler.go +++ b/tools/cassandra/handler.go @@ -176,6 +176,7 @@ func newCQLClientConfig(cli *cli.Context) (*CQLClientConfig, error) { Keyspace: cli.GlobalString(schema.CLIOptKeyspace), numReplicas: cli.Int(schema.CLIOptReplicationFactor), Datacenter: cli.String(schema.CLIOptDatacenter), + Consistency: cli.String(schema.CLIOptConsistency), } if cli.GlobalBool(schema.CLIFlagEnableTLS) { diff --git a/tools/common/schema/types.go b/tools/common/schema/types.go index 2c88fe4170b..035e40edec5 100644 --- a/tools/common/schema/types.go +++ b/tools/common/schema/types.go @@ -107,6 +107,8 @@ const ( CLIOptReplicationFactor = "replication-factor" // CLIOptDatacenter is the cli option for NetworkTopologyStrategy datacenter CLIOptDatacenter = "datacenter" + // CLIOptConsistency is the cli option for consistency settings + CLIOptConsistency = "consistency" // CLIOptQuiet is the cli option for quiet mode CLIOptQuiet = "quiet" // CLIOptForce is the cli option for force mode