diff --git a/README.md b/README.md index 5df9677f..014649e2 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ # go-fastdfs是一个基于http协议的分布式文件系统,它基于大道至简的设计理念,一切从简设计,使得它的运维及扩展变得更加简单,它具有高性能、高可靠、无中心、免维护等优点。 -### 大家担心的是这么简单的文件系统,靠不靠谱,可不可以用于生产环境?答案是肯定的,正因为简单所以稳定。如果你担心功能,那就跑单元测试,如果担心性能,那就跑压力测试,项目都自带了,跑一跑更放心^_^。 +### 大家担心的是这么简单的文件系统,靠不靠谱,可不可以用于生产环境?答案是肯定的,正因为简单所以高效,因为简单所以稳定。如果你担心功能,那就跑单元测试,如果担心性能,那就跑压力测试,项目都自带了,跑一跑更放心^_^。 - 支持curl命令上传 @@ -44,6 +44,7 @@ - 支持集群监控邮件告警 - 支持小文件自动合并(减少inode占用) - 支持秒传 +- 低资源开销 - 支持断点续传([tus](https://tus.io/)) - 支持docker部署 - 支持token下载 token=md5(file_md5+timestamp) @@ -324,6 +325,29 @@ sts["Fs.ErrorSetSize"] = this.errorset.Cardinality() 这个会导致内存增 ``` + +- 如何编译? +``` +git clone https://github.com/sjqzhang/go-fastdfs.git +cd go-fastdfs +mv vendor src +pwd=`pwd` +GOPATH=$pwd go build -o fileserver fileserver.go +``` + +- 如何跑单元测试 (尽量在linux下进行)? +``` + +git clone https://github.com/sjqzhang/go-fastdfs.git +cd go-fastdfs +mv vendor src +pwd=`pwd` +GOPATH=$pwd go test -v fileserver.go fileserver_test.go + +``` + + + - 如何压测? ``` 先用gen_file.py产生大量文件(注意如果要生成大文件,自已在内容中乘上一个大的数即可) @@ -412,6 +436,7 @@ issue中chengyuansen同学向我提议使用增加扩容特性,我觉得对代 ``` + - 访问限制问题 ``` 出于安全考虑,管理API只能在群集内部调用或者用127.0.0.1调用. diff --git a/fileserver.go b/fileserver.go index d58474a4..d97d4ab5 100644 --- a/fileserver.go +++ b/fileserver.go @@ -15,6 +15,7 @@ import ( "github.com/sjqzhang/tusd" "github.com/sjqzhang/tusd/filestore" "github.com/syndtr/goleveldb/leveldb" + _ "github.com/eventials/go-tus" "io" "io/ioutil" slog "log" @@ -137,7 +138,7 @@ const ( "组号": "用于区别不同的集群(上传或下载)与support_group_upload配合使用,带在下载路径中", "group": "group1", "是否合并小文件": "默认不合并,合并可以解决inode不够用的情况(当前对于小于1M文件)进行合并", - "enable_merge_small_file": true, + "enable_merge_small_file": false, "重试同步失败文件的时间": "单位秒", "refresh_interval": 1800, "是否自动重命名": "默认不自动重命名,使用原文件名", diff --git a/fileserver_test.go b/fileserver_test.go index 354ecab0..4bca204a 100644 --- a/fileserver_test.go +++ b/fileserver_test.go @@ -12,205 +12,279 @@ import ( ) const ( - CONST_SMALL_FILE_NAME="small.txt" - CONST_BIG_FILE_NAME="big.txt" - CONST_DOWNLOAD_BIG_FILE_NAME="big_dowload.txt" - CONST_DOWNLOAD_SMALL_FILE_NAME="small_dowload.txt" + CONST_SMALL_FILE_NAME = "small.txt" + CONST_BIG_FILE_NAME = "big.txt" + CONST_DOWNLOAD_BIG_FILE_NAME = "big_dowload.txt" + CONST_DOWNLOAD_SMALL_FILE_NAME = "small_dowload.txt" ) -var testUtil=Common{} +var testUtil = Common{} +var endPoint = "http://127.0.0.1:8080" -var testSmallFileMd5="" -var testBigFileMd5="" +var testCfg *GloablConfig -func init() { +var testSmallFileMd5 = "" +var testBigFileMd5 = "" + +func init() { var ( err error ) - smallBytes:=make([]byte,1025*512) - for i:=0;i This is not a full protocol client implementation. + +Checksum, Termination and Concatenation extensions are not implemented yet. + +This client allows to resume an upload if a Store is used. + +## Built in Store + +Store is used to map an upload's fingerprint with the corresponding upload URL. + +| Name | Backend | Dependencies | +|:----:|:-------:|:------------:| +| MemoryStore | In-Memory | None | +| LeveldbStore | LevelDB | [goleveldb](https://github.com/syndtr/goleveldb) | + +## Future Work + +- [ ] SQLite store +- [ ] Redis store +- [ ] Memcached store +- [ ] Checksum extension +- [ ] Termination extension +- [ ] Concatenation extension diff --git a/vendor/github.com/eventials/go-tus/client.go b/vendor/github.com/eventials/go-tus/client.go new file mode 100644 index 00000000..0b0f8625 --- /dev/null +++ b/vendor/github.com/eventials/go-tus/client.go @@ -0,0 +1,260 @@ +package tus + +import ( + "io" + "io/ioutil" + "net/http" + netUrl "net/url" + "strconv" +) + +const ( + ProtocolVersion = "1.0.0" +) + +// Client represents the tus client. +// You can use it in goroutines to create parallels uploads. +type Client struct { + Config *Config + Url string + Version string + Header http.Header + + client *http.Client +} + +// NewClient creates a new tus client. +func NewClient(url string, config *Config) (*Client, error) { + if config == nil { + config = DefaultConfig() + } else { + if err := config.Validate(); err != nil { + return nil, err + } + } + + if config.Header == nil { + config.Header = make(http.Header) + } + + var c *http.Client + + if config.Transport == nil { + c = &http.Client{} + } else { + c = &http.Client{ + Transport: config.Transport, + } + } + + return &Client{ + Config: config, + Url: url, + Version: ProtocolVersion, + Header: config.Header, + + client: c, + }, nil +} + +func (c *Client) Do(req *http.Request) (*http.Response, error) { + for k, v := range c.Header { + req.Header[k] = v + } + + req.Header.Set("Tus-Resumable", ProtocolVersion) + + return c.client.Do(req) +} + +// CreateUpload creates a new upload in the server. +func (c *Client) CreateUpload(u *Upload) (*Uploader, error) { + if u == nil { + return nil, ErrNilUpload + } + + if c.Config.Resume && len(u.Fingerprint) == 0 { + return nil, ErrFingerprintNotSet + } + + req, err := http.NewRequest("POST", c.Url, nil) + + if err != nil { + return nil, err + } + + req.Header.Set("Content-Length", "0") + req.Header.Set("Upload-Length", strconv.FormatInt(u.size, 10)) + req.Header.Set("Upload-Metadata", u.EncodedMetadata()) + + res, err := c.Do(req) + + if err != nil { + return nil, err + } + defer res.Body.Close() + + switch res.StatusCode { + case 201: + url := res.Header.Get("Location") + + baseUrl, err := netUrl.Parse(c.Url) + if err != nil { + return nil, ErrUrlNotRecognized + } + + newUrl, err := netUrl.Parse(url) + if err != nil { + return nil, ErrUrlNotRecognized + } + if newUrl.Scheme == "" { + newUrl.Scheme = baseUrl.Scheme + url = newUrl.String() + } + + if c.Config.Resume { + c.Config.Store.Set(u.Fingerprint, url) + } + + return NewUploader(c, url, u, 0), nil + case 412: + return nil, ErrVersionMismatch + case 413: + return nil, ErrLargeUpload + default: + return nil, newClientError(res) + } +} + +// ResumeUpload resumes the upload if already created, otherwise it will return an error. +func (c *Client) ResumeUpload(u *Upload) (*Uploader, error) { + if u == nil { + return nil, ErrNilUpload + } + + if !c.Config.Resume { + return nil, ErrResumeNotEnabled + } else if len(u.Fingerprint) == 0 { + return nil, ErrFingerprintNotSet + } + + url, found := c.Config.Store.Get(u.Fingerprint) + + if !found { + return nil, ErrUploadNotFound + } + + offset, err := c.getUploadOffset(url) + + if err != nil { + return nil, err + } + + return NewUploader(c, url, u, offset), nil +} + +// CreateOrResumeUpload resumes the upload if already created or creates a new upload in the server. +func (c *Client) CreateOrResumeUpload(u *Upload) (*Uploader, error) { + if u == nil { + return nil, ErrNilUpload + } + + uploader, err := c.ResumeUpload(u) + + if err == nil { + return uploader, err + } else if (err == ErrResumeNotEnabled) || (err == ErrUploadNotFound) { + return c.CreateUpload(u) + } + + return nil, err +} + +func (c *Client) uploadChunck(url string, body io.Reader, size int64, offset int64) (int64, error) { + var method string + + if !c.Config.OverridePatchMethod { + method = "PATCH" + } else { + method = "POST" + } + + req, err := http.NewRequest(method, url, body) + + if err != nil { + return -1, err + } + + req.Header.Set("Content-Type", "application/offset+octet-stream") + req.Header.Set("Content-Length", strconv.FormatInt(size, 10)) + req.Header.Set("Upload-Offset", strconv.FormatInt(offset, 10)) + + if c.Config.OverridePatchMethod { + req.Header.Set("X-HTTP-Method-Override", "PATCH") + } + + res, err := c.Do(req) + + if err != nil { + return -1, err + } + defer res.Body.Close() + + switch res.StatusCode { + case 204: + if newOffset, err := strconv.ParseInt(res.Header.Get("Upload-Offset"), 10, 64); err == nil { + return newOffset, nil + } else { + return -1, err + } + case 409: + return -1, ErrOffsetMismatch + case 412: + return -1, ErrVersionMismatch + case 413: + return -1, ErrLargeUpload + default: + return -1, newClientError(res) + } +} + +func (c *Client) getUploadOffset(url string) (int64, error) { + req, err := http.NewRequest("HEAD", url, nil) + + if err != nil { + return -1, err + } + + res, err := c.Do(req) + + if err != nil { + return -1, err + } + defer res.Body.Close() + + switch res.StatusCode { + case 200: + i, err := strconv.ParseInt(res.Header.Get("Upload-Offset"), 10, 64) + + if err == nil { + return i, nil + } else { + return -1, err + } + case 403, 404, 410: + // file doesn't exists. + return -1, ErrUploadNotFound + case 412: + return -1, ErrVersionMismatch + default: + return -1, newClientError(res) + } +} + +func newClientError(res *http.Response) ClientError { + body, _ := ioutil.ReadAll(res.Body) + return ClientError{ + Code: res.StatusCode, + Body: body, + } +} diff --git a/vendor/github.com/eventials/go-tus/config.go b/vendor/github.com/eventials/go-tus/config.go new file mode 100644 index 00000000..45ad9354 --- /dev/null +++ b/vendor/github.com/eventials/go-tus/config.go @@ -0,0 +1,48 @@ +package tus + +import ( + "net/http" +) + +// Config provides a way to configure the Client depending on your needs. +type Config struct { + // ChunkSize divide the file into chunks. + ChunkSize int64 + // Resume enables resumable upload. + Resume bool + // OverridePatchMethod allow to by pass proxies sendind a POST request instead of PATCH. + OverridePatchMethod bool + // Store map an upload's fingerprint with the corresponding upload URL. + // If Resume is true the Store is required. + Store Store + // Set custom header values used in all requests. + Header http.Header + // Set custom Transport settings. + // Use this if you ahe behind a proxy. + Transport *http.Transport +} + +// DefaultConfig return the default Client configuration. +func DefaultConfig() *Config { + return &Config{ + ChunkSize: 2 * 1024 * 1024, + Resume: false, + OverridePatchMethod: false, + Store: nil, + Header: make(http.Header), + Transport: nil, + } +} + +// Validate validates the custom configuration. +func (c *Config) Validate() error { + if c.ChunkSize < 1 { + return ErrChuckSize + } + + if c.Resume && c.Store == nil { + return ErrNilStore + } + + return nil +} diff --git a/vendor/github.com/eventials/go-tus/doc.go b/vendor/github.com/eventials/go-tus/doc.go new file mode 100644 index 00000000..125a72ca --- /dev/null +++ b/vendor/github.com/eventials/go-tus/doc.go @@ -0,0 +1,8 @@ +// Package tus provides a client to tus protocol version 1.0.0. +// +// tus is a protocol based on HTTP for resumable file uploads. Resumable means that +// an upload can be interrupted at any moment and can be resumed without +// re-uploading the previous data again. An interruption may happen willingly, if +// the user wants to pause, or by accident in case of an network issue or server +// outage (http://tus.io). +package tus diff --git a/vendor/github.com/eventials/go-tus/docker-compose.yml b/vendor/github.com/eventials/go-tus/docker-compose.yml new file mode 100644 index 00000000..2033c5f0 --- /dev/null +++ b/vendor/github.com/eventials/go-tus/docker-compose.yml @@ -0,0 +1,8 @@ +version: '2' +services: + app: + build: . + working_dir: /go/src/github.com/eventials/go-tus + command: go run main.go + volumes: + - .:/go/src/github.com/eventials/go-tus diff --git a/vendor/github.com/eventials/go-tus/errors.go b/vendor/github.com/eventials/go-tus/errors.go new file mode 100644 index 00000000..ead5e675 --- /dev/null +++ b/vendor/github.com/eventials/go-tus/errors.go @@ -0,0 +1,29 @@ +package tus + +import ( + "errors" + "fmt" +) + +var ( + ErrChuckSize = errors.New("chunk size must be greater than zero.") + ErrNilLogger = errors.New("logger can't be nil.") + ErrNilStore = errors.New("store can't be nil if Resume is enable.") + ErrNilUpload = errors.New("upload can't be nil.") + ErrLargeUpload = errors.New("upload body is to large.") + ErrVersionMismatch = errors.New("protocol version mismatch.") + ErrOffsetMismatch = errors.New("upload offset mismatch.") + ErrUploadNotFound = errors.New("upload not found.") + ErrResumeNotEnabled = errors.New("resuming not enabled.") + ErrFingerprintNotSet = errors.New("fingerprint not set.") + ErrUrlNotRecognized = errors.New("url not recognized") +) + +type ClientError struct { + Code int + Body []byte +} + +func (c ClientError) Error() string { + return fmt.Sprintf("unexpected status code: %d", c.Code) +} diff --git a/vendor/github.com/eventials/go-tus/store.go b/vendor/github.com/eventials/go-tus/store.go new file mode 100644 index 00000000..a2edbfda --- /dev/null +++ b/vendor/github.com/eventials/go-tus/store.go @@ -0,0 +1,8 @@ +package tus + +type Store interface { + Get(fingerprint string) (string, bool) + Set(fingerprint, url string) + Delete(fingerprint string) + Close() +} diff --git a/vendor/github.com/eventials/go-tus/upload.go b/vendor/github.com/eventials/go-tus/upload.go new file mode 100644 index 00000000..61975bc6 --- /dev/null +++ b/vendor/github.com/eventials/go-tus/upload.go @@ -0,0 +1,107 @@ +package tus + +import ( + "bytes" + "encoding/base64" + "fmt" + "io" + "os" + "strings" +) + +type Metadata map[string]string + +type Upload struct { + stream io.ReadSeeker + size int64 + offset int64 + + Fingerprint string + Metadata Metadata +} + +// Updates the Upload information based on offset. +func (u *Upload) updateProgress(offset int64) { + u.offset = offset +} + +// Returns whether this upload is finished or not. +func (u *Upload) Finished() bool { + return u.offset >= u.size +} + +// Returns the progress in a percentage. +func (u *Upload) Progress() int64 { + return (u.offset * 100) / u.size +} + +// Returns the current upload offset. +func (u *Upload) Offset() int64 { + return u.offset +} + +// Returns the size of the upload body. +func (u *Upload) Size() int64 { + return u.size +} + +// EncodedMetadata encodes the upload metadata. +func (u *Upload) EncodedMetadata() string { + var encoded []string + + for k, v := range u.Metadata { + encoded = append(encoded, fmt.Sprintf("%s %s", k, b64encode(v))) + } + + return strings.Join(encoded, ",") +} + +// NewUploadFromFile creates a new Upload from an os.File. +func NewUploadFromFile(f *os.File) (*Upload, error) { + fi, err := f.Stat() + + if err != nil { + return nil, err + } + + metadata := map[string]string{ + "filename": fi.Name(), + } + + fingerprint := fmt.Sprintf("%s-%d-%s", fi.Name(), fi.Size(), fi.ModTime()) + + return NewUpload(f, fi.Size(), metadata, fingerprint), nil +} + +// NewUploadFromBytes creates a new upload from a byte array. +func NewUploadFromBytes(b []byte) *Upload { + buffer := bytes.NewReader(b) + return NewUpload(buffer, buffer.Size(), nil, "") +} + +// NewUpload creates a new upload from an io.Reader. +func NewUpload(reader io.Reader, size int64, metadata Metadata, fingerprint string) *Upload { + stream, ok := reader.(io.ReadSeeker) + + if !ok { + buf := new(bytes.Buffer) + buf.ReadFrom(reader) + stream = bytes.NewReader(buf.Bytes()) + } + + if metadata == nil { + metadata = make(Metadata) + } + + return &Upload{ + stream: stream, + size: size, + + Fingerprint: fingerprint, + Metadata: metadata, + } +} + +func b64encode(s string) string { + return base64.StdEncoding.EncodeToString([]byte(s)) +} diff --git a/vendor/github.com/eventials/go-tus/uploader.go b/vendor/github.com/eventials/go-tus/uploader.go new file mode 100644 index 00000000..8e39cd7c --- /dev/null +++ b/vendor/github.com/eventials/go-tus/uploader.go @@ -0,0 +1,115 @@ +package tus + +import ( + "bytes" +) + +type Uploader struct { + client *Client + url string + upload *Upload + offset int64 + aborted bool + uploadSubs []chan Upload + notifyChan chan bool +} + +// Subscribes to progress updates. +func (u *Uploader) NotifyUploadProgress(c chan Upload) { + u.uploadSubs = append(u.uploadSubs, c) +} + +// Abort aborts the upload process. +// It doens't abort the current chunck, only the remaining. +func (u *Uploader) Abort() { + u.aborted = true +} + +// IsAborted returns true if the upload was aborted. +func (u *Uploader) IsAborted() bool { + return u.aborted +} + +// Url returns the upload url. +func (u *Uploader) Url() string { + return u.url +} + +// Offset returns the current offset uploaded. +func (u *Uploader) Offset() int64 { + return u.offset +} + +// Upload uploads the entire body to the server. +func (u *Uploader) Upload() error { + for u.offset < u.upload.size && !u.aborted { + err := u.UploadChunck() + + if err != nil { + return err + } + } + + return nil +} + +// UploadChunck uploads a single chunck. +func (u *Uploader) UploadChunck() error { + data := make([]byte, u.client.Config.ChunkSize) + + _, err := u.upload.stream.Seek(u.offset, 0) + + if err != nil { + return err + } + + size, err := u.upload.stream.Read(data) + + if err != nil { + return err + } + + body := bytes.NewBuffer(data[:size]) + + newOffset, err := u.client.uploadChunck(u.url, body, int64(size), u.offset) + + if err != nil { + return err + } + + u.offset = newOffset + + u.upload.updateProgress(u.offset) + + u.notifyChan <- true + + return nil +} + +// Waits for a signal to broadcast to all subscribers +func (u *Uploader) broadcastProgress() { + for _ = range u.notifyChan { + for _, c := range u.uploadSubs { + c <- *u.upload + } + } +} + +// NewUploader creates a new Uploader. +func NewUploader(client *Client, url string, upload *Upload, offset int64) *Uploader { + notifyChan := make(chan bool) + + uploader := &Uploader{ + client, + url, + upload, + offset, + false, + nil, + notifyChan, + } + + go uploader.broadcastProgress() + + return uploader +} diff --git a/vendor/vendor.json b/vendor/vendor.json index efa961f5..7bb51f43 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -45,6 +45,12 @@ "revision": "1d4478f51bed434f1dadf96dcd9b43aabac66795", "revisionTime": "2017-10-13T11:49:54Z" }, + { + "checksumSHA1": "jWLtXBO8ZTCPxYC+GMRCSk/Qtw0=", + "path": "github.com/eventials/go-tus", + "revision": "03d9c4b616ff3fdc12c47e8207e3b058b33054b6", + "revisionTime": "2018-10-20T12:00:34Z" + }, { "checksumSHA1": "h1d2lPZf6j2dW/mIqVnd1RdykDo=", "path": "github.com/golang/snappy",