diff --git a/README.md b/README.md index 435d0801..78c52fe9 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ - 类fastdfs - 高性能 (使用leveldb作为kv库) - 高可靠(设计极其简单,使用成熟组件) -- 无中心设计 +- 无中心设计(节点都可以同时读写) # 优点 @@ -154,6 +154,20 @@ cd fastdfs/data && find -type f |xargs -n 1 -I {} curl -F file=@data/{} -F path= ``` +- 如何搭建集群? +``` +一、先下载已编译的可执行文件(用最新版本) +二、运行可执行文件(生成配置) +三、修改配置 + peers:增加对端的http地址 + 检查: + host:自动生成是否正确 + peer_id:集群内是否唯一 +四、重新运行服器 +五、验证服务是否OK +``` + + - 适合海量存储吗? ``` 答案:适合海量存储 diff --git a/fileserver.go b/fileserver.go index 55a63cc1..79526afb 100644 --- a/fileserver.go +++ b/fileserver.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "encoding/base64" "runtime" + random "math/rand" "errors" "flag" @@ -84,7 +85,11 @@ const ( cfgJson = `{ "绑定端号": "端口", "addr": ":8080", + "PeerID": "集群内唯一", + "peer_id": "%s", "集群": "集群列表", + "本主机地址": "本机http地址", + "host":"%s", "peers": ["%s"], "组号": "用于区别不同的集群,带在下载路径中", "group": "group1", @@ -235,6 +240,7 @@ type GloablConfig struct { AutoRepair bool `json:"auto_repair"` Host string `json:"host"` FileSumArithmetic string `json:"file_sum_arithmetic"` + PeerId string `json:"peer_id"` } func NewServer() *Server { @@ -461,6 +467,19 @@ func (this *Common) GetUUID() string { } +func (this *Common) RandInt(min, max int) int { + + return func(min, max int) int { + + r := random.New(random.NewSource(time.Now().UnixNano())) + if min >= max { + return max + } + return r.Intn(max-min) + min + }(min, max) + +} + func (this *Common) GetToDay() string { return time.Now().Format("20060102") @@ -1046,7 +1065,7 @@ func (this *Server) GetServerURI(r *http.Request) string { return fmt.Sprintf("http://%s/", r.Host) } -func (this *Server) CheckFileAndSendToPeer(date string, filename string, is_force_upload bool) { +func (this *Server) CheckFileAndSendToPeer(date string, filename string, isForceUpload bool) { var ( md5set mapset.Set @@ -1076,7 +1095,7 @@ func (this *Server) CheckFileAndSendToPeer(date string, filename string, is_forc continue } if fileInfo, _ := this.GetFileInfoFromLevelDB(md.(string)); fileInfo != nil && fileInfo.Md5 != "" { - if is_force_upload { + if isForceUpload { fileInfo.Peers = []string{} } @@ -1108,7 +1127,7 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) { fi os.FileInfo i int data []byte - fpath string + fpath string ) defer func() { @@ -1138,9 +1157,9 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) { filename = fileInfo.ReName } - fpath=fileInfo.Path + "/" + filename + fpath = fileInfo.Path + "/" + filename if !this.util.FileExists(fpath) { - log.Warn(fmt.Sprintf("file '%s' not found",fpath)) + log.Warn(fmt.Sprintf("file '%s' not found", fpath)) continue } else { if fileInfo.Size == 0 { @@ -1224,10 +1243,10 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) { if v, ok = this.sumMap.GetValue(sumKey); !ok { if sumset, err = this.GetMd5sByDate(logDate, filename); err != nil { log.Error(err) - } else { - sumset = v.(mapset.Set) } - this.sumMap.Put(sumKey, sumset) + if sumset!=nil { + this.sumMap.Put(sumKey, sumset) + } } else { sumset = v.(mapset.Set) if sumset.Cardinality() == 0 { @@ -1337,13 +1356,13 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) { date := "" force := "" - is_force_upload := false + isForceUpload := false force = r.FormValue("force") date = r.FormValue("date") if force == "1" { - is_force_upload = true + isForceUpload = true } if date == "" { @@ -1353,13 +1372,13 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) { } date = strings.Replace(date, ".", "", -1) - if is_force_upload { + if isForceUpload { - go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, is_force_upload) + go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, isForceUpload) } else { - go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, is_force_upload) + go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, isForceUpload) } @@ -1714,9 +1733,9 @@ func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) { p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1) - download_url := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+filename) + downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+filename) - w.Write([]byte(download_url)) + w.Write([]byte(downloadUrl)) } @@ -1759,7 +1778,6 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) { defer uploadFile.Close() - os.MkdirAll(fileInfo.Path, 0775) outPath = fileInfo.Path + "/" + fileInfo.Name @@ -1822,9 +1840,9 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) { p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1) - download_url := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+fileInfo.Name) + downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+fileInfo.Name) - w.Write([]byte(download_url)) + w.Write([]byte(downloadUrl)) } @@ -1906,14 +1924,11 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { // fileInfo.Path = r.Header.Get("Sync-Path") - - - if strings.Contains(r.Host,"127.0.0.1") { + if strings.Contains(r.Host, "127.0.0.1") { w.Write([]byte( "(error) upload use clust ip(peers ip),not 127.0.0.1")) return } - if Config().EnableCustomPath { fileInfo.Path = r.FormValue("path") fileInfo.Path = strings.Trim(fileInfo.Path, "/") @@ -1986,6 +2001,7 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { } folder = time.Now().Format("20060102/15/04") + folder=fmt.Sprintf(folder+"%s",Config().PeerId) if fileInfo.Scene != "" { folder = fmt.Sprintf(STORE_DIR+"/%s/%s", fileInfo.Scene, folder) } else { @@ -2069,12 +2085,12 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { } p := strings.Replace(v.Path, STORE_DIR+"/", "", 1) p = Config().Group + "/" + p + "/" + outname - download_url := fmt.Sprintf("http://%s/%s", r.Host, p) + downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, p) if Config().DownloadDomain != "" { - download_url = fmt.Sprintf("http://%s/%s", Config().DownloadDomain, p) + downloadUrl = fmt.Sprintf("http://%s/%s", Config().DownloadDomain, p) } if output == "json" { - fileResult.Url = download_url + fileResult.Url = downloadUrl fileResult.Md5 = v.Md5 fileResult.Path = "/" + p fileResult.Domain = domain @@ -2092,7 +2108,7 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { } else { - w.Write([]byte(download_url)) + w.Write([]byte(downloadUrl)) } return } @@ -2122,17 +2138,15 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME) } - - p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1) p = Config().Group + "/" + p + "/" + outname - download_url := fmt.Sprintf("http://%s/%s", r.Host, p) + downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, p) if Config().DownloadDomain != "" { - download_url = fmt.Sprintf("http://%s/%s", Config().DownloadDomain, p) + downloadUrl = fmt.Sprintf("http://%s/%s", Config().DownloadDomain, p) } if output == "json" { - fileResult.Url = download_url + fileResult.Url = downloadUrl fileResult.Md5 = fileInfo.Md5 fileResult.Path = "/" + p fileResult.Domain = domain @@ -2149,7 +2163,7 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) { } else { - w.Write([]byte(download_url)) + w.Write([]byte(downloadUrl)) } return @@ -2166,16 +2180,16 @@ func (this *Server) SendToMail(to, subject, body, mailtype string) error { password := Config().Mail.Password hp := strings.Split(host, ":") auth := smtp.PlainAuth("", user, password, hp[0]) - var content_type string + var contentType string if mailtype == "html" { - content_type = "Content-Type: text/" + mailtype + "; charset=UTF-8" + contentType = "Content-Type: text/" + mailtype + "; charset=UTF-8" } else { - content_type = "Content-Type: text/plain" + "; charset=UTF-8" + contentType = "Content-Type: text/plain" + "; charset=UTF-8" } - msg := []byte("To: " + to + "\r\nFrom: " + user + ">\r\nSubject: " + "\r\n" + content_type + "\r\n\r\n" + body) - send_to := strings.Split(to, ";") - err := smtp.SendMail(host, auth, user, send_to, msg) + msg := []byte("To: " + to + "\r\nFrom: " + user + ">\r\nSubject: " + "\r\n" + contentType + "\r\n\r\n" + body) + sendTo := strings.Split(to, ";") + err := smtp.SendMail(host, auth, user, sendTo, msg) return err } @@ -2335,7 +2349,7 @@ func (this *Server) ConsumerDownLoad() { } for _, peer := range fileInfo.Peers { if strings.Contains(peer, "127.0.0.1") { - log.Warn("sync error with 127.0.0.1",fileInfo) + log.Warn("sync error with 127.0.0.1", fileInfo) continue } if peer != this.host { @@ -2376,9 +2390,9 @@ func (this *Server) Consumer() { } -func (this *Server) AutoRepair(force_repair bool) { +func (this *Server) AutoRepair(forceRepair bool) { - AutoRepairFunc := func(force_repair bool) { + AutoRepairFunc := func(forceRepair bool) { var ( dateStats []StatDateFileInfo @@ -2430,7 +2444,7 @@ func (this *Server) AutoRepair(force_repair bool) { if v, ok := this.statMap.GetValue(countKey); ok { switch v.(type) { case int64: - if v.(int64) != dateStat.FileCount || force_repair { //不相等,找差异 + if v.(int64) != dateStat.FileCount || forceRepair { //不相等,找差异 //TODO req := httplib.Post(fmt.Sprintf("%s/get_md5s_by_date", peer)) req.SetTimeout(time.Second*5, time.Second*20) @@ -2477,7 +2491,7 @@ func (this *Server) AutoRepair(force_repair bool) { } - AutoRepairFunc(force_repair) + AutoRepairFunc(forceRepair) } func (this *Server) CleanMd5SumCache() { @@ -2634,15 +2648,15 @@ func (this *Server) Repair(w http.ResponseWriter, r *http.Request) { var ( force string - force_repair bool + forceRepair bool ) r.ParseForm() force = r.FormValue("force") if force == "1" { - force_repair = true + forceRepair = true } if this.IsPeer(r) { - go this.AutoRepair(force_repair) + go this.AutoRepair(forceRepair) w.Write([]byte("repair job start...")) } else { w.Write([]byte(CONST_MESSAGE_CLUSTER_IP)) @@ -2764,11 +2778,13 @@ func init() { } flag.Parse() + peerId := fmt.Sprintf("%d", server.util.RandInt(0, 9)) + if !server.util.FileExists(CONST_CONF_FILE_NAME) { peer := "http://" + server.util.GetPulicIP() + ":8080" - cfg := fmt.Sprintf(cfgJson, peer) + cfg := fmt.Sprintf(cfgJson, peerId, peer,peer) server.util.WriteFile(CONST_CONF_FILE_NAME, cfg) } @@ -2794,12 +2810,16 @@ func init() { Config().QueueSize = CONST_QUEUE_SIZE } + if Config().PeerId == "" { + Config().PeerId = peerId + } + staticHandler = http.StripPrefix("/"+Config().Group+"/", http.FileServer(http.Dir(STORE_DIR))) server.initComponent(false) } -func (this *Server) initComponent(is_reload bool) { +func (this *Server) initComponent(isReload bool) { var ( err error ip string @@ -2809,10 +2829,13 @@ func (this *Server) initComponent(is_reload bool) { ) ip = this.util.GetPulicIP() - if server.host == "" { + if Config().Host == "" { if len(strings.Split(Config().Addr, ":")) == 2 { server.host = fmt.Sprintf("http://%s:%s", ip, strings.Split(Config().Addr, ":")[1]) + Config().Host=server.host } + } else { + server.host= Config().Host } ex, _ := regexp.Compile("\\d+\\.\\d+\\.\\d+\\.\\d+") @@ -2865,7 +2888,7 @@ func (this *Server) initComponent(is_reload bool) { } } - if !is_reload { + if !isReload { FormatStatInfo() } //Timer