Skip to content

Commit

Permalink
Support switch storage engine by prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
dispensable committed Aug 17, 2023
1 parent c96c6b4 commit 9292ff9
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 41 deletions.
26 changes: 13 additions & 13 deletions .doubanpde/pde.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ spec:
name: var-run-nscd
readOnly: true
workingDir: /home/project
- name: mc
image: docker.douban/memcached:latest
workingDir: /
# - name: mc
# image: docker.douban/memcached:latest
# workingDir: /
{{- range (mkSlice 57980 57981 57982 57983) }}
- name: beansdb-{{ . }}
image: docker.douban/platform/gobeansdb:latest
Expand All @@ -64,20 +64,20 @@ spec:
name: beansdb-{{ . }}-cfg-dir
{{- end }}
- name: cassandra
image: docker.douban/wangqiang/cassandra:latest
image: docker.douban/dba/cassandra:4.1.2
workingDir: /
volumeMounts:
- mountPath: /var/lib/cassandra/
name: cassandra-data-dir
- mountPath: /tmp/cassandra/
name: cassandra-cfg
command:
- "/bin/bash"
args:
- "-c"
- >
cp -rfv /tmp/cassandra/cassandra.yaml /etc/cassandra/ &&
/usr/local/bin/docker-entrypoint.sh cassandra -f
# - mountPath: /tmp/cassandra/
# name: cassandra-cfg
# command:
# - "/bin/bash"
# args:
# - "-c"
# - >
# cp -rfv /tmp/cassandra/cassandra.yaml /etc/cassandra/ &&
# /usr/local/bin/docker-entrypoint.sh cassandra -f
restartPolicy: Never
volumes:
- hostPath:
Expand Down
16 changes: 11 additions & 5 deletions cassandra/cstar.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ func NewCassandraStore(cstarCfg *config.CassandraStoreCfg) (*CassandraStore, err
}
}
cluster.Keyspace = cstarCfg.DefaultKeySpace
cluster.Consistency = gocql.Quorum

switch cstarCfg.Consistency {
case "local_one":
cluster.Consistency = gocql.LocalOne
default:
cluster.Consistency = gocql.Quorum
}

cluster.ReconnectInterval = time.Duration(cstarCfg.ReconnectIntervalSec) * time.Second
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: cstarCfg.RetryNum}
cluster.Timeout = time.Duration(cstarCfg.CstarTimeoutMs) * time.Millisecond
Expand Down Expand Up @@ -127,11 +134,10 @@ func (c *CassandraStore) Get(key string) (*mc.Item, error) {
}
}

func (c *CassandraStore) GetMulti(keys []string) (map[string]*mc.Item, error) {
func (c *CassandraStore) GetMulti(keys []string, result map[string]*mc.Item) error {
// not using IN for this reason
// https://stackoverflow.com/questions/26999098/is-the-in-relation-in-cassandra-bad-for-queries

result := map[string]*mc.Item{}
lock := sync.Mutex{}

ctx := context.Background()
Expand Down Expand Up @@ -160,8 +166,8 @@ func (c *CassandraStore) GetMulti(keys []string) (map[string]*mc.Item, error) {
if err := g.Wait(); err != nil {
logger.Errorf("getm %s err: %s", keys, err)
}
return result, nil

return nil
}

func (c *CassandraStore) SetWithValue(key string, v *BDBValue) (ok bool, err error) {
Expand Down
141 changes: 141 additions & 0 deletions cassandra/prefix_switch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package cassandra

import (
"fmt"
"sync"

"github.com/douban/gobeansproxy/config"
"github.com/viant/ptrie"
)

type PrefixSwitchStatus int

const (
// bdb r/w c* disable
PrefixSwitchBrw PrefixSwitchStatus = 0
// bdb r/w c* w
PrefixSwitchBrwCw PrefixSwitchStatus = 1
// bdb w c* r/w
PrefixSwitchBwCrw PrefixSwitchStatus = 2
// c* rw bdb disable
PrefixSwitchCrw PrefixSwitchStatus = 3
statusBrw string = "br1w1cr0w0"
statusBrwCw string = "br1w1cr0w1"
statusBwCrw string = "br0w1cr1w1"
statusCrw string = "br0w0cr1w1"
)

type PrefixSwitcher struct {
trie *ptrie.Trie
defaultT PrefixSwitchStatus
lock sync.RWMutex
}

func strToSwitchStatus(s string) (PrefixSwitchStatus, error) {
switch s {
case statusBrw:
return PrefixSwitchBrw, nil
case statusBrwCw:
return PrefixSwitchBrwCw, nil
case statusBwCrw:
return PrefixSwitchBwCrw, nil
case statusCrw:
return PrefixSwitchCrw, nil
default:
return -1, fmt.Errorf("Unsupported switch type of %s", s)
}
}

func GetPrefixSwitchTrieFromCfg(cfg *config.CassandraStoreCfg) (*ptrie.Trie, error) {
s2k := cfg.SwitchToKeyPrefixes
prefixTrie := ptrie.New()

for s, kprefixs := range s2k {
status, err := strToSwitchStatus(s)
if err != nil {
return nil, err
}

for _, prefix := range kprefixs {
err := prefixTrie.Put([]byte(prefix), int(status))
if err != nil {
return nil, err
}
}
}

return &prefixTrie, nil
}

func NewPrefixSwitcher(config *config.CassandraStoreCfg) (*PrefixSwitcher, error) {
prefixTrie, err := GetPrefixSwitchTrieFromCfg(config)
if err != nil {
return nil, err
}

f := new(PrefixSwitcher)
f.trie = prefixTrie
f.defaultT = PrefixSwitchBrw
return f, nil
}

func (s *PrefixSwitcher) GetStatus(key string) PrefixSwitchStatus {
s.lock.RLock()
defer s.lock.RUnlock()

var result int
v := (*(s.trie)).MatchPrefix([]byte(key), func(key []byte, value interface{}) bool {
result = value.(int)
return true
})

if !v {
return s.defaultT
}
return PrefixSwitchStatus(result)
}

// check key prefix and return bdb read enable c* read enable
func (s *PrefixSwitcher) ReadEnabledOn(key string) (bool, bool) {
status := s.GetStatus(key)
return (status == PrefixSwitchBrw || status == PrefixSwitchBrwCw),
(status == PrefixSwitchCrw || status == PrefixSwitchBwCrw)
}

// check keys prefix list and return bdb read keys and c* read keys
func (s *PrefixSwitcher) ReadEnableOnKeys(keys []string) (bkeys []string, ckeys []string) {
for _, k := range keys {
b, c := s.ReadEnabledOn(k)
if b {
bkeys = append(bkeys, k)
}

if c {
ckeys = append(ckeys, k)
}
}
return
}

// check key prefix and return bdb write enable c* write enable
func (s *PrefixSwitcher) WriteEnabledOn(key string) (bool, bool) {
status := s.GetStatus(key)
return (status == PrefixSwitchBrw || status == PrefixSwitchBrwCw || status == PrefixSwitchBwCrw),
(status == PrefixSwitchCrw || status == PrefixSwitchBrwCw || status == PrefixSwitchBwCrw)
}

func (s *PrefixSwitcher) LoadCfg(cfgDir string) error {
cfg := config.ProxyConfig{}
cfg.InitDefault()
cfg.Load(cfgDir)

pTrie, err := GetPrefixSwitchTrieFromCfg(&cfg.CassandraStoreCfg)
if err != nil {
return err
}

s.lock.Lock()
defer s.lock.Unlock()
s.trie = pTrie
return nil
}
File renamed without changes.
File renamed without changes.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ProxyConfig struct {
dbcfg.MCConfig `yaml:"mc,omitempty"`
DStoreConfig `yaml:"dstore,omitempty"`
CassandraStoreCfg `yaml:"cassandra,omitempty"`
Confdir string
}

type DStoreConfig struct {
Expand Down Expand Up @@ -60,7 +61,9 @@ type CassandraStoreCfg struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
PasswordFile string `yaml:"password_file"`
Consistency string `yaml:"consistency,omitempty"`
TableToKeyPrefix map[string][]string `yaml:"table_to_keyprefix"`
SwitchToKeyPrefixes map[string][]string `yaml:"switch_to_keyprefixes"`
}

func (c *ProxyConfig) InitDefault() {
Expand Down Expand Up @@ -106,6 +109,7 @@ func (c *ProxyConfig) Load(confdir string) {
Route = route
checkConfig(c, Route)
}
c.Confdir = confdir
dbutils.InitSizesPointer(c)
c.ConfigPackages()
}
Expand Down
3 changes: 3 additions & 0 deletions deb-req.d/dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
less
python3-pip
python-is-python3
Loading

0 comments on commit 9292ff9

Please sign in to comment.