-
Notifications
You must be signed in to change notification settings - Fork 0
/
cassandra.go
118 lines (97 loc) · 2.5 KB
/
cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package datastore
import (
"sync"
"github.com/gocql/gocql"
"github.com/kklab-com/goth-kklogger"
kksecret "github.com/kklab-com/goth-kksecret"
)
// Package-level state shared by every KKCassandra call.
var (
	// KKCassandraLocker is held by KKCassandra while it builds and stores a
	// Cassandra handle.
	KKCassandraLocker sync.Mutex

	// KKCassandraProfiles maps a profile name to the *Cassandra handle that
	// KKCassandra built for it.
	KKCassandraProfiles sync.Map

	// KKCassandraDebug makes KKCassandra drop the stored handle and build a
	// new one on every call. It starts out false.
	KKCassandraDebug bool

	// _KKCassandraDebug ensures KKCassandraDebug is resolved only once.
	_KKCassandraDebug sync.Once
)
// Cassandra bundles the writer and reader operation handles that belong to
// one named Cassandra profile.
type Cassandra struct {
	// name is the profile name the handle was built for.
	name string
	// writer is the handle returned by Writer.
	writer *CassandraOp
	// reader is the handle returned by Reader.
	reader *CassandraOp
}
// Writer returns the writer-side operation handle of this Cassandra profile.
func (k *Cassandra) Writer() *CassandraOp {
	return k.writer
}
// Reader returns the reader-side operation handle of this Cassandra profile.
func (k *Cassandra) Reader() *CassandraOp {
	return k.reader
}
// CassandraOp holds the cluster configuration and the lazily created session
// for one side (writer or reader) of a Cassandra profile.
type CassandraOp struct {
	// opType is TypeWriter or TypeReader, as assigned by KKCassandra.
	opType OPType
	// meta is the connection metadata copied from the kksecret profile.
	meta kksecret.CassandraMeta
	// cluster is the configuration Session uses to create the session.
	cluster *gocql.ClusterConfig
	// session stays nil until Session creates it successfully.
	session *gocql.Session
	// opLock is held while Session creates and stores the session.
	opLock sync.Mutex
}
// Cluster returns the gocql cluster configuration held by this handle.
func (k *CassandraOp) Cluster() *gocql.ClusterConfig {
	return k.cluster
}
// Session returns the gocql session of this handle, creating it from the
// cluster configuration on first use.
//
// A successfully created session is stored and returned by every later call.
// When creation fails the error is logged and nil is returned; nothing is
// stored, so the next call attempts creation again. Callers must check the
// result for nil.
func (k *CassandraOp) Session() *gocql.Session {
	// Acquire the lock before touching k.session. Reading the field outside
	// the lock while another goroutine assigns it under the lock is a data
	// race, even though the second check happened under the lock.
	k.opLock.Lock()
	defer k.opLock.Unlock()

	if k.session != nil {
		return k.session
	}

	session, err := k.cluster.CreateSession()
	if err != nil {
		kklogger.ErrorJ("goth-kkdatastore:CassandraOp.Session", err.Error())
		return nil
	}

	k.session = session
	return session
}
// KKCassandra returns the Cassandra handle for the profile named
// cassandraName, building it from the matching kksecret profile when no
// handle is stored yet.
//
// Built handles are kept in KKCassandraProfiles and reused. When
// KKCassandraDebug is true the stored handle is ignored: every call deletes
// the entry and builds a new handle. KKCassandra returns nil when kksecret
// has no profile for cassandraName.
func KKCassandra(cassandraName string) *Cassandra {
	// Resolve the debug flag once. A value of true set before the first call
	// is kept; otherwise the flag is taken from _IsKKDatastoreDebug.
	_KKCassandraDebug.Do(func() {
		if !KKCassandraDebug {
			KKCassandraDebug = _IsKKDatastoreDebug()
		}
	})

	// Fast path: reuse the stored handle without taking the lock.
	if cached, ok := KKCassandraProfiles.Load(cassandraName); ok && !KKCassandraDebug {
		return cached.(*Cassandra)
	}

	profile := kksecret.CassandraProfile(cassandraName)
	if profile == nil {
		return nil
	}

	KKCassandraLocker.Lock()
	defer KKCassandraLocker.Unlock()

	// In debug mode drop the stored handle so the lookup below misses and a
	// new handle is built from the profile just loaded.
	if KKCassandraDebug {
		KKCassandraProfiles.Delete(cassandraName)
	}

	// Look again while holding the lock: another caller may have stored the
	// handle between the unlocked lookup above and the lock being acquired.
	if cached, ok := KKCassandraProfiles.Load(cassandraName); ok {
		return cached.(*Cassandra)
	}

	writer := &CassandraOp{
		opType:  TypeWriter,
		meta:    profile.Writer,
		cluster: newCassandraCluster(profile.Writer),
	}
	reader := &CassandraOp{
		opType:  TypeReader,
		meta:    profile.Reader,
		cluster: newCassandraCluster(profile.Reader),
	}

	csd := &Cassandra{
		name:   cassandraName,
		writer: writer,
		reader: reader,
	}
	KKCassandraProfiles.Store(cassandraName, csd)
	return csd
}

// newCassandraCluster builds the gocql cluster configuration for one side of
// a profile: hosts, password authentication and CA path come from meta, and
// the consistency level is set to LocalQuorum.
func newCassandraCluster(meta kksecret.CassandraMeta) *gocql.ClusterConfig {
	cluster := gocql.NewCluster(meta.Hosts...)
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: meta.Username,
		Password: meta.Password,
	}
	// SslOpts is always set, even when meta.CaPath is empty.
	cluster.SslOpts = &gocql.SslOptions{CaPath: meta.CaPath}
	cluster.Consistency = gocql.LocalQuorum
	return cluster
}