Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(oss api): support ceph oss #775

Merged
merged 30 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
73d023b
feat(oss api): support ceph oss
wlwilliamx Aug 30, 2022
d705b73
feat(oss api): support ceph oss
wlwilliamx Aug 30, 2022
228b281
chore(ceph oss test): update ceph oss test
wlwilliamx Aug 30, 2022
a9d99b7
Merge branch 'main' into dev
wenxuwan Aug 30, 2022
e71cce9
chore: update goimports
wlwilliamx Aug 30, 2022
5ae0a2d
Merge remote-tracking branch 'main' into dev
wlwilliamx Aug 30, 2022
2e2e288
style: format code
wlwilliamx Aug 30, 2022
2e66fb3
Merge branch 'main' into dev
seeflood Sep 8, 2022
60d0aee
style(ceph): format code
wlwilliamx Sep 16, 2022
e556fc0
Merge remote-tracking branch 'fork/dev' into dev
wlwilliamx Sep 16, 2022
8aa327e
chore(proto): regenerate proto code
wlwilliamx Sep 16, 2022
995dde7
Merge remote-tracking branch 'origin/main' into dev
wlwilliamx Sep 17, 2022
1d2be8a
fix(typo): fix a typo
wlwilliamx Sep 17, 2022
24356a5
fix(typo): fix a typo
wlwilliamx Sep 22, 2022
7b3d8f5
fix(typo): fix a typo
wlwilliamx Sep 22, 2022
9919f23
Merge branch 'main' into dev
wenxuwan Oct 9, 2022
fd653b2
test(oss): add some UT
wlwilliamx Oct 10, 2022
f7b8e60
fix(typo): fix a typo
wlwilliamx Oct 10, 2022
562a93b
Merge branch 'main' into dev
seeflood Oct 10, 2022
36fbd9c
Merge branch 'main' into dev
seeflood Oct 12, 2022
ef7f469
chore(goimports): goimports
wlwilliamx Oct 13, 2022
f01edc8
Merge branch 'main' into dev
wlwilliamx Oct 13, 2022
e3c9ac3
chore(goimports): goimports
wlwilliamx Oct 13, 2022
6537c0b
test(oss): add some UT
wlwilliamx Oct 13, 2022
98581ed
chore(goimports): goimports file
wlwilliamx Oct 13, 2022
754058f
chore(oss): extract output functions and add UT for these functions
wlwilliamx Oct 13, 2022
0928526
Update cmd/layotto/main.go
seeflood Oct 13, 2022
3818890
Update cmd/layotto_without_xds/main.go
seeflood Oct 13, 2022
cb4356a
Update cmd/layotto_multiple_api/main.go
seeflood Oct 13, 2022
fa62804
Merge branch 'main' into dev
seeflood Oct 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cmd/layotto_multiple_api/nohup.out
default.etcd/
demo/configuration/common/client
demo/file/client
demo/oss/client
demo/flowcontrol/client
demo/lock/redis/client
demo/pubsub/redis/client/publisher
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

"mosn.io/mosn/pkg/istio"

aliyun_file "mosn.io/layotto/components/file/aliyun"
Expand Down Expand Up @@ -289,6 +291,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),
// PubSub
runtime.WithPubSubFactory(
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto_multiple_api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

aliyun_file "mosn.io/layotto/components/file/aliyun"
"mosn.io/layotto/components/file/local"

Expand Down Expand Up @@ -303,6 +305,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),

// PubSub
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto_without_xds/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

"mosn.io/layotto/components/file/aliyun"
aws_file "mosn.io/layotto/components/file/aws"
"mosn.io/layotto/components/file/minio"
Expand Down Expand Up @@ -408,6 +410,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),

// Sequencer
Expand Down
43 changes: 0 additions & 43 deletions components/oss/aws/option.go

This file was deleted.

90 changes: 19 additions & 71 deletions components/oss/aws/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,8 @@ func (a *AwsOss) GetObject(ctx context.Context, req *oss.GetObjectInput) (*oss.G
if err != nil {
return nil, err
}
out := &oss.GetObjectOutput{}
err = copier.Copy(out, ob)
if err != nil {
return nil, err
}
out.DataStream = ob.Body
return out, nil

return oss.GetGetObjectOutput(ob)
}

func (a *AwsOss) PutObject(ctx context.Context, req *oss.PutObjectInput) (*oss.PutObjectOutput, error) {
Expand All @@ -114,12 +109,8 @@ func (a *AwsOss) PutObject(ctx context.Context, req *oss.PutObjectInput) (*oss.P
if err != nil {
return nil, err
}
out := &oss.PutObjectOutput{}
err = copier.Copy(out, resp)
if err != nil {
return nil, err
}
return out, err

return oss.GetPutObjectOutput(resp)
}

func (a *AwsOss) DeleteObject(ctx context.Context, req *oss.DeleteObjectInput) (*oss.DeleteObjectOutput, error) {
Expand All @@ -135,7 +126,7 @@ func (a *AwsOss) DeleteObject(ctx context.Context, req *oss.DeleteObjectInput) (
if err != nil {
return nil, err
}
return &oss.DeleteObjectOutput{DeleteMarker: resp.DeleteMarker, RequestCharged: string(resp.RequestCharged), VersionId: *resp.VersionId}, err
return oss.GetDeleteObjectOutput(resp)
}

func (a *AwsOss) PutObjectTagging(ctx context.Context, req *oss.PutObjectTaggingInput) (*oss.PutObjectTaggingOutput, error) {
Expand Down Expand Up @@ -169,7 +160,7 @@ func (a *AwsOss) DeleteObjectTagging(ctx context.Context, req *oss.DeleteObjectT
if err != nil {
return nil, err
}
return &oss.DeleteObjectTaggingOutput{VersionId: *resp.VersionId}, err
return oss.GetDeleteObjectTaggingOutput(resp)
}

func (a *AwsOss) GetObjectTagging(ctx context.Context, req *oss.GetObjectTaggingInput) (*oss.GetObjectTaggingOutput, error) {
Expand All @@ -187,11 +178,7 @@ func (a *AwsOss) GetObjectTagging(ctx context.Context, req *oss.GetObjectTagging
return nil, err
}

output := &oss.GetObjectTaggingOutput{Tags: map[string]string{}}
for _, tags := range resp.TagSet {
output.Tags[*tags.Key] = *tags.Value
}
return output, err
return oss.GetGetObjectTaggingOutput(resp)
}

func (a *AwsOss) CopyObject(ctx context.Context, req *oss.CopyObjectInput) (*oss.CopyObjectOutput, error) {
Expand Down Expand Up @@ -259,14 +246,8 @@ func (a *AwsOss) ListObjects(ctx context.Context, req *oss.ListObjectsInput) (*o
if err != nil {
return nil, err
}
output := &oss.ListObjectsOutput{}
err = copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
// if not return NextMarker, use the value of the last Key in the response as the marker
if output.IsTruncated && output.NextMarker == "" {
index := len(output.Contents) - 1
output.NextMarker = output.Contents[index].Key
}
return output, err

return oss.GetListObjectsOutput(resp)
}
func (a *AwsOss) GetObjectCannedAcl(ctx context.Context, req *oss.GetObjectCannedAclInput) (*oss.GetObjectCannedAclOutput, error) {
return nil, errors.New("GetObjectCannedAcl method not supported on AWS")
Expand Down Expand Up @@ -304,7 +285,7 @@ func (a *AwsOss) CreateMultipartUpload(ctx context.Context, req *oss.CreateMulti
return nil, err
}
input := &s3.CreateMultipartUploadInput{}
err = copier.CopyWithOption(input, req, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{int642time}})
err = copier.CopyWithOption(input, req, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.Int64ToTime}})
if err != nil {
log.DefaultLogger.Errorf("copy CreateMultipartUploadInput fail, err: %+v", err)
return nil, err
Expand All @@ -314,7 +295,7 @@ func (a *AwsOss) CreateMultipartUpload(ctx context.Context, req *oss.CreateMulti
return nil, err
}
output := &oss.CreateMultipartUploadOutput{}
copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.TimeToInt64}})
return output, err
}
func (a *AwsOss) UploadPart(ctx context.Context, req *oss.UploadPartInput) (*oss.UploadPartOutput, error) {
Expand All @@ -332,12 +313,8 @@ func (a *AwsOss) UploadPart(ctx context.Context, req *oss.UploadPartInput) (*oss
if err != nil {
return nil, err
}
output := &oss.UploadPartOutput{}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
return output, err

return oss.GetUploadPartOutput(resp)
}
func (a *AwsOss) UploadPartCopy(ctx context.Context, req *oss.UploadPartCopyInput) (*oss.UploadPartCopyOutput, error) {
client, err := a.getClient()
Expand All @@ -360,9 +337,8 @@ func (a *AwsOss) UploadPartCopy(ctx context.Context, req *oss.UploadPartCopyInpu
if err != nil {
return nil, err
}
output := &oss.UploadPartCopyOutput{}
err = copier.Copy(output, resp)
return output, err

return oss.GetUploadPartCopyOutput(resp)
}
func (a *AwsOss) CompleteMultipartUpload(ctx context.Context, req *oss.CompleteMultipartUploadInput) (*oss.CompleteMultipartUploadOutput, error) {
client, err := a.getClient()
Expand Down Expand Up @@ -417,20 +393,8 @@ func (a *AwsOss) ListMultipartUploads(ctx context.Context, req *oss.ListMultipar
if err != nil {
return nil, err
}
output := &oss.ListMultipartUploadsOutput{CommonPrefixes: []string{}, Uploads: []*oss.MultipartUpload{}}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
for _, v := range resp.CommonPrefixes {
output.CommonPrefixes = append(output.CommonPrefixes, *v.Prefix)
}
for _, v := range resp.Uploads {
upload := &oss.MultipartUpload{}
copier.CopyWithOption(upload, v, copier.Option{IgnoreEmpty: true, DeepCopy: true})
output.Uploads = append(output.Uploads, upload)
}
return output, err

return oss.GetListMultipartUploadsOutput(resp)
}
func (a *AwsOss) ListObjectVersions(ctx context.Context, req *oss.ListObjectVersionsInput) (*oss.ListObjectVersionsOutput, error) {
client, err := a.getClient()
Expand All @@ -446,24 +410,8 @@ func (a *AwsOss) ListObjectVersions(ctx context.Context, req *oss.ListObjectVers
if err != nil {
return nil, err
}
output := &oss.ListObjectVersionsOutput{}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
for _, v := range resp.CommonPrefixes {
output.CommonPrefixes = append(output.CommonPrefixes, *v.Prefix)
}
for _, v := range resp.DeleteMarkers {
entry := &oss.DeleteMarkerEntry{IsLatest: v.IsLatest, Key: *v.Key, Owner: &oss.Owner{DisplayName: *v.Owner.DisplayName, ID: *v.Owner.ID}, VersionId: *v.VersionId}
output.DeleteMarkers = append(output.DeleteMarkers, entry)
}
for _, v := range resp.Versions {
version := &oss.ObjectVersion{}
copier.CopyWithOption(version, v, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
output.Versions = append(output.Versions, version)
}
return output, err

return oss.GetListObjectVersionsOutput(resp)
}

func (a *AwsOss) HeadObject(ctx context.Context, req *oss.HeadObjectInput) (*oss.HeadObjectOutput, error) {
Expand Down
31 changes: 30 additions & 1 deletion components/oss/aws/oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/feature/s3/manager"

"github.com/aws/aws-sdk-go-v2/service/s3"

"github.com/jinzhu/copier"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
Expand Down Expand Up @@ -136,6 +140,31 @@ func TestAwsOss(t *testing.T) {

err = instance.UpdateUploadBandwidthRateLimit(context.TODO(), &oss.UpdateBandwidthRateLimitInput{})
assert.NotNil(t, err)

_, err = oss.GetGetObjectOutput(&s3.GetObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetPutObjectOutput(&manager.UploadOutput{})
assert.Nil(t, err)
_, err = oss.GetDeleteObjectOutput(&s3.DeleteObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetDeleteObjectOutput(&s3.DeleteObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetGetObjectTaggingOutput(&s3.GetObjectTaggingOutput{})
assert.Nil(t, err)
_, err = oss.GetListObjectsOutput(&s3.ListObjectsOutput{})
assert.Nil(t, err)
_, err = oss.GetGetObjectCannedAclOutput(&s3.GetObjectAclOutput{})
assert.Nil(t, err)
_, err = oss.GetUploadPartOutput(&s3.UploadPartOutput{})
assert.Nil(t, err)
_, err = oss.GetUploadPartCopyOutput(&s3.UploadPartCopyOutput{})
assert.Nil(t, err)
_, err = oss.GetListPartsOutput(&s3.ListPartsOutput{})
assert.Nil(t, err)
_, err = oss.GetListMultipartUploadsOutput(&s3.ListMultipartUploadsOutput{})
assert.Nil(t, err)
_, err = oss.GetListObjectVersionsOutput(&s3.ListObjectVersionsOutput{})
assert.Nil(t, err)
}

func TestDeepCopy(t *testing.T) {
Expand All @@ -152,7 +181,7 @@ func TestDeepCopy(t *testing.T) {
VersionId: &value,
}
tovalue := &oss.ObjectVersion{}
err := copier.CopyWithOption(tovalue, fromValue, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
err := copier.CopyWithOption(tovalue, fromValue, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.TimeToInt64}})
assert.Nil(t, err)
assert.Equal(t, tovalue.Owner.DisplayName, value)
}
Loading