From febe6fd1542c70f18e2b8c6d82e7fcb933c36d1e Mon Sep 17 00:00:00 2001
From: liangkai
Date: Thu, 10 Sep 2020 12:52:56 +0800
Subject: [PATCH 1/2] fix bug: probe store health per group and pick write
 volumes per directory

---
 directory/directory.go   |  15 +-
 directory/dispatcher.go  |  36 +++--
 libs/errors/common.go    |   3 +-
 libs/errors/errors.go    |   8 +-
 libs/meta/store.go       |   9 ++
 libs/meta/volume.go      |   1 +
 pitchfork/conf/config.go | 112 ++++++-------
 pitchfork/pitchfork.go   | 214 +++++++++++++------------
 pitchfork/zk/zk.go       | 331 ++++++++++++++++++++++-----------
 9 files changed, 404 insertions(+), 325 deletions(-)

diff --git a/directory/directory.go b/directory/directory.go
index dbcdf5e..a4ad8eb 100644
--- a/directory/directory.go
+++ b/directory/directory.go
@@ -27,8 +27,7 @@ type Directory struct {
 	storeVolume map[string][]int32 // store_server_id:volume_ids

 	// GROUP
-	storeGroup map[string]int   // store_server_id:group
-	group      map[int][]string // group_id:store_servers
+	group map[int][]string // group_id:store_servers

 	// VOLUME
 	volume map[int32]*meta.VolumeState // volume_id:volume_state
@@ -161,30 +160,28 @@ func (d *Directory) syncGroups() (err error) {
 		str            string
 		groups, stores []string
 		group          map[int][]string
-		storeGroup     map[string]int
 	)
 	// get all groups
 	if groups, err = d.zk.Groups(); err != nil {
 		return
 	}
 	group = make(map[int][]string)
-	storeGroup = make(map[string]int)
 	for _, str = range groups {
 		// get all stores by the group
 		if stores, err = d.zk.GroupStores(str); err != nil {
 			return
 		}
+		if len(stores) == 0 {
+			log.Errorf("group:%s is empty", str)
+			continue
+		}
 		if gid, err = strconv.Atoi(str); err != nil {
 			log.Errorf("wrong group:%s", str)
 			continue
 		}
 		group[gid] = stores
-		for _, str = range stores {
-			storeGroup[str] = gid
-		}
 	}
 	d.group = group
-	d.storeGroup = storeGroup
 	return
 }
@@ -218,10 +215,8 @@ func (d *Directory) SyncZookeeper() {
 		select {
 		case <-sev:
 			log.Infof("stores status change or new store")
-			break
 		case <-time.After(d.config.Zookeeper.PullInterval.Duration):
 			log.Infof("pull from zk")
-			break
 		}
 	}
 }
diff --git a/directory/dispatcher.go b/directory/dispatcher.go
index 412db90..5f48912 100644
--- a/directory/dispatcher.go
+++ b/directory/dispatcher.go
@@ -13,8 +13,8 @@ import (
 // Dispatcher ,
 // get raw data and processed into memory for http reqs
 type Dispatcher struct {
-	gids    []int            // for write eg: gid:1;2 gids: [1,1,2,2,2,2,2]
-	wrtVids map[string]int32 // choose most suitable written volume, always order by rest space.
+	gids    []int              // weighted gids for write, e.g. gid:1;2 -> gids: [1,1,2,2,2,2,2]
+	wrtVids map[string][]int32 // the most suitable write volume per dir of each store, always ordered by free space.
 	rand    *rand.Rand
 	rlock   sync.Mutex
 }
@@ -44,7 +44,8 @@ func (d *Dispatcher) Update(group map[int][]string,
 		i          int
 		vid        int32
 		gids       []int
-		wrtVids    map[string]int32
+		wrtVids    map[string][]int32
+		tmpWrtVids map[string]map[string]int32
 		sid        string
 		stores     []string
 		restSpace, minScore, score int
@@ -54,7 +55,8 @@ func (d *Dispatcher) Update(group map[int][]string,
 		volumeState *meta.VolumeState
 	)
 	gids = []int{}
-	wrtVids = map[string]int32{}
+	wrtVids = map[string][]int32{}
+	tmpWrtVids = map[string]map[string]int32{}
 	for gid, stores = range group {
 		write = true
 		// check all stores can writeable by the group.
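A note on the weighted write path touched above: each writable group's score determines how many copies of its gid end up in gids, so a uniform random index into that slice is a score-weighted pick of a group; that is what the [1,1,2,2,2,2,2] example on the field means. A minimal standalone Go sketch of the idea (the scores map and the repeat-per-score rule are assumptions inferred from that field comment, not the exact scoring in Update):

	package main

	import (
		"fmt"
		"math/rand"
	)

	func main() {
		// hypothetical per-group scores: gid 1 weighs 2, gid 2 weighs 5
		scores := map[int]int{1: 2, 2: 5}
		var gids []int
		for gid, score := range scores {
			// a group appears once per point of score, e.g. [1,1,2,2,2,2,2]
			for i := 0; i < score; i++ {
				gids = append(gids, gid)
			}
		}
		// a write picks uniformly over the weighted slice, so group 2 is
		// chosen 5/7 of the time and group 1 the remaining 2/7
		fmt.Println("chosen group:", gids[rand.Intn(len(gids))])
	}
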
@@ -90,8 +92,13 @@ func (d *Dispatcher) Update(group map[int][]string, totalAddDelay = totalAddDelay + volumeState.TotalWriteDelay // cacl most suitable written vid if volumeState.FreeSpace > minFreeSpace { - if value, ok := wrtVids[sid]; !ok || vid < value { - wrtVids[sid] = vid + v, ok := tmpWrtVids[sid] + if !ok { + tmpWrtVids[sid] = map[string]int32{volumeState.Dir: vid} + } else { + if value, ok := v[volumeState.Dir]; !ok || vid < value { + v[volumeState.Dir] = vid + } } } } @@ -104,8 +111,17 @@ func (d *Dispatcher) Update(group map[int][]string, gids = append(gids, gid) } } - d.gids = gids - d.wrtVids = wrtVids + if len(gids) > 0 { + d.gids = gids + } + for sid, v := range tmpWrtVids { + for _, vid := range v { + wrtVids[sid] = append(wrtVids[sid], vid) + } + } + if len(wrtVids) > 0 { + d.wrtVids = wrtVids + } return } @@ -133,6 +149,7 @@ func (d *Dispatcher) VolumeID(group map[int][]string, storeVolume map[string][]i var ( stores []string gid int + vids []int32 ) if len(d.gids) == 0 { err = errors.ErrStoreNotAvailable @@ -146,6 +163,7 @@ func (d *Dispatcher) VolumeID(group map[int][]string, storeVolume map[string][]i err = errors.ErrZookeeperDataError return } - vid = d.wrtVids[stores[0]] + vids = d.wrtVids[stores[0]] + vid = vids[d.rand.Intn(len(vids))] return } diff --git a/libs/errors/common.go b/libs/errors/common.go index 976f621..e802099 100644 --- a/libs/errors/common.go +++ b/libs/errors/common.go @@ -5,7 +5,7 @@ const ( RetServiceUnavailable = 65533 RetParamErr = 65534 RetInternalErr = 65535 - + RetServiceTimeout = 65536 // needle RetNeedleExist = 5000 ) @@ -15,6 +15,7 @@ var ( ErrParam = Error(RetParamErr) ErrInternal = Error(RetInternalErr) ErrServiceUnavailable = Error(RetServiceUnavailable) + ErrServiceTimeout = Error(RetServiceTimeout) ErrNeedleExist = Error(RetNeedleExist) ) diff --git a/libs/errors/errors.go b/libs/errors/errors.go index 5239384..feb22e4 100644 --- a/libs/errors/errors.go +++ b/libs/errors/errors.go @@ -10,9 +10,11 @@ var ( errorMsg = map[int]string{ /* ========================= Store ========================= */ // common - RetOK: "ok", - RetParamErr: "store param error", - RetInternalErr: "internal server error", + RetOK: "ok", + RetParamErr: "store param error", + RetInternalErr: "internal server error", + RetServiceTimeout: "service timeout", + RetServiceUnavailable: "service unavailable", // api RetUploadMaxFile: "exceed upload max file num", // block diff --git a/libs/meta/store.go b/libs/meta/store.go index 3b7878c..908eaaa 100644 --- a/libs/meta/store.go +++ b/libs/meta/store.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strings" "time" log "github.com/golang/glog" @@ -16,12 +17,14 @@ const ( StoreStatusEnableBit = 31 StoreStatusReadBit = 0 StoreStatusWriteBit = 1 + StoreStatusSyncBit = 2 // status StoreStatusInit = 0 StoreStatusEnable = (1 << StoreStatusEnableBit) StoreStatusRead = StoreStatusEnable | (1 << StoreStatusReadBit) StoreStatusWrite = StoreStatusEnable | (1 << StoreStatusWriteBit) StoreStatusHealth = StoreStatusRead | StoreStatusWrite + StoreStatusSync = StoreStatusEnable | (1 << StoreStatusSyncBit) // 2147483652 StoreStatusFail = StoreStatusEnable // api statAPI = "http://%s/info" @@ -101,10 +104,16 @@ func (s *Store) Info() (vs []*Volume, err error) { ) if req, err = http.NewRequest("GET", url, nil); err != nil { log.Info("http.NewRequest(GET,%s) error(%v)", url, err) + err = errors.ErrServiceTimeout return } if resp, err = _client.Do(req); err != nil { log.Errorf("_client.do(%s) error(%v)", url, err) + if 
strings.Contains(err.Error(), "connection refused") { + err = errors.ErrServiceUnavailable + } else { + err = errors.ErrServiceTimeout + } return } defer resp.Body.Close() diff --git a/libs/meta/volume.go b/libs/meta/volume.go index a84c44c..7c2cd96 100644 --- a/libs/meta/volume.go +++ b/libs/meta/volume.go @@ -17,4 +17,5 @@ type VolumeState struct { TotalWriteProcessed uint64 `json:"total_write_processed"` TotalWriteDelay uint64 `json:"total_write_delay"` FreeSpace uint32 `json:"free_space"` + Dir string `json:"dir"` } diff --git a/pitchfork/conf/config.go b/pitchfork/conf/config.go index 93b7f9e..916e52f 100644 --- a/pitchfork/conf/config.go +++ b/pitchfork/conf/config.go @@ -1,55 +1,57 @@ -package conf - -import ( - "github.com/BurntSushi/toml" - "io/ioutil" - "os" - "time" -) - -type Config struct { - Store *Store - Zookeeper *Zookeeper -} - -type Store struct { - StoreCheckInterval duration - NeedleCheckInterval duration - RackCheckInterval duration -} - -type Zookeeper struct { - VolumeRoot string - StoreRoot string - PitchforkRoot string - Addrs []string - Timeout duration -} - -// Code to implement the TextUnmarshaler interface for `duration`: -type duration struct { - time.Duration -} - -func (d *duration) UnmarshalText(text []byte) error { - var err error - d.Duration, err = time.ParseDuration(string(text)) - return err -} - -// NewConfig new a config. -func NewConfig(conf string) (c *Config, err error) { - var ( - file *os.File - blob []byte - ) - c = new(Config) - if file, err = os.Open(conf); err != nil { - return - } - if blob, err = ioutil.ReadAll(file); err != nil { - return - } - err = toml.Unmarshal(blob, c) - return -} +package conf + +import ( + "io/ioutil" + "os" + "time" + + "github.com/BurntSushi/toml" +) + +type Config struct { + Store *Store + Zookeeper *Zookeeper +} + +type Store struct { + StoreCheckInterval duration + NeedleCheckInterval duration + RackCheckInterval duration +} + +type Zookeeper struct { + VolumeRoot string + StoreRoot string + PitchforkRoot string + GroupRoot string + Addrs []string + Timeout duration +} + +// Code to implement the TextUnmarshaler interface for `duration`: +type duration struct { + time.Duration +} + +func (d *duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return err +} + +// NewConfig new a config. +func NewConfig(conf string) (c *Config, err error) { + var ( + file *os.File + blob []byte + ) + c = new(Config) + if file, err = os.Open(conf); err != nil { + return + } + if blob, err = ioutil.ReadAll(file); err != nil { + return + } + err = toml.Unmarshal(blob, c) + return +} diff --git a/pitchfork/pitchfork.go b/pitchfork/pitchfork.go index e979b28..99deed2 100644 --- a/pitchfork/pitchfork.go +++ b/pitchfork/pitchfork.go @@ -1,13 +1,17 @@ package main import ( + "encoding/json" + "math/rand" + "sort" + "sync" + "time" + "bfs/libs/errors" "bfs/libs/meta" + "bfs/pitchfork/conf" myzk "bfs/pitchfork/zk" - "encoding/json" - "sort" - "time" log "github.com/golang/glog" "github.com/samuel/go-zookeeper/zk" @@ -56,8 +60,17 @@ func (p *Pitchfork) watch() (res []string, ev <-chan zk.Event, err error) { return } +func (p *Pitchfork) groups() (groups []string, err error) { + if groups, err = p.zk.Groups(); err != nil { + log.Errorf("zk.Groups() error(%v)", err) + return + } + sort.Sort(sort.StringSlice(groups)) + return +} + // watchStores get all the store nodes and set up the watcher in the zookeeper. 
-func (p *Pitchfork) watchStores() (res []*meta.Store, ev <-chan zk.Event, err error) {
+func (p *Pitchfork) watchStores() (res map[string]*meta.Store, ev <-chan zk.Event, err error) {
 	var (
 		rack, store   string
 		racks, stores []string
@@ -68,6 +81,7 @@ func (p *Pitchfork) watchStores() (res []*meta.Store, ev <-chan zk.Event, err er
 		log.Errorf("zk.WatchGetStore() error(%v)", err)
 		return
 	}
+	res = make(map[string]*meta.Store)
 	for _, rack = range racks {
 		if stores, err = p.zk.Stores(rack); err != nil {
 			return
@@ -81,26 +95,25 @@ func (p *Pitchfork) watchStores() (res []*meta.Store, ev <-chan zk.Event, err er
 			log.Errorf("json.Unmarshal() error(%v)", err)
 			return
 		}
-		res = append(res, storeMeta)
+		res[storeMeta.Id] = storeMeta
 		}
 	}
-	sort.Sort(meta.StoreList(res))
 	return
 }
 
 // Probe main flow of pitchfork server.
 func (p *Pitchfork) Probe() {
 	var (
-		stores     []*meta.Store
+		groups     []string
 		pitchforks []string
+		storeMetas map[string]*meta.Store
 		sev        <-chan zk.Event
 		pev        <-chan zk.Event
-		stop       chan struct{}
-		store      *meta.Store
 		err        error
+		wg         sync.WaitGroup
 	)
 	for {
-		if stores, sev, err = p.watchStores(); err != nil {
+		if storeMetas, sev, err = p.watchStores(); err != nil {
 			log.Errorf("watchGetStores() called error(%v)", err)
 			time.Sleep(_retrySleep)
 			continue
@@ -110,42 +123,50 @@
 			time.Sleep(_retrySleep)
 			continue
 		}
-		if stores = p.divide(pitchforks, stores); err != nil || len(stores) == 0 {
+		if groups, err = p.groups(); err != nil {
+			log.Errorf("groups() error(%v)", err)
 			time.Sleep(_retrySleep)
 			continue
 		}
-		stop = make(chan struct{})
-		for _, store = range stores {
-			go p.checkHealth(store, stop)
-			go p.checkNeedles(store, stop)
+		if groups = p.divide(pitchforks, groups); len(groups) == 0 {
+			time.Sleep(_retrySleep)
+			continue
+		}
+		wg.Add(len(groups))
+		for _, group := range groups {
+			go p.healthCheck(group, storeMetas, &wg)
 		}
+		wg.Wait()
 		select {
 		case <-sev:
 			log.Infof("store nodes change, rebalance")
 		case <-pev:
 			log.Infof("pitchfork nodes change, rebalance")
+
 		case <-time.After(p.config.Store.RackCheckInterval.Duration):
 			log.Infof("pitchfork poll zk")
 		}
-		close(stop)
 	}
 }
 
 // divide a set of stores between a set of pitchforks.
-func (p *Pitchfork) divide(pitchforks []string, stores []*meta.Store) []*meta.Store {
+func (p *Pitchfork) divide(pitchforks []string, groups []string) []string {
 	var (
 		n, m        int
 		ss, ps      int
 		first, last int
 		node        string
-		store       *meta.Store
-		sm          = make(map[string][]*meta.Store)
+		sm          = make(map[string][]string)
 	)
-	ss = len(stores)
+	ss = len(groups)
 	ps = len(pitchforks)
-	if ss == 0 || ps == 0 || ss < ps {
+	if ss == 0 || ps == 0 {
 		return nil
 	}
+	if ss < ps {
+		// randomly pick one group
+		return []string{groups[rand.Intn(ss)]}
+	}
 	n = ss / ps
 	m = ss % ps
 	first = 0
@@ -159,109 +180,104 @@ func (p *Pitchfork) divide(pitchforks []string, stores []*meta.St
 		if last > ss {
 			last = ss
 		}
-		for _, store = range stores[first:last] {
-			sm[node] = append(sm[node], store)
-		}
+		sm[node] = append(sm[node], groups[first:last]...)
 		first = last
 	}
 	return sm[p.ID]
 }
 
-// checkHealth check the store health.
-func (p *Pitchfork) checkHealth(store *meta.Store, stop chan struct{}) {
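Before the reworked health check below, a standalone Go sketch of the split divide performs: with ss groups and ps pitchforks, n = ss/ps and m = ss%ps, and each node takes one consecutive slice of the sorted group list. This sketch assumes the first m nodes absorb the remainder (the exact remainder handling sits in lines elided above, and the node indexes stand in for the real pitchfork node names):

	package main

	import "fmt"

	func main() {
		groups := []string{"1", "2", "3", "4", "5", "6", "7", "8"}
		pitchforks := 3
		n, m := len(groups)/pitchforks, len(groups)%pitchforks
		first := 0
		for i := 0; i < pitchforks; i++ {
			last := first + n
			if i < m {
				last++ // assumed: the first m nodes take one extra group
			}
			fmt.Printf("pitchfork %d -> %v\n", i, groups[first:last])
			first = last
		}
		// Output:
		// pitchfork 0 -> [1 2 3]
		// pitchfork 1 -> [4 5 6]
		// pitchfork 2 -> [7 8]
	}

+// healthCheck checks every store of a group; if any member is read-only, the whole group is degraded to read-only.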
+func (p *Pitchfork) healthCheck(group string, sm map[string]*meta.Store, wg *sync.WaitGroup) { var ( - err error - status, i int - volume *meta.Volume - volumes []*meta.Volume + err error + nodes []string + groupReadOnly = false ) - log.Infof("check_health job start") - for { - select { - case <-stop: - log.Infof("check_health job stop") - return - case <-time.After(p.config.Store.StoreCheckInterval.Duration): - break - } - storeReadOnly := true - status = store.Status - store.Status = meta.StoreStatusHealth - for i = 0; i < _retryCount; i++ { - if volumes, err = store.Info(); err == nil { - break - } - time.Sleep(_retrySleep) + defer func() { + wg.Done() + log.Infof("health check job stop") + }() + log.Infof("health check job start") + if nodes, err = p.zk.GroupStores(group); err != nil { + log.Errorf("zk.GroupStores(%s) error(%v)", group, err) + return + } + stores := make([]*meta.Store, 0, len(nodes)) + for _, sid := range nodes { + if store, ok := sm[sid]; ok { + stores = append(stores, store) } - if err == nil { - for _, volume = range volumes { - if volume.Block.LastErr != nil { - log.Infof("get store block.lastErr:%s host:%s", volume.Block.LastErr, store.Stat) - store.Status = meta.StoreStatusFail - break - } else if !volume.Block.Full() { - log.Infof("block: %s, offset: %d", volume.Block.File, volume.Block.Offset) - storeReadOnly = false - } - if err = p.zk.SetVolumeState(volume); err != nil { - log.Errorf("zk.SetVolumeState() error(%v)", err) - } - } - } else { - log.Errorf("get store info failed, retry host:%s", store.Stat) - store.Status = meta.StoreStatusFail + } + for _, store := range stores { + p.syncStore(store) + if store.Status == meta.StoreStatusRead { + groupReadOnly = true } - if storeReadOnly { + log.Infof("after check, group:%s,store(%+v)", group, store) + } + if groupReadOnly { + for _, store := range stores { store.Status = meta.StoreStatusRead - } - if status != store.Status { if err = p.zk.SetStore(store); err != nil { - log.Errorf("update store zk status failed, retry") - continue + log.Errorf("zk.SetStore(%+v) error(%v)", store, err) } } } } -// checkNeedles check the store health. -func (p *Pitchfork) checkNeedles(store *meta.Store, stop chan struct{}) { +func (p *Pitchfork) syncStore(store *meta.Store) { var ( err error status int volume *meta.Volume volumes []*meta.Volume ) - log.Infof("checkNeedles job start") - for { - select { - case <-stop: - log.Infof("checkNeedles job stop") - return - case <-time.After(p.config.Store.NeedleCheckInterval.Duration): - break + if store.Status == meta.StoreStatusSync { + log.Infof("store status is sync, health check will be ignored") + return + } + storeReadOnly := true + status = store.Status + store.Status = meta.StoreStatusHealth + if volumes, err = store.Info(); err == errors.ErrServiceTimeout { + // ignore timeout and no retry. 
+ log.Errorf("store.Info() err:%v", err) + return + } + if err == errors.ErrServiceUnavailable { + log.Errorf("store.Info() err:%v", err) + store.Status = meta.StoreStatusFail + goto failed + } else if err != nil { + log.Errorf("get store info failed, retry host:%s", store.Stat) + store.Status = meta.StoreStatusFail + goto failed + } + for _, volume = range volumes { + if volume.Block.LastErr != nil { + log.Infof("get store block.lastErr:%s host:%s", volume.Block.LastErr, store.Stat) + store.Status = meta.StoreStatusFail + goto failed + } else if !volume.Block.Full() { + storeReadOnly = false } - if volumes, err = store.Info(); err != nil { - log.Errorf("get store info failed, retry host:%s", store.Stat) - continue + if err = p.zk.UpdateVolumeState(volume); err != nil { + log.Errorf("zk.UpdateVolumeState() error(%v)", err) } - status = store.Status - for _, volume = range volumes { - if err = volume.Block.LastErr; err != nil { - // ignore volume error - break - } - // ignore timeout - if err = store.Head(volume.Id); err == errors.ErrInternal { - store.Status = meta.StoreStatusFail - goto failed - } + } + if storeReadOnly { + store.Status = meta.StoreStatusRead + } + if len(volumes) > 0 { + volume = volumes[rand.Intn(len(volumes))] + if err = store.Head(volume.Id); err == errors.ErrInternal { + store.Status = meta.StoreStatusFail } - failed: - if status != store.Status { - if err = p.zk.SetStore(store); err != nil { - log.Errorf("update store zk status failed, retry") - continue - } + } +failed: + if status != store.Status { + if err = p.zk.SetStore(store); err != nil { + log.Errorf("update store zk status failed, retry") } } } diff --git a/pitchfork/zk/zk.go b/pitchfork/zk/zk.go index cfeaf04..8efd157 100644 --- a/pitchfork/zk/zk.go +++ b/pitchfork/zk/zk.go @@ -1,148 +1,183 @@ -package zk - -import ( - "bfs/libs/meta" - "bfs/pitchfork/conf" - "encoding/json" - "fmt" - log "github.com/golang/glog" - "github.com/samuel/go-zookeeper/zk" - "path" -) - -type Zookeeper struct { - c *zk.Conn - config *conf.Config -} - -// NewZookeeper new a connection to zookeeper. -func NewZookeeper(config *conf.Config) (z *Zookeeper, err error) { - var ( - s <-chan zk.Event - ) - z = &Zookeeper{} - if z.c, s, err = zk.Connect(config.Zookeeper.Addrs, config.Zookeeper.Timeout.Duration); err != nil { - log.Errorf("zk.Connect(\"%v\") error(%v)", config.Zookeeper.Addrs, err) - return - } - z.config = config - go func() { - var e zk.Event - for { - if e = <-s; e.Type == 0 { - return - } - log.Infof("zookeeper get a event: %s", e.State.String()) - } - }() - return -} - -// NewNode create pitchfork node in zk. -func (z *Zookeeper) NewNode(fpath string) (node string, err error) { - if node, err = z.c.Create(path.Join(fpath, "")+"/", []byte(""), int32(zk.FlagEphemeral|zk.FlagSequence), zk.WorldACL(zk.PermAll)); err != nil { - log.Errorf("zk.Create error(%v)", err) - } else { - node = path.Base(node) - } - return -} - -// setRoot update root. -func (z *Zookeeper) setRoot() (err error) { - if _, err = z.c.Set(z.config.Zookeeper.StoreRoot, []byte(""), -1); err != nil { - log.Errorf("zk.Set(\"%s\") error(%v)", z.config.Zookeeper.StoreRoot, err) - } - return -} - -// SetStore update store status. 
-func (z *Zookeeper) SetStore(s *meta.Store) (err error) { - var ( - data []byte - store = &meta.Store{} - spath = path.Join(z.config.Zookeeper.StoreRoot, s.Rack, s.Id) - ) - if data, _, err = z.c.Get(spath); err != nil { - log.Errorf("zk.Get(\"%s\") error(%v)", spath, err) - return - } - if len(data) > 0 { - if err = json.Unmarshal(data, store); err != nil { - log.Errorf("json.Unmarshal() error(%v)", err) - return - } - } - store.Status = s.Status - if data, err = json.Marshal(store); err != nil { - log.Errorf("json.Marshal() error(%v)", err) - return err - } - if _, err = z.c.Set(spath, data, -1); err != nil { - log.Errorf("zk.Set(\"%s\") error(%v)", spath, err) - return - } - err = z.setRoot() - return -} - -// WatchPitchforks watch pitchfork nodes. -func (z *Zookeeper) WatchPitchforks() (nodes []string, ev <-chan zk.Event, err error) { - if nodes, _, ev, err = z.c.ChildrenW(z.config.Zookeeper.PitchforkRoot); err != nil { - log.Errorf("zk.ChildrenW(\"%s\") error(%v)", z.config.Zookeeper.PitchforkRoot, err) - } - return -} - -// WatchRacks watch the rack nodes. -func (z *Zookeeper) WatchRacks() (nodes []string, ev <-chan zk.Event, err error) { - if nodes, _, ev, err = z.c.ChildrenW(z.config.Zookeeper.StoreRoot); err != nil { - log.Errorf("zk.ChildrenW(\"%s\") error(%v)", z.config.Zookeeper.StoreRoot, err) - } - return -} - -// Stores get all stores from a rack. -func (z *Zookeeper) Stores(rack string) (stores []string, err error) { - var spath = path.Join(z.config.Zookeeper.StoreRoot, rack) - if stores, _, err = z.c.Children(spath); err != nil { - log.Errorf("zk.Children(\"%s\") error(%v)", spath, err) - } - return -} - -// Store get a store node data. -func (z *Zookeeper) Store(rack, store string) (data []byte, err error) { - var spath = path.Join(z.config.Zookeeper.StoreRoot, rack, store) - if data, _, err = z.c.Get(spath); err != nil { - log.Errorf("zk.Get(\"%s\") error(%v)", spath, err) - } - return -} - -// SetVolumeStat set volume stat -func (z *Zookeeper) SetVolumeState(volume *meta.Volume) (err error) { - var ( - d []byte - spath string - vstate = &meta.VolumeState{ - TotalWriteProcessed: volume.Stats.TotalWriteProcessed, - TotalWriteDelay: volume.Stats.TotalWriteDelay, - } - ) - vstate.FreeSpace = volume.Block.FreeSpace() - spath = path.Join(z.config.Zookeeper.VolumeRoot, fmt.Sprintf("%d", volume.Id)) - if d, err = json.Marshal(vstate); err != nil { - log.Errorf("json.Marshal() error(%v)", err) - return - } - if _, err = z.c.Set(spath, d, -1); err != nil { - log.Errorf("zk.Set(\"%s\") error(%v)", spath, err) - } - return -} - -// Close close the zookeeper connection. -func (z *Zookeeper) Close() { - z.c.Close() -} +package zk + +import ( + "encoding/json" + "fmt" + "path" + "path/filepath" + + "bfs/libs/meta" + "bfs/pitchfork/conf" + + log "github.com/golang/glog" + "github.com/samuel/go-zookeeper/zk" +) + +type Zookeeper struct { + c *zk.Conn + config *conf.Config +} + +// NewZookeeper new a connection to zookeeper. +func NewZookeeper(config *conf.Config) (z *Zookeeper, err error) { + var ( + s <-chan zk.Event + ) + z = &Zookeeper{} + if z.c, s, err = zk.Connect(config.Zookeeper.Addrs, config.Zookeeper.Timeout.Duration); err != nil { + log.Errorf("zk.Connect(\"%v\") error(%v)", config.Zookeeper.Addrs, err) + return + } + z.config = config + go func() { + var e zk.Event + for { + if e = <-s; e.Type == 0 { + return + } + log.Infof("zookeeper get a event: %s", e.State.String()) + } + }() + return +} + +// NewNode create pitchfork node in zk. 
+func (z *Zookeeper) NewNode(fpath string) (node string, err error) { + if node, err = z.c.Create(path.Join(fpath, "")+"/", []byte(""), int32(zk.FlagEphemeral|zk.FlagSequence), zk.WorldACL(zk.PermAll)); err != nil { + log.Errorf("zk.Create error(%v)", err) + } else { + node = path.Base(node) + } + return +} + +// setRoot update root. +func (z *Zookeeper) setRoot() (err error) { + if _, err = z.c.Set(z.config.Zookeeper.StoreRoot, []byte(""), -1); err != nil { + log.Errorf("zk.Set(\"%s\") error(%v)", z.config.Zookeeper.StoreRoot, err) + } + return +} + +// SetStore update store status. +func (z *Zookeeper) SetStore(s *meta.Store) (err error) { + var ( + data []byte + store = &meta.Store{} + spath = path.Join(z.config.Zookeeper.StoreRoot, s.Rack, s.Id) + ) + if data, _, err = z.c.Get(spath); err != nil { + log.Errorf("zk.Get(\"%s\") error(%v)", spath, err) + return + } + if len(data) > 0 { + if err = json.Unmarshal(data, store); err != nil { + log.Errorf("json.Unmarshal() error(%v)", err) + return + } + } + store.Status = s.Status + if data, err = json.Marshal(store); err != nil { + log.Errorf("json.Marshal() error(%v)", err) + return err + } + if _, err = z.c.Set(spath, data, -1); err != nil { + log.Errorf("zk.Set(\"%s\") error(%v)", spath, err) + return + } + err = z.setRoot() + return +} + +// WatchPitchforks watch pitchfork nodes. +func (z *Zookeeper) WatchPitchforks() (nodes []string, ev <-chan zk.Event, err error) { + if nodes, _, ev, err = z.c.ChildrenW(z.config.Zookeeper.PitchforkRoot); err != nil { + log.Errorf("zk.ChildrenW(\"%s\") error(%v)", z.config.Zookeeper.PitchforkRoot, err) + } + return +} + +// WatchRacks watch the rack nodes. +func (z *Zookeeper) WatchRacks() (nodes []string, ev <-chan zk.Event, err error) { + if nodes, _, ev, err = z.c.ChildrenW(z.config.Zookeeper.StoreRoot); err != nil { + log.Errorf("zk.ChildrenW(\"%s\") error(%v)", z.config.Zookeeper.StoreRoot, err) + } + return +} + +// Stores get all stores from a rack. +func (z *Zookeeper) Stores(rack string) (stores []string, err error) { + var spath = path.Join(z.config.Zookeeper.StoreRoot, rack) + if stores, _, err = z.c.Children(spath); err != nil { + log.Errorf("zk.Children(\"%s\") error(%v)", spath, err) + } + return +} + +// Store get a store node data. 
+func (z *Zookeeper) Store(rack, store string) (data []byte, err error) {
+	var spath = path.Join(z.config.Zookeeper.StoreRoot, rack, store)
+	if data, _, err = z.c.Get(spath); err != nil {
+		log.Errorf("zk.Get(\"%s\") error(%v)", spath, err)
+	}
+	return
+}
+
+// UpdateVolumeState updates the volume state in zk only when free space has not grown beyond the stored value.
+func (z *Zookeeper) UpdateVolumeState(volume *meta.Volume) (err error) {
+	var (
+		d         []byte
+		spath     string
+		oldVstate = &meta.VolumeState{}
+		vstate    = &meta.VolumeState{
+			TotalWriteProcessed: volume.Stats.TotalWriteProcessed,
+			TotalWriteDelay:     volume.Stats.TotalWriteDelay,
+			Dir:                 filepath.Dir(volume.Block.File),
+		}
+	)
+	vstate.FreeSpace = volume.Block.FreeSpace()
+	spath = path.Join(z.config.Zookeeper.VolumeRoot, fmt.Sprintf("%d", volume.Id))
+	if d, _, err = z.c.Get(spath); err != nil {
+		log.Errorf("zk.Get(\"%s\") error(%v)", spath, err)
+		return
+	}
+	if len(d) > 0 {
+		if err = json.Unmarshal(d, oldVstate); err != nil {
+			log.Errorf("json.Unmarshal() error(%v)", err)
+			return
+		}
+		if oldVstate.FreeSpace < vstate.FreeSpace {
+			return
+		}
+	}
+	if d, err = json.Marshal(vstate); err != nil {
+		log.Errorf("json.Marshal() error(%v)", err)
+		return
+	}
+	if _, err = z.c.Set(spath, d, -1); err != nil {
+		log.Errorf("zk.Set(\"%s\") error(%v)", spath, err)
+	}
+	return
+}
+
+// Groups gets all group nodes.
+func (z *Zookeeper) Groups() (nodes []string, err error) {
+	if nodes, _, err = z.c.Children(z.config.Zookeeper.GroupRoot); err != nil {
+		log.Errorf("zk.Children(\"%s\") error(%v)", z.config.Zookeeper.GroupRoot, err)
+	}
+	return
+}
+
+// GroupStores gets all stores of a group.
+func (z *Zookeeper) GroupStores(group string) (nodes []string, err error) {
+	var spath = path.Join(z.config.Zookeeper.GroupRoot, group)
+	if nodes, _, err = z.c.Children(spath); err != nil {
+		log.Errorf("zk.Children(\"%s\") error(%v)", spath, err)
+	}
+	return
+}
+
+// Close close the zookeeper connection.
+func (z *Zookeeper) Close() {
+	z.c.Close()
+}

From df15c0be73c1947d8c815daaa2369e950953188c Mon Sep 17 00:00:00 2001
From: liangkai
Date: Wed, 16 Sep 2020 14:14:44 +0800
Subject: [PATCH 2/2] treat stores in sync state as readable in CanRead

---
 libs/meta/store.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libs/meta/store.go b/libs/meta/store.go
index 908eaaa..936443b 100644
--- a/libs/meta/store.go
+++ b/libs/meta/store.go
@@ -163,5 +163,5 @@ func (s *Store) CanWrite() bool {
 
 // CanRead reports whether the store can read.
 func (s *Store) CanRead() bool {
-	return s.Status == StoreStatusRead || s.Status == StoreStatusHealth
+	return s.Status == StoreStatusRead || s.Status == StoreStatusHealth || s.Status == StoreStatusSync
 }
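For reference, the status arithmetic behind this second patch: StoreStatusSync sets the enable bit (bit 31) plus the new sync bit (bit 2), which is where the 2147483652 in the libs/meta/store.go comment comes from, so a store that is still syncing is readable but not writable. A minimal standalone Go sketch mirroring those constants; canRead restates Store.CanRead as of this patch:

	package main

	import "fmt"

	const (
		StoreStatusEnableBit = 31
		StoreStatusReadBit   = 0
		StoreStatusWriteBit  = 1
		StoreStatusSyncBit   = 2

		StoreStatusEnable = 1 << StoreStatusEnableBit
		StoreStatusRead   = StoreStatusEnable | (1 << StoreStatusReadBit)
		StoreStatusWrite  = StoreStatusEnable | (1 << StoreStatusWriteBit)
		StoreStatusHealth = StoreStatusRead | StoreStatusWrite
		StoreStatusSync   = StoreStatusEnable | (1 << StoreStatusSyncBit)
	)

	// canRead restates Store.CanRead after PATCH 2/2.
	func canRead(status int) bool {
		return status == StoreStatusRead || status == StoreStatusHealth || status == StoreStatusSync
	}

	func main() {
		fmt.Println(StoreStatusSync)            // 2147483652, matching the comment in store.go
		fmt.Println(canRead(StoreStatusSync))   // true: a syncing store still serves reads
		fmt.Println(canRead(StoreStatusEnable)) // false: enable alone is StoreStatusFail
	}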