Skip to content

Commit

Permalink
Fix uploading issue (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu authored Nov 26, 2019
1 parent 4950416 commit 4e832fb
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
vendor
coverage.out
test.sh
tmp*
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.11
- 1.12
- 1.13

Expand All @@ -15,7 +14,6 @@ env:
before_script:
- go get -u github.com/mattn/goveralls
- cd $HOME
- mkdir -p $HOME/gopath/src/github.com/chrislusf/seaweedfs
- mkdir -p /tmp/sw/master/metadata
- mkdir -p /tmp/sw/volume1/data
- mkdir -p /tmp/sw/filer
Expand Down
4 changes: 0 additions & 4 deletions filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ func newFiler(u string, client *httpClient) (f *Filer, err error) {
return
}

var dirHeader = map[string]string{
"Accept": "application/json",
}

// Close underlying daemons.
func (f *Filer) Close() (err error) {
if f.client != nil {
Expand Down
43 changes: 22 additions & 21 deletions seaweed.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (c *Seaweed) LookupServerByFileID(fileID string, args url.Values, readonly

if err == nil {
if readonly {
server = lookup.VolumeLocations.RandomPickForRead().URL
server = lookup.VolumeLocations.RandomPickForRead().PublicURL
} else {
server = lookup.VolumeLocations.Head().URL
}
Expand Down Expand Up @@ -295,30 +295,31 @@ func (c *Seaweed) SubmitFilePart(f *FilePart, args url.Values) (result *SubmitRe
}

// Upload file by reader.
func (c *Seaweed) Upload(fileReader io.Reader, fileName string, size int64, collection, ttl string) (fp *FilePart, fileID string, err error) {
func (c *Seaweed) Upload(fileReader io.Reader, fileName string, size int64, collection, ttl string) (fp *FilePart, err error) {
fp = NewFilePartFromReader(ioutil.NopCloser(fileReader), fileName, size)
fp.Collection, fp.TTL = collection, ttl
_, fileID, err = c.UploadFilePart(fp)
_, err = c.UploadFilePart(fp)
return
}

// UploadFile with full file dir/path.
func (c *Seaweed) UploadFile(filePath string, collection, ttl string) (cm *ChunkManifest, fp *FilePart, fileID string, err error) {
func (c *Seaweed) UploadFile(filePath string, collection, ttl string) (cm *ChunkManifest, fp *FilePart, err error) {
fp, err = NewFilePart(filePath)
if err == nil {
fp.Collection, fp.TTL = collection, ttl
cm, fileID, err = c.UploadFilePart(fp)
cm, err = c.UploadFilePart(fp)
_ = fp.Close()
}
return
}

// UploadFilePart uploads a file part.
func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string, err error) {
func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, err error) {
if f.FileID == "" {
res, err := c.Assign(normalize(nil, f.Collection, f.TTL))
var res *AssignResult
res, err = c.Assign(normalize(nil, f.Collection, f.TTL))
if err != nil {
return nil, "", err
return
}
f.Server, f.FileID = res.URL, res.FileID
}
Expand All @@ -345,7 +346,7 @@ func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string,
_, id, count, e := c.uploadChunk(f, baseName+"_"+strconv.FormatInt(i+1, 10))
if e != nil { // delete all uploaded chunks
_ = c.DeleteChunks(cm, normalize(nil, f.Collection, ""))
return nil, "", e
return nil, e
}

cm.Chunks[i] = &ChunkInfo{
Expand All @@ -360,16 +361,14 @@ func (c *Seaweed) UploadFilePart(f *FilePart) (cm *ChunkManifest, fileID string,
}
} else {
args := normalize(nil, f.Collection, f.TTL)
args.Set("Content-Type", "multipart/form-data")
if f.ModTime != 0 {
args.Set("ts", strconv.FormatInt(f.ModTime, 10))
}

_, _, err = c.client.upload(encodeURI(*c.master, f.FileID, args), baseName, f.Reader, f.MimeType)
}
base := *c.master
base.Host = f.Server

if err == nil {
fileID = f.FileID
_, _, err = c.client.upload(encodeURI(base, f.FileID, args), baseName, f.Reader, f.MimeType)
}

return
Expand Down Expand Up @@ -433,8 +432,7 @@ func (c *Seaweed) BatchUploadFileParts(files []*FilePart, collection string, ttl

func (c *Seaweed) uploadTask(file *FilePart) *workerpool.Task {
return workerpool.NewTask(context.Background(), func(ctx context.Context) (res interface{}, err error) {
fmt.Println("Uploading", file)
_, _, err = c.UploadFilePart(file)
_, err = c.UploadFilePart(file)
return
})
}
Expand All @@ -444,7 +442,7 @@ func (c *Seaweed) Replace(fileID string, newContent io.Reader, fileName string,
fp := NewFilePartFromReader(ioutil.NopCloser(newContent), fileName, size)
fp.Collection, fp.TTL = collection, ttl
fp.FileID = fileID
_, err = c.ReplaceFilePart(fp, deleteFirst)
err = c.ReplaceFilePart(fp, deleteFirst)
return
}

Expand All @@ -453,19 +451,19 @@ func (c *Seaweed) ReplaceFile(fileID, localFilePath string, deleteFirst bool) (e
fp, err := NewFilePart(localFilePath)
if err == nil {
fp.FileID = fileID
_, err = c.ReplaceFilePart(fp, deleteFirst)
err = c.ReplaceFilePart(fp, deleteFirst)
_ = fp.Close()
}
return
}

// ReplaceFilePart replaces file part.
func (c *Seaweed) ReplaceFilePart(f *FilePart, deleteFirst bool) (fileID string, err error) {
func (c *Seaweed) ReplaceFilePart(f *FilePart, deleteFirst bool) (err error) {
if deleteFirst && f.FileID != "" {
_ = c.DeleteFile(f.FileID, nil)
}

_, fileID, err = c.UploadFilePart(f)
_, err = c.UploadFilePart(f)
return
}

Expand Down Expand Up @@ -507,7 +505,10 @@ func (c *Seaweed) uploadManifest(f *FilePart, manifest *ChunkManifest) (err erro
}
args.Set("cm", "true")

_, _, err = c.client.upload(encodeURI(*c.master, f.FileID, args), manifest.Name, bufReader, "application/json")
base := *c.master
base.Host = f.Server

_, _, err = c.client.upload(encodeURI(base, f.FileID, args), manifest.Name, bufReader, "application/json")
}
return
}
Expand Down
112 changes: 68 additions & 44 deletions seaweed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,74 +39,90 @@ func init() {
filer = []string{_filer}
}

sw, _ = NewSeaweed(masterURL, filer, 2*1024*1024, &http.Client{Timeout: 5 * time.Minute})
sw, _ = NewSeaweed(masterURL, filer, 8096, &http.Client{Timeout: 5 * time.Minute})
_ = sw.Close()

sw, _ = NewSeaweed(masterURL, filer, 2*1024*1024, &http.Client{Timeout: 5 * time.Minute})
sw, _ = NewSeaweed(masterURL, filer, 8096, &http.Client{Timeout: 5 * time.Minute})

MediumFile = os.Getenv("GOSWFS_MEDIUM_FILE")
SmallFile = os.Getenv("GOSWFS_SMALL_FILE")
}

func TestUploadLookupserverReplaceDeleteFile(t *testing.T) {
for i := 0; i < 2; i++ {
_, _, fID, err := sw.UploadFile(MediumFile, "", "")
_, fp, err := sw.UploadFile(MediumFile, "", "")
require.Nil(t, err)

_, err = sw.LookupServerByFileID(fID, nil, true)
_, err = sw.LookupServerByFileID(fp.FileID, nil, true)
require.Nil(t, err)

//
_, err = sw.LookupFileID(fID, nil, true)
// verify by downloading
downloaded := verifyDownloadFile(t, fp.FileID)
fh, err := os.Open(MediumFile)
require.Nil(t, err)
allContent, _ := ioutil.ReadAll(fh)
require.Nil(t, fh.Close())
require.EqualValues(t, downloaded, allContent)

// try to looking up
_, err = sw.LookupFileID(fp.FileID, nil, true)
require.Nil(t, err)

//
require.Nil(t, sw.ReplaceFile(fID, SmallFile, false))
_, err = sw.LookupFileID(fID, nil, true)
// try to replace with small file
require.Nil(t, sw.ReplaceFile(fp.FileID, SmallFile, false))
_, err = sw.LookupFileID(fp.FileID, nil, true)
require.Nil(t, err)

//
require.Nil(t, sw.ReplaceFile(fID, SmallFile, true))
_, err = sw.LookupFileID(fID, nil, true)
// verify by downloading
downloaded = verifyDownloadFile(t, fp.FileID)
fh, err = os.Open(SmallFile)
require.Nil(t, err)
allContent, _ = ioutil.ReadAll(fh)
require.Nil(t, fh.Close())
require.EqualValues(t, downloaded, allContent)

//
require.Nil(t, sw.DeleteFile(fID, nil))
// replace again but delete first
require.Nil(t, sw.ReplaceFile(fp.FileID, SmallFile, true))
_, err = sw.LookupFileID(fp.FileID, nil, true)
require.Nil(t, err)

// test upload file
fh, err := os.Open(MediumFile)
// verify by downloading
downloaded = verifyDownloadFile(t, fp.FileID)
fh, err = os.Open(SmallFile)
require.Nil(t, err)
allContent, _ = ioutil.ReadAll(fh)
require.Nil(t, fh.Close())
require.EqualValues(t, downloaded, allContent)

// delete file
require.Nil(t, sw.DeleteFile(fp.FileID, nil))

// uploading with file reader
fh, err = os.Open(MediumFile)
require.Nil(t, err)
var size int64
fi, fiErr := fh.Stat()
require.Nil(t, fiErr)
size = fi.Size()
_, fID, err = sw.Upload(fh, "test.txt", size, "col", "")
fp, err = sw.Upload(fh, "test.txt", size, "col", "")
require.Nil(t, err)
require.Nil(t, fh.Close())

// Replace with small file
// Replace with small file reader
fs, err := os.Open(SmallFile)
require.Nil(t, err)
fi, fiErr = fs.Stat()
require.Nil(t, fiErr)
size = fi.Size()
require.Nil(t, sw.Replace(fID, fs, "ta.txt", size, "", "", false))
require.Nil(t, sw.DeleteFile(fID, nil))
require.Nil(t, sw.Replace(fp.FileID, fs, "ta.txt", size, "", "", false))
require.Nil(t, sw.DeleteFile(fp.FileID, nil))
fs.Close()
}
}

func TestBatchUploadFiles(t *testing.T) {
if MediumFile != "" && SmallFile != "" {
_, err := sw.BatchUploadFiles([]string{MediumFile, SmallFile}, "", "")
require.Nil(t, err)
} else if MediumFile != "" {
_, err := sw.BatchUploadFiles([]string{MediumFile, MediumFile}, "", "")
require.Nil(t, err)
} else if SmallFile != "" {
_, err := sw.BatchUploadFiles([]string{SmallFile, SmallFile}, "", "")
require.Nil(t, err)
}
_, err := sw.BatchUploadFiles([]string{MediumFile, SmallFile}, "", "")
require.Nil(t, err)
}

func TestLookup(t *testing.T) {
Expand All @@ -129,25 +145,34 @@ func TestClusterStatus(t *testing.T) {
require.Nil(t, err)
}

func TestSubmit(t *testing.T) {
if SmallFile != "" {
_, err := sw.Submit(SmallFile, "", "")
require.Nil(t, err)
}
func TestDownloadFile(t *testing.T) {
result, err := sw.Submit(SmallFile, "", "")
require.Nil(t, err)
require.NotNil(t, result)

// return fake error
_, err = sw.Download(result.FileID, nil, func(r io.Reader) error {
return fmt.Errorf("Fake error")
})
require.NotNil(t, err)

// verifying
verifyDownloadFile(t, result.FileID)
}

func TestDownloadFile(t *testing.T) {
if SmallFile != "" {
_, err := sw.Download(SmallFile, nil, func(r io.Reader) error {
return fmt.Errorf("Fake error")
})
require.NotNil(t, err)
}
func verifyDownloadFile(t *testing.T, fid string) (data []byte) {
_, err := sw.Download(fid, nil, func(r io.Reader) (err error) {
data, err = ioutil.ReadAll(r)
return
})
require.Nil(t, err)
require.NotZero(t, len(data))
return
}

func TestDeleteChunks(t *testing.T) {
if MediumFile != "" {
cm, _, _, err := sw.UploadFile(MediumFile, "", "")
cm, _, err := sw.UploadFile(MediumFile, "", "")
require.Nil(t, err)

err = sw.DeleteChunks(cm, nil)
Expand All @@ -171,7 +196,6 @@ func TestFiler(t *testing.T) {
})
require.Nil(t, err)
require.NotZero(t, buf.Len())
t.Log(string(buf.Bytes()))

// try to delete this file
err = filer.Delete("/js/test.txt", nil)
Expand Down

0 comments on commit 4e832fb

Please sign in to comment.