Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug #87

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type Directory struct {
storeVolume map[string][]int32 // store_server_id:volume_ids

// GROUP
storeGroup map[string]int // store_server_id:group
group map[int][]string // group_id:store_servers
group map[int][]string // group_id:store_servers

// VOLUME
volume map[int32]*meta.VolumeState // volume_id:volume_state
Expand Down Expand Up @@ -161,30 +160,28 @@ func (d *Directory) syncGroups() (err error) {
str string
groups, stores []string
group map[int][]string
storeGroup map[string]int
)
// get all groups
if groups, err = d.zk.Groups(); err != nil {
return
}
group = make(map[int][]string)
storeGroup = make(map[string]int)
for _, str = range groups {
// get all stores by the group
if stores, err = d.zk.GroupStores(str); err != nil {
return
}
if len(stores) == 0 {
log.Errorf("group:%s is empty", str)
continue
}
if gid, err = strconv.Atoi(str); err != nil {
log.Errorf("wrong group:%s", str)
continue
}
group[gid] = stores
for _, str = range stores {
storeGroup[str] = gid
}
}
d.group = group
d.storeGroup = storeGroup
return
}

Expand Down Expand Up @@ -218,10 +215,8 @@ func (d *Directory) SyncZookeeper() {
select {
case <-sev:
log.Infof("stores status change or new store")
break
case <-time.After(d.config.Zookeeper.PullInterval.Duration):
log.Infof("pull from zk")
break
}
}
}
Expand Down
36 changes: 27 additions & 9 deletions directory/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
// Dispatcher ,
// get raw data and processed into memory for http reqs
type Dispatcher struct {
gids []int // for write eg: gid:1;2 gids: [1,1,2,2,2,2,2]
wrtVids map[string]int32 // choose most suitable written volume, always order by rest space.
gids []int // for write eg: gid:1;2 gids: [1,1,2,2,2,2,2]
wrtVids map[string][]int32 // choose most suitable written volume of dir, always order by rest space.
rand *rand.Rand
rlock sync.Mutex
}
Expand Down Expand Up @@ -44,7 +44,8 @@ func (d *Dispatcher) Update(group map[int][]string,
i int
vid int32
gids []int
wrtVids map[string]int32
wrtVids map[string][]int32
tmpWrtVids map[string]map[string]int32
sid string
stores []string
restSpace, minScore, score int
Expand All @@ -54,7 +55,8 @@ func (d *Dispatcher) Update(group map[int][]string,
volumeState *meta.VolumeState
)
gids = []int{}
wrtVids = map[string]int32{}
wrtVids = map[string][]int32{}
tmpWrtVids = map[string]map[string]int32{}
for gid, stores = range group {
write = true
// check all stores can writeable by the group.
Expand Down Expand Up @@ -90,8 +92,13 @@ func (d *Dispatcher) Update(group map[int][]string,
totalAddDelay = totalAddDelay + volumeState.TotalWriteDelay
// cacl most suitable written vid
if volumeState.FreeSpace > minFreeSpace {
if value, ok := wrtVids[sid]; !ok || vid < value {
wrtVids[sid] = vid
v, ok := tmpWrtVids[sid]
if !ok {
tmpWrtVids[sid] = map[string]int32{volumeState.Dir: vid}
} else {
if value, ok := v[volumeState.Dir]; !ok || vid < value {
v[volumeState.Dir] = vid
}
}
}
}
Expand All @@ -104,8 +111,17 @@ func (d *Dispatcher) Update(group map[int][]string,
gids = append(gids, gid)
}
}
d.gids = gids
d.wrtVids = wrtVids
if len(gids) > 0 {
d.gids = gids
}
for sid, v := range tmpWrtVids {
for _, vid := range v {
wrtVids[sid] = append(wrtVids[sid], vid)
}
}
if len(wrtVids) > 0 {
d.wrtVids = wrtVids
}
return
}

Expand Down Expand Up @@ -133,6 +149,7 @@ func (d *Dispatcher) VolumeID(group map[int][]string, storeVolume map[string][]i
var (
stores []string
gid int
vids []int32
)
if len(d.gids) == 0 {
err = errors.ErrStoreNotAvailable
Expand All @@ -146,6 +163,7 @@ func (d *Dispatcher) VolumeID(group map[int][]string, storeVolume map[string][]i
err = errors.ErrZookeeperDataError
return
}
vid = d.wrtVids[stores[0]]
vids = d.wrtVids[stores[0]]
vid = vids[d.rand.Intn(len(vids))]
return
}
3 changes: 2 additions & 1 deletion libs/errors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const (
RetServiceUnavailable = 65533
RetParamErr = 65534
RetInternalErr = 65535

RetServiceTimeout = 65536
// needle
RetNeedleExist = 5000
)
Expand All @@ -15,6 +15,7 @@ var (
ErrParam = Error(RetParamErr)
ErrInternal = Error(RetInternalErr)
ErrServiceUnavailable = Error(RetServiceUnavailable)
ErrServiceTimeout = Error(RetServiceTimeout)

ErrNeedleExist = Error(RetNeedleExist)
)
8 changes: 5 additions & 3 deletions libs/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ var (
errorMsg = map[int]string{
/* ========================= Store ========================= */
// common
RetOK: "ok",
RetParamErr: "store param error",
RetInternalErr: "internal server error",
RetOK: "ok",
RetParamErr: "store param error",
RetInternalErr: "internal server error",
RetServiceTimeout: "service timeout",
RetServiceUnavailable: "service unavailable",
// api
RetUploadMaxFile: "exceed upload max file num",
// block
Expand Down
11 changes: 10 additions & 1 deletion libs/meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

log "github.com/golang/glog"
Expand All @@ -16,12 +17,14 @@ const (
StoreStatusEnableBit = 31
StoreStatusReadBit = 0
StoreStatusWriteBit = 1
StoreStatusSyncBit = 2
// status
StoreStatusInit = 0
StoreStatusEnable = (1 << StoreStatusEnableBit)
StoreStatusRead = StoreStatusEnable | (1 << StoreStatusReadBit)
StoreStatusWrite = StoreStatusEnable | (1 << StoreStatusWriteBit)
StoreStatusHealth = StoreStatusRead | StoreStatusWrite
StoreStatusSync = StoreStatusEnable | (1 << StoreStatusSyncBit) // 2147483652
StoreStatusFail = StoreStatusEnable
// api
statAPI = "http://%s/info"
Expand Down Expand Up @@ -101,10 +104,16 @@ func (s *Store) Info() (vs []*Volume, err error) {
)
if req, err = http.NewRequest("GET", url, nil); err != nil {
log.Info("http.NewRequest(GET,%s) error(%v)", url, err)
err = errors.ErrServiceTimeout
return
}
if resp, err = _client.Do(req); err != nil {
log.Errorf("_client.do(%s) error(%v)", url, err)
if strings.Contains(err.Error(), "connection refused") {
err = errors.ErrServiceUnavailable
} else {
err = errors.ErrServiceTimeout
}
return
}
defer resp.Body.Close()
Expand Down Expand Up @@ -154,5 +163,5 @@ func (s *Store) CanWrite() bool {

// CanRead reports whether the store can read.
func (s *Store) CanRead() bool {
return s.Status == StoreStatusRead || s.Status == StoreStatusHealth
return s.Status == StoreStatusRead || s.Status == StoreStatusHealth || s.Status == StoreStatusSync
}
1 change: 1 addition & 0 deletions libs/meta/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type VolumeState struct {
TotalWriteProcessed uint64 `json:"total_write_processed"`
TotalWriteDelay uint64 `json:"total_write_delay"`
FreeSpace uint32 `json:"free_space"`
Dir string `json:"dir"`
}
112 changes: 57 additions & 55 deletions pitchfork/conf/config.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,57 @@
package conf

import (
"github.com/BurntSushi/toml"
"io/ioutil"
"os"
"time"
)

type Config struct {
Store *Store
Zookeeper *Zookeeper
}

type Store struct {
StoreCheckInterval duration
NeedleCheckInterval duration
RackCheckInterval duration
}

type Zookeeper struct {
VolumeRoot string
StoreRoot string
PitchforkRoot string
Addrs []string
Timeout duration
}

// Code to implement the TextUnmarshaler interface for `duration`:
type duration struct {
time.Duration
}

func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

// NewConfig new a config.
func NewConfig(conf string) (c *Config, err error) {
var (
file *os.File
blob []byte
)
c = new(Config)
if file, err = os.Open(conf); err != nil {
return
}
if blob, err = ioutil.ReadAll(file); err != nil {
return
}
err = toml.Unmarshal(blob, c)
return
}
package conf

import (
"io/ioutil"
"os"
"time"

"github.com/BurntSushi/toml"
)

type Config struct {
Store *Store
Zookeeper *Zookeeper
}

type Store struct {
StoreCheckInterval duration
NeedleCheckInterval duration
RackCheckInterval duration
}

type Zookeeper struct {
VolumeRoot string
StoreRoot string
PitchforkRoot string
GroupRoot string
Addrs []string
Timeout duration
}

// Code to implement the TextUnmarshaler interface for `duration`:
type duration struct {
time.Duration
}

func (d *duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

// NewConfig new a config.
func NewConfig(conf string) (c *Config, err error) {
var (
file *os.File
blob []byte
)
c = new(Config)
if file, err = os.Open(conf); err != nil {
return
}
if blob, err = ioutil.ReadAll(file); err != nil {
return
}
err = toml.Unmarshal(blob, c)
return
}
Loading