Skip to content

Commit

Permalink
Merge pull request kubeagi#329 from 0xff-dev/refactor-minio
Browse files Browse the repository at this point in the history
refactor: minio
  • Loading branch information
bjwswang authored Dec 12, 2023
2 parents b242c29 + 13e9ce3 commit da12bf8
Show file tree
Hide file tree
Showing 27 changed files with 1,913 additions and 1,122 deletions.
242 changes: 0 additions & 242 deletions api/base/v1alpha1/versioneddataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,251 +16,9 @@ limitations under the License.

package v1alpha1

import (
"context"
"fmt"
"sort"
"strings"

"github.com/minio/minio-go/v7"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"github.com/kubeagi/arcadia/pkg/utils/minioutils"
)

var (
LabelVersionedDatasetVersion = Group + "/version"
LabelVersionedDatasetVersionOwner = Group + "/owner"
)

const InheritedFromVersionName = "inheritfrom-"

// generateInheriedFileStatus builds the single FileStatus entry that tracks
// files inherited from another dataset version stored in minio.
//
// When Spec.InheritedFrom is set, every object under the inherited version's
// prefix is listed from minio and recorded as still processing. When it is
// empty, no objects are listed and the entry simply names the current version
// with a succeeded phase.
func generateInheriedFileStatus(minioClient *minio.Client, instance *VersionedDataset) []FileStatus {
	var (
		prefix string
		name   string
		phase  FileProcessPhase
	)
	if instance.Spec.InheritedFrom == "" {
		prefix = fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.Version)
		name = InheritedFromVersionName + instance.Spec.Version
		phase = FileProcessPhaseSucceeded
	} else {
		prefix = fmt.Sprintf("dataset/%s/%s/", instance.Spec.Dataset.Name, instance.Spec.InheritedFrom)
		name = InheritedFromVersionName + instance.Spec.InheritedFrom
		phase = FileProcessPhaseProcessing
	}

	details := make([]FileDetails, 0)
	if instance.Spec.InheritedFrom != "" {
		// The bucket holding the inherited files is the namespace of the
		// referenced dataset.
		srcBucket := instance.Spec.Dataset.Namespace
		objectPaths := minioutils.ListObjects(context.TODO(), *srcBucket, prefix, minioClient, -1)
		sort.Strings(objectPaths)
		for _, objectPath := range objectPaths {
			details = append(details, FileDetails{
				Path:  strings.TrimPrefix(objectPath, prefix),
				Phase: phase,
			})
		}
	}

	return []FileStatus{
		{
			TypedObjectReference: TypedObjectReference{
				Name:      name,
				Namespace: &instance.Namespace,
				Kind:      "VersionedDataset",
			},
			Status: details,
		},
	}
}

// generateDatasourceFileStatus converts the FileGroups declared in the
// instance spec into one FileStatus per datasource, with each file initially
// marked as processing. Entries are sorted by file path; the order of the
// returned datasources is not deterministic (map iteration).
func generateDatasourceFileStatus(instance *VersionedDataset) []FileStatus {
	// Organize the contents of the fileGroups into this format:
	// {"datasourceNamespace datasourceName": ["file1", "file2"]}
	fileGroup := make(map[string][]string)
	for _, fg := range instance.Spec.FileGroups {
		// A file group without an explicit namespace defaults to the
		// namespace of the VersionedDataset itself.
		namespace := instance.Namespace
		if fg.Source.Namespace != nil {
			namespace = *fg.Source.Namespace
		}
		key := fmt.Sprintf("%s %s", namespace, fg.Source.Name)
		fileGroup[key] = append(fileGroup[key], fg.Paths...)
	}

	// Convert fileGroup to []FileStatus format.
	targetDatasourceFileStatus := make([]FileStatus, 0, len(fileGroup))
	for datasource, filePaths := range fileGroup {
		// BUG FIX: namespace and name must be fresh variables on every
		// iteration. They were previously declared once outside the loop,
		// so every FileStatus stored a pointer to the same variable and all
		// entries ended up reporting the namespace of the last datasource.
		var namespace, name string
		_, _ = fmt.Sscanf(datasource, "%s %s", &namespace, &name)
		item := FileStatus{
			TypedObjectReference: TypedObjectReference{
				Name:      name,
				Namespace: &namespace,
				Kind:      "Datasource",
			},
			Status: []FileDetails{},
		}
		for _, fp := range filePaths {
			item.Status = append(item.Status, FileDetails{
				Path:  fp,
				Phase: FileProcessPhaseProcessing,
			})
		}
		sort.Slice(item.Status, func(i, j int) bool {
			return item.Status[i].Path < item.Status[j].Path
		})

		targetDatasourceFileStatus = append(targetDatasourceFileStatus, item)
	}
	return targetDatasourceFileStatus
}

// CopyedFileGroup2Status reconciles instance.Status.Files against the files
// currently declared in the spec (plus inherited files from minio). It
// returns whether any new files were added, and the list of files that were
// removed from the spec and therefore need to be deleted.
//
// If the instance is being deleted, the whole recorded file status is cleared
// and returned as the deletion list.
func CopyedFileGroup2Status(minioClient *minio.Client, instance *VersionedDataset) (bool, []FileStatus) {
	if instance.DeletionTimestamp != nil {
		source := instance.Status.Files
		instance.Status.Files = nil
		return false, source
	}

	// 1. First store the information about the status of the file that has been saved in the current status.
	// Indexed as oldDatasourceFiles["namespace name"][path] = FileDetails.
	oldDatasourceFiles := make(map[string]map[string]FileDetails)
	for _, fileStatus := range instance.Status.Files {
		key := fmt.Sprintf("%s %s", *fileStatus.Namespace, fileStatus.Name)
		if _, ok := oldDatasourceFiles[key]; !ok {
			oldDatasourceFiles[key] = make(map[string]FileDetails)
		}
		for _, item := range fileStatus.Status {
			oldDatasourceFiles[key][item.Path] = item
		}
	}

	// Build the desired file status from the spec's file groups and from the
	// inherited dataset version (listed from minio).
	targetDatasourceFileStatus := generateDatasourceFileStatus(instance)
	targetDatasourceFileStatus = append(targetDatasourceFileStatus, generateInheriedFileStatus(minioClient, instance)...)

	// 4. If a file from a data source is found to exist in oldDatasourceFiles,
	// replace it with the one inside oldDatasourceFiles (preserving its phase,
	// error message and timestamps). Otherwise set the file as being processed.
	update := false
	deletedFiles := make([]FileStatus, 0)
	for idx := range targetDatasourceFileStatus {
		item := targetDatasourceFileStatus[idx]
		key := fmt.Sprintf("%s %s", *item.Namespace, item.Name)

		// if the datasource itself is not in status, then it is a new series of files added.
		datasourceFiles, ok := oldDatasourceFiles[key]
		if !ok {
			update = true
			continue
		}

		// We need to check if the file under spec has existed in status, if so, how to update its status, otherwise it is a new file.
		for i, status := range item.Status {
			oldFileStatus, ok := datasourceFiles[status.Path]
			if !ok {
				update = true
				continue
			}
			item.Status[i] = oldFileStatus

			// do the deletion here and the last data that still exists in the map then is the file that needs to be deleted.
			delete(datasourceFiles, status.Path)
		}
		// Whatever is left in datasourceFiles was in status but is no longer
		// in the spec: schedule those files for deletion.
		if len(datasourceFiles) > 0 {
			ds := FileStatus{
				TypedObjectReference: TypedObjectReference{
					Name:      item.Name,
					Namespace: item.Namespace,
				},
				Status: make([]FileDetails, 0),
			}
			for _, r := range datasourceFiles {
				ds.Status = append(ds.Status, r)
			}
			deletedFiles = append(deletedFiles, ds)
		}
		targetDatasourceFileStatus[idx] = item
		delete(oldDatasourceFiles, key)
	}

	// Datasources that remain in oldDatasourceFiles were removed from the
	// spec entirely; all of their files must be deleted.
	for key, item := range oldDatasourceFiles {
		var namespace, name string
		fmt.Sscanf(key, "%s %s", &namespace, &name)
		ds := FileStatus{
			TypedObjectReference: TypedObjectReference{
				Name:      name,
				Namespace: &namespace,
			},
			Status: make([]FileDetails, 0),
		}
		for _, r := range item {
			ds.Status = append(ds.Status, r)
		}
		deletedFiles = append(deletedFiles, ds)
	}

	// Keep Files sorted by datasource name so that UpdateFileStatus can use
	// binary search over it.
	sort.Slice(targetDatasourceFileStatus, func(i, j int) bool {
		return targetDatasourceFileStatus[i].Name < targetDatasourceFileStatus[j].Name
	})

	// Find the Ready condition; index stays -1 unless it exists and is not True.
	index := -1
	for idx, item := range instance.Status.Conditions {
		if item.Type == TypeReady {
			if item.Status != corev1.ConditionTrue {
				index = idx
			}
			break
		}
	}
	// (Re)enter the syncing state when there is no condition yet, or when the
	// previous sync did not reach Ready=True.
	if len(instance.Status.Conditions) == 0 || index != -1 {
		message := "sync files."
		if index != -1 {
			message = "file synchronization failed, try again"
		}
		cond := Condition{
			Type:               TypeReady,
			Status:             corev1.ConditionFalse,
			Reason:             ReasonFileSyncing,
			Message:            message,
			LastTransitionTime: v1.Now(),
		}
		instance.Status.ConditionedStatus.SetConditions(cond)
	}
	klog.V(4).Infof("[Debug] delete filestatus %+v\n", deletedFiles)

	instance.Status.Files = targetDatasourceFileStatus
	// update condition to sync
	return update, deletedFiles
}

// UpdateFileStatus transitions the phase of a single file (identified by its
// datasource name and source path) in instance.Status.Files.
//
// Only the transition Processing -> Succeeded/Failed is allowed; any other
// transition returns an error. On failure the error message is recorded; on
// success the last-update timestamp is set. Relies on Files being sorted by
// datasource name and each Status being sorted by path (as produced by
// CopyedFileGroup2Status).
func UpdateFileStatus(ctx context.Context, instance *VersionedDataset, datasource, srcPath string, syncStatus FileProcessPhase, errMsg string) error {
	datasourceFileLen := len(instance.Status.Files)
	datasourceIndex := sort.Search(datasourceFileLen, func(i int) bool {
		return instance.Status.Files[i].Name >= datasource
	})
	// BUG FIX: sort.Search returns the insertion point, which is a real match
	// only when the element at that index equals the key. Without the
	// equality check, a missing datasource would silently resolve to the
	// next-larger entry and its status would be mutated instead.
	if datasourceIndex == datasourceFileLen || instance.Status.Files[datasourceIndex].Name != datasource {
		return fmt.Errorf("not found datasource %s in %s/%s.status", datasource, instance.Namespace, instance.Name)
	}

	filePathLen := len(instance.Status.Files[datasourceIndex].Status)
	fileIndex := sort.Search(filePathLen, func(i int) bool {
		return instance.Status.Files[datasourceIndex].Status[i].Path >= srcPath
	})
	// Same insertion-point caveat as above, applied to the file path search.
	if fileIndex == filePathLen || instance.Status.Files[datasourceIndex].Status[fileIndex].Path != srcPath {
		return fmt.Errorf("not found srcPath %s in datasource %s", srcPath, datasource)
	}

	// Only this state transfer is allowed
	curPhase := instance.Status.Files[datasourceIndex].Status[fileIndex].Phase
	if curPhase == FileProcessPhaseProcessing && (syncStatus == FileProcessPhaseSucceeded || syncStatus == FileProcessPhaseFailed) {
		instance.Status.Files[datasourceIndex].Status[fileIndex].Phase = syncStatus
		if syncStatus == FileProcessPhaseFailed {
			instance.Status.Files[datasourceIndex].Status[fileIndex].ErrMessage = errMsg
		}
		if syncStatus == FileProcessPhaseSucceeded {
			instance.Status.Files[datasourceIndex].Status[fileIndex].LastUpdateTime = v1.Now()
		}
		return nil
	}

	return fmt.Errorf("wrong state. from %s to %s", curPhase, syncStatus)
}
3 changes: 2 additions & 1 deletion controllers/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kubeagi/arcadia/pkg/datasource"
"github.com/kubeagi/arcadia/pkg/scheduler"
"github.com/kubeagi/arcadia/pkg/utils"
"github.com/kubeagi/arcadia/pkg/versioneddataset"
)

// VersionedDatasetReconciler reconciles a VersionedDataset object
Expand Down Expand Up @@ -227,7 +228,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log
return false, nil, err
}

update, deleteFileStatus := v1alpha1.CopyedFileGroup2Status(oss.Client, instance)
update, deleteFileStatus := versioneddataset.CopyedFileGroup2Status(oss, instance)
return update, deleteFileStatus, nil
}

Expand Down
94 changes: 94 additions & 0 deletions graphql-server/go-server/examples/upload-download-file/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
## Introduction

The back-end code implements chunked uploads as well as chunked downloads. Here is the complete set of calling logic.

### Build the code and get the executable

```shell
go build -o main main.go
```

### View command line arguments

```shell
Usage of ./main:
-action string
you can only choose download, upload. (default "upload")
-bucket string
(default "abc")
-bucket-path string
(default "dataset/ds1/v1")
-file string
if it's an uploaded file, then it's the path to the local file, if it's a downloaded file, it's the path in minio, remember, bucketPath+filename make up the full storage path in minio.
-host string
graphql-server address (default "http://localhost:8099")
```
- The `action` parameter specifies whether you want to upload or download a file.
- The `bucket` specifies the name of the bucket in the object store.
- The `bucket-path` indicates the path prefix under which the file will be stored. For example, with `bucket-path=abc/def` and a file to be stored at `text/a.txt`, the final storage path is `abc/def/text/a.txt`.
- If you are uploading a file, then file is the storage path of the local file, **if you write absolute path /local/a.txt, bucket-path=abc/def then your file will be stored under abc/def/local/a.txt eventually**.
If you want to download a file, **specify the path after removing the bucket-path**. For the example above, you just need to write file=local/a.txt and it will download the file `abc/def/local/a.txt`.
- The host is the address of the backend service.
### Example of use
1. Upload local file, bucket is abc, bucket-path=def, local file is local/a.txt
```shell
./main --file=tmp.tar.gz --bucket=abc --bucket-path=def

I1211 15:15:56.834382 2903826 main.go:276] [DEBUG] ***** part 0, md5 is 8e3192f72fba1faee864edaa6f1636fc
I1211 15:15:56.889742 2903826 main.go:276] [DEBUG] ***** part 1, md5 is 29ee8a4e3f980fb9fe8b572c39d3caea
I1211 15:15:56.889786 2903826 main.go:323] [DEBUG] file md5 7ed8b2a8ea5798202b81eedf14dc978c, etag: 70b40381f8881546c265afe4f279cd01-2...
I1211 15:15:56.889808 2903826 main.go:331] [Step 1] check the number of chunks the file has completed.
I1211 15:15:56.889821 2903826 main.go:103] [DEBUG] check success chunks...
I1211 15:15:56.889840 2903826 main.go:111] [DEBUG] send get request to http://localhost:8099/bff/versioneddataset/files/chunks?bucket=abc&bucketPath=def&etag=70b40381f8881546c265afe4f279cd01-2&fileName=tmp.tar.gz&md5=7ed8b2a8ea5798202b81eedf14dc978c
I1211 15:15:56.918185 2903826 main.go:348] [Step 2] get new uploadid
I1211 15:15:56.918208 2903826 main.go:143] [DEBUG] request new multipart uploadid...
I1211 15:15:56.918244 2903826 main.go:155] [DEBUG] send post request to http://localhost:8099/bff/versioneddataset/files/chunks, with body {"chunkCount":2,"size":33554432,"md5":"7ed8b2a8ea5798202b81eedf14dc978c","fileName":"tmp.tar.gz","bucket":"abc","bucketPath":"def"}...
I1211 15:15:56.948222 2903826 main.go:359] [Step 3] tart uploading files based on uploadid NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ.
I1211 15:15:56.948298 2903826 main.go:190] [DEBUG] request upload url by uploadid: NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ...
I1211 15:15:56.948343 2903826 main.go:190] [DEBUG] request upload url by uploadid: NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ...
I1211 15:15:56.948368 2903826 main.go:202] [DEBUG] send post request to http://localhost:8099/bff/versioneddataset/files/chunk_url, with body {"partNumber":2,"size":33554432,"md5":"7ed8b2a8ea5798202b81eedf14dc978c","uploadID":"NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ","bucket":"abc","bucketPath":"def"}...
I1211 15:15:56.948383 2903826 main.go:202] [DEBUG] send post request to http://localhost:8099/bff/versioneddataset/files/chunk_url, with body {"partNumber":1,"size":33554432,"md5":"7ed8b2a8ea5798202b81eedf14dc978c","uploadID":"NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ","bucket":"abc","bucketPath":"def"}...
I1211 15:15:57.987389 2903826 main.go:389] [Step 4], all chunks are uploaded successfully and merging of chunks begins.
I1211 15:15:57.987408 2903826 main.go:231] [DEBUG] all chunks are uploaded, merge all chunks...
I1211 15:15:57.987456 2903826 main.go:242] [DEBUG] send put request to http://localhost:8099/bff/versioneddataset/files/chunks, with body {"md5":"7ed8b2a8ea5798202b81eedf14dc978c","bucket_path":"def","bucket":"abc","file_name":"tmp.tar.gz","uploadID":"NTg5MTkzYmMtNGNiYS00M2ExLWJiNDYtNjUyMDNkNDQyZjFkLmRkYWE1ZDA5LTUxZjUtNGUzNS05Yjg3LTQyZmRiMzVjOGE4MQ"}...
I1211 15:15:58.089296 2903826 main.go:406] [Step 5], Congratulations, the file was uploaded successfully



# run again, Because the same file already exists, it will not be uploaded again, whether the file is the same or not is calculated by etag.
./main --file=tmp.tar.gz --bucket=abc --bucket-path=def

I1211 15:16:22.142334 2903929 main.go:276] [DEBUG] ***** part 0, md5 is 8e3192f72fba1faee864edaa6f1636fc
I1211 15:16:22.198029 2903929 main.go:276] [DEBUG] ***** part 1, md5 is 29ee8a4e3f980fb9fe8b572c39d3caea
I1211 15:16:22.198065 2903929 main.go:323] [DEBUG] file md5 7ed8b2a8ea5798202b81eedf14dc978c, etag: 70b40381f8881546c265afe4f279cd01-2...
I1211 15:16:22.198085 2903929 main.go:331] [Step 1] check the number of chunks the file has completed.
I1211 15:16:22.198098 2903929 main.go:103] [DEBUG] check success chunks...
I1211 15:16:22.198117 2903929 main.go:111] [DEBUG] send get request to http://localhost:8099/bff/versioneddataset/files/chunks?bucket=abc&bucketPath=def&etag=70b40381f8881546c265afe4f279cd01-2&fileName=tmp.tar.gz&md5=7ed8b2a8ea5798202b81eedf14dc978c
I1211 15:16:22.200152 2903929 main.go:339] [Done], the file already exists and does not need to be uploaded again
```
2. Download the file you just uploaded
```shell
./main --action=download --file=tmp.tar.gz --bucket-path=def --bucket=abc

I1211 15:16:42.701067 2904044 main.go:410] [Step 1] get file size
I1211 15:16:42.703150 2904044 main.go:436] [DEBUG] file size is 51392595
I1211 15:16:42.703168 2904044 main.go:440] [Step 2] create local file tmp.gz
I1211 15:16:42.703225 2904044 main.go:461] [Step 3] start to donwload...
I1211 15:16:42.703303 2904044 main.go:482] [Chunk 41943040-51392595] send request to http://localhost:8099/bff/versioneddataset/files/download?bucket=abc&bucketPath=def&end=51392595&fileName=tmp.tar.gz&from=41943040
I1211 15:16:42.703391 2904044 main.go:482] [Chunk 10485760-20971520] send request to http://localhost:8099/bff/versioneddataset/files/download?bucket=abc&bucketPath=def&end=20971520&fileName=tmp.tar.gz&from=10485760
I1211 15:16:42.703452 2904044 main.go:482] [Chunk 20971520-31457280] send request to http://localhost:8099/bff/versioneddataset/files/download?bucket=abc&bucketPath=def&end=31457280&fileName=tmp.tar.gz&from=20971520
I1211 15:16:42.703574 2904044 main.go:482] [Chunk 0-10485760] send request to http://localhost:8099/bff/versioneddataset/files/download?bucket=abc&bucketPath=def&end=10485760&fileName=tmp.tar.gz&from=0
I1211 15:16:42.704383 2904044 main.go:482] [Chunk 31457280-41943040] send request to http://localhost:8099/bff/versioneddataset/files/download?bucket=abc&bucketPath=def&end=41943040&fileName=tmp.tar.gz&from=31457280
I1211 15:16:42.990110 2904044 main.go:502] [Step 4] File download complete
```
### How to calculate etag
Refer to function `fileMD5andEtag`
Loading

0 comments on commit da12bf8

Please sign in to comment.