Skip to content

Commit

Permalink
modify file log
Browse files Browse the repository at this point in the history
  • Loading branch information
sjqzhang committed Feb 23, 2019
1 parent 0bc2483 commit 625242c
Showing 1 changed file with 117 additions and 44 deletions.
161 changes: 117 additions & 44 deletions fileserver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"crypto/md5"
"crypto/rand"
"crypto/sha1"
Expand All @@ -10,12 +11,12 @@ import (
"fmt"
"github.com/astaxie/beego/httplib"
"github.com/deckarep/golang-set"
_ "github.com/eventials/go-tus"
"github.com/json-iterator/go"
log "github.com/sjqzhang/seelog"
"github.com/sjqzhang/tusd"
"github.com/sjqzhang/tusd/filestore"
"github.com/syndtr/goleveldb/leveldb"
_ "github.com/eventials/go-tus"
"io"
"io/ioutil"
slog "log"
Expand All @@ -37,7 +38,6 @@ import (
"strconv"
"strings"
"sync"
"bytes"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -192,6 +192,7 @@ type Server struct {
sumMap *CommonMap //map[string]mapset.Set
queueToPeers chan FileInfo
queueFromPeers chan FileInfo
queueFileLog chan *FileLog
lockMap *CommonMap

curDate string
Expand All @@ -210,6 +211,11 @@ type FileInfo struct {
OffSet int64 `json:"offset"`
}

type FileLog struct {
FileInfo *FileInfo
FileName string
}

type JsonResult struct {
Message string `json:"message"`
Status string `json:"status"`
Expand Down Expand Up @@ -282,6 +288,7 @@ func NewServer() *Server {
lockMap: NewCommonMap(0),
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFromPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFileLog: make(chan *FileLog, 500),
sumMap: NewCommonMap(363 * 3),
}
settins := httplib.BeegoHTTPSettings{
Expand Down Expand Up @@ -1428,6 +1435,12 @@ func (this *Server) CheckFileAndSendToPeer(date string, filename string, isForce
}
}()

//iter:=server.ldb.NewIterator(util.BytesPrefix([]byte(date+"_")),nil)
//for iter.Next() {
// fmt.Println(string(iter.Key()),string(iter.Value()))
//}
//iter.Release()

if md5set, err = this.GetMd5sByDate(date, filename); err != nil {
log.Error(err)
return
Expand Down Expand Up @@ -1577,6 +1590,12 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
}

func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {

this.queueFileLog <- &FileLog{FileInfo: fileInfo, FileName: filename}

}

func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
err error
msg string
Expand All @@ -1589,12 +1608,25 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
sumset mapset.Set
fullpath string
v interface{}
md5Path string
logKey string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("saveFileMd5Log")
log.Error(re)
log.Error(string(buffer))
}
}()

logDate = this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)

sumKey = fmt.Sprintf("%s_%s", logDate, filename)

this.lockMap.LockKey(sumKey)
defer this.lockMap.UnLockKey(sumKey)

if v, ok = this.sumMap.GetValue(sumKey); !ok {
if sumset, err = this.GetMd5sByDate(logDate, filename); err != nil {
log.Error(err)
Expand All @@ -1609,9 +1641,9 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
}
}

if sumset.Contains(fileInfo.Md5) {
return
}
//if sumset.Contains(fileInfo.Md5) { // remove from xxx
// return
//}
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
Expand All @@ -1622,28 +1654,54 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
if _, err = os.Stat(logpath); err != nil {
os.MkdirAll(logpath, 0775)
}
msg = fmt.Sprintf("%s|%d|%d|%s\n", fileInfo.Md5, fileInfo.Size, fileInfo.TimeStamp, fullpath)
if tmpFile, err = os.OpenFile(logpath+"/"+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644); err != nil {
log.Error(err)
return
}
defer tmpFile.Close()
tmpFile.WriteString(msg)

if !sumset.Contains(fileInfo.Md5) {
msg = fmt.Sprintf("%s|%d|%d|%s\n", fileInfo.Md5, fileInfo.Size, fileInfo.TimeStamp, fullpath)
if tmpFile, err = os.OpenFile(logpath+"/"+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644); err != nil {
log.Error(err)
return
}
defer tmpFile.Close()
tmpFile.WriteString(msg)
}
logKey = fmt.Sprintf("%s_%s", logDate, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {

if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); !ok {

this.SaveFileInfoToLevelDB(logKey, fileInfo)

if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, 1)

this.SaveStat()

}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
if filename == CONST_REMOME_Md5_FILE_NAME {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); ok {
this.RemoveKeyFromLevelDB(logKey)
md5Path = this.util.MD5(fullpath)
if err := this.RemoveKeyFromLevelDB(fileInfo.Md5); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
if err = this.RemoveKeyFromLevelDB(md5Path); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, -1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, -fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, -fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, -1)
this.SaveStat()
}
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, 1)

this.SaveStat()
}

sumset.Add(fileInfo.Md5)
Expand Down Expand Up @@ -1766,6 +1824,11 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {

}

func (this *Server) IsExistFromLevelDB(key string) (bool, error) {

return this.ldb.Has([]byte(key), nil)
}

func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
var (
err error
Expand Down Expand Up @@ -2260,7 +2323,6 @@ func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
var (
err error
md5sum string
md5path string
fileInfo *FileInfo
fpath string
delUrl string
Expand All @@ -2282,6 +2344,23 @@ func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
md5sum = this.util.MD5(fpath)
}

if inner != "1" {
for _, peer := range Config().Peers {
delFile := func(peer string, md5sum string, fileInfo *FileInfo) {
delUrl = fmt.Sprintf("%s%s", peer, this.getRequestURI("delete"))
req := httplib.Post(delUrl)
req.Param("md5", md5sum)
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*10)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
go delFile(peer, md5sum, fileInfo)

}
}

if len(md5sum) < 32 {
result.Message = "md5 unvalid"
w.Write([]byte(this.util.JsonEncodePretty(result)))
Expand All @@ -2294,7 +2373,7 @@ func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
}

if fileInfo.OffSet != -1 {
result.Message = "small delete not support"
result.Message = "small file delete not support"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
Expand All @@ -2304,33 +2383,14 @@ func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
name = fileInfo.ReName
}
fpath = fileInfo.Path + "/" + name
md5path = this.util.MD5(fpath)

if fileInfo.Path != "" && this.util.FileExists(DOCKER_DIR+fpath) {
if err = this.RemoveKeyFromLevelDB(fileInfo.Md5); err != nil {
log.Error(err)
}
if err = this.RemoveKeyFromLevelDB(md5path); err != nil {
log.Error(err)
}
this.SaveFileMd5Log(fileInfo, CONST_REMOME_Md5_FILE_NAME)
if err = os.Remove(DOCKER_DIR + fpath); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
} else {
if inner != "1" {
for _, peer := range Config().Peers {
delUrl = fmt.Sprintf("%s%s", peer, this.getRequestURI("delete"))
req := httplib.Post(delUrl)
req.Param("md5", fileInfo.Md5)
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*10)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
}
this.SaveFileMd5Log(fileInfo, CONST_REMOME_Md5_FILE_NAME)
result.Message = "remove success"
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
Expand Down Expand Up @@ -2943,6 +3003,18 @@ func (this *Server) ConsumerDownLoad() {

}

func (this *Server) ConsumerLog() {
go func() {
var (
fileLog *FileLog
)
for {
fileLog = <-this.queueFileLog
this.saveFileMd5Log(fileLog.FileInfo, fileLog.FileName)
}
}()
}

func (this *Server) Consumer() {

ConsumerFunc := func() {
Expand Down Expand Up @@ -3874,6 +3946,7 @@ func (this *Server) Main() {
go this.CleanMd5SumCache()
go this.Check()
go this.Consumer()
go this.ConsumerLog()
go this.ConsumerDownLoad()
if Config().AutoRepair {
go func() {
Expand Down

0 comments on commit 625242c

Please sign in to comment.