Skip to content

Commit

Permalink
Merge 4eb7c8a into 1b5fb94
Browse files Browse the repository at this point in the history
  • Loading branch information
kevindiu authored Nov 10, 2020
2 parents 1b5fb94 + 4eb7c8a commit 4a2dc28
Show file tree
Hide file tree
Showing 5 changed files with 1,497 additions and 675 deletions.
118 changes: 78 additions & 40 deletions internal/db/nosql/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,61 @@ import (
)

var (
ErrNotFound = gocql.ErrNotFound
ErrUnavailable = gocql.ErrUnavailable
ErrUnsupported = gocql.ErrUnsupported
ErrTooManyStmts = gocql.ErrTooManyStmts
ErrUseStmt = gocql.ErrUseStmt
ErrSessionClosed = gocql.ErrSessionClosed
ErrNoConnections = gocql.ErrNoConnections
ErrNoKeyspace = gocql.ErrNoKeyspace
// ErrNotFound is a alias of gocql.ErrNotFound
ErrNotFound = gocql.ErrNotFound
// ErrUnavailable is a alias of gocql.ErrUnavailable
ErrUnavailable = gocql.ErrUnavailable
// ErrUnsupported is a alias of gocql.ErrUnsupported
ErrUnsupported = gocql.ErrUnsupported
// ErrTooManyStmts is a alias of gocql.ErrTooManyStmts
ErrTooManyStmts = gocql.ErrTooManyStmts
// ErrUseStmt is a alias of gocql.ErrUseStmt
ErrUseStmt = gocql.ErrUseStmt
// ErrSessionClosed is a alias of gocql.ErrSessionClosed
ErrSessionClosed = gocql.ErrSessionClosed
// ErrNoConnections is a alias of gocql.ErrNoConnections
ErrNoConnections = gocql.ErrNoConnections
// ErrNoKeyspace is a alias of gocql.ErrNoKeyspace
ErrNoKeyspace = gocql.ErrNoKeyspace
// ErrKeyspaceDoesNotExist is a alias of gocql.ErrKeyspaceDoesNotExist
ErrKeyspaceDoesNotExist = gocql.ErrKeyspaceDoesNotExist
ErrNoMetadata = gocql.ErrNoMetadata
ErrNoHosts = gocql.ErrNoHosts
// ErrNoMetadata is a alias of gocql.ErrNoMetadata
ErrNoMetadata = gocql.ErrNoMetadata
// ErrNoHosts is a alias of gocql.ErrNoHosts
ErrNoHosts = gocql.ErrNoHosts
// ErrNoConnectionsStarted is a alias of gocql.ErrNoConnectionsStarted
ErrNoConnectionsStarted = gocql.ErrNoConnectionsStarted
ErrHostQueryFailed = gocql.ErrHostQueryFailed
// ErrHostQueryFailed is a alias of gocql.ErrHostQueryFailed
ErrHostQueryFailed = gocql.ErrHostQueryFailed
)

// Cassandra represent an interface to query on cassandra
type Cassandra interface {
Open(ctx context.Context) error
Close(ctx context.Context) error
Query(stmt string, names []string) *Queryx
}

// ClusterConfig represent an interface of cassandra cluster configuation
type ClusterConfig interface {
CreateSession() (*gocql.Session, error)
}

type (
Session = gocql.Session
Cmp = qb.Cmp
BatchBuilder = qb.BatchBuilder
// Session is a alias of gocql.Session
Session = gocql.Session
// Cmp is a alias of qb.Cmp
Cmp = qb.Cmp
// BatchBuilder is a alias of qb.BatchBuilder
BatchBuilder = qb.BatchBuilder
// InsertBuilder is a alias of qb.InsertBuilder
InsertBuilder = qb.InsertBuilder
// DeleteBuilder is a alias of qb.DeleteBuilder
DeleteBuilder = qb.DeleteBuilder
// UpdateBuilder is a alias of qb.UpdateBuilder
UpdateBuilder = qb.UpdateBuilder
Queryx = gocqlx.Queryx
// Queryx is a alias of gocqlx.Queryx
Queryx = gocqlx.Queryx
)

type (
Expand All @@ -83,6 +109,11 @@ type (
dcHost string
whiteList []string
}
events struct {
DisableNodeStatusEvents bool
DisableTopologyEvents bool
DisableSchemaEvents bool
}
client struct {
hosts []string
cqlVersion string
Expand Down Expand Up @@ -128,11 +159,12 @@ type (
dialer gocql.Dialer
writeCoalesceWaitTime time.Duration

cluster *gocql.ClusterConfig
cluster ClusterConfig
session *gocql.Session
}
)

// New initialize and return the cassandra client, or any error occurred.
func New(opts ...Option) (Cassandra, error) {
c := new(client)
for _, opt := range append(defaultOpts, opts...) {
Expand Down Expand Up @@ -216,31 +248,29 @@ func New(opts ...Option) (Cassandra, error) {
ReconnectInterval: c.reconnectInterval,
MaxWaitSchemaAgreement: c.maxWaitSchemaAgreement,
HostFilter: func() (hf gocql.HostFilter) {
if c.hostFilter.enable {
if len(c.hostFilter.dcHost) != 0 {
hf = gocql.DataCentreHostFilter(c.poolConfig.dataCenterName)
}
if len(c.hostFilter.whiteList) != 0 {
if hf == nil {
hf = gocql.WhiteListHostFilter(c.hostFilter.whiteList...)
} else {
hf = gocql.HostFilterFunc(func(host *gocql.HostInfo) bool {
return hf.Accept(host) ||
gocql.WhiteListHostFilter(c.hostFilter.whiteList...).Accept(host)
})
}
if !c.hostFilter.enable {
return nil
}

if len(c.hostFilter.dcHost) != 0 {
hf = gocql.DataCentreHostFilter(c.hostFilter.dcHost)
}
if len(c.hostFilter.whiteList) != 0 {
wlhf := gocql.WhiteListHostFilter(c.hostFilter.whiteList...)
if hf == nil {
hf = wlhf
} else {
hf = gocql.HostFilterFunc(func(host *gocql.HostInfo) bool {
return hf.Accept(host) || wlhf.Accept(host)
})
}
}
return hf
}(),
// AddressTranslator
IgnorePeerAddr: c.ignorePeerAddr,
DisableInitialHostLookup: c.disableInitialHostLookup,
Events: struct {
DisableNodeStatusEvents bool
DisableTopologyEvents bool
DisableSchemaEvents bool
}{
Events: events{
DisableNodeStatusEvents: c.disableNodeStatusEvents,
DisableTopologyEvents: c.disableTopologyEvents,
DisableSchemaEvents: c.disableSchemaEvents,
Expand Down Expand Up @@ -269,26 +299,26 @@ func New(opts ...Option) (Cassandra, error) {
return c, nil
}

func (c *client) Open(ctx context.Context) error {
session, err := c.cluster.CreateSession()
if err != nil {
// Open creates a session to cassandra and return any error occurred
func (c *client) Open(ctx context.Context) (err error) {
if c.session, err = c.cluster.CreateSession(); err != nil {
return err
}

c.session = session

return nil
}

// Close closes the session to cassandra
func (c *client) Close(ctx context.Context) error {
c.session.Close()
return nil
}

// Query creates an query that can be executed on cassandra
func (c *client) Query(stmt string, names []string) *Queryx {
return gocqlx.Query(c.session.Query(stmt), names)
}

// Select build and returns the cql string and the named args
func Select(table string, columns []string, cmps ...Cmp) (stmt string, names []string) {
sb := qb.Select(table).Columns(columns...)
for _, cmp := range cmps {
Expand All @@ -297,6 +327,7 @@ func Select(table string, columns []string, cmps ...Cmp) (stmt string, names []s
return sb.ToCql()
}

// Delete returns the delete builder
func Delete(table string, cmps ...Cmp) *DeleteBuilder {
db := qb.Delete(table)
for _, cmp := range cmps {
Expand All @@ -305,30 +336,37 @@ func Delete(table string, cmps ...Cmp) *DeleteBuilder {
return db
}

// Insert returns the insert builder
func Insert(table string, columns ...string) *InsertBuilder {
return qb.Insert(table).Columns(columns...)
}

// Update returns the update builder
func Update(table string) *UpdateBuilder {
return qb.Update(table)
}

// Batch returns the batch builder
func Batch() *BatchBuilder {
return qb.Batch()
}

// Eq returns the equal comparator
func Eq(column string) Cmp {
return qb.Eq(column)
}

// In returns the in comparator
func In(column string) Cmp {
return qb.In(column)
}

// Contains return the contains comparator
func Contains(column string) Cmp {
return qb.Contains(column)
}

// WrapErrorWithKeys wraps the cassandra error to Vald internal error
func WrapErrorWithKeys(err error, keys ...string) error {
switch err {
case ErrNotFound:
Expand Down
11 changes: 11 additions & 0 deletions internal/db/nosql/cassandra/cassandra_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cassandra

import "github.com/gocql/gocql"

type MockClusterConfig struct {
CreateSessionFunc func() (*gocql.Session, error)
}

func (m *MockClusterConfig) CreateSession() (*gocql.Session, error) {
return m.CreateSessionFunc()
}
Loading

0 comments on commit 4a2dc28

Please sign in to comment.