-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathopen.go
111 lines (100 loc) · 2.53 KB
/
open.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package s3db
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/jrhy/mast"
"github.com/jrhy/mast/persist/s3test"
"github.com/jrhy/s3db/kv"
v1proto "github.com/jrhy/s3db/proto/v1"
)
// KV bundles an opened kv.DB with an optional cleanup callback.
type KV struct {
// Root is the database handle; OpenKV fills it with the result of kv.Open.
Root *kv.DB
// Closer is not assigned by OpenKV in this file.
// NOTE(review): presumably a cleanup hook set by other code — confirm
// against callers before invoking it unconditionally.
Closer func()
}
// Package-level state for the S3 client that OpenKV falls back to when no
// bucket is configured. OpenKV takes inMemoryS3Lock before reading or
// initializing inMemoryS3 and inMemoryBucket.
var (
// inMemoryS3Lock is locked by OpenKV around access to the two variables below.
inMemoryS3Lock sync.Mutex
// inMemoryS3 is created on first use via s3test.Client and then reused.
inMemoryS3 *s3.S3
// inMemoryBucket is the bucket name returned alongside inMemoryS3.
inMemoryBucket string
)
// OpenKV opens the kv database stored at subdir beneath s3opts.Prefix.
//
// Client selection:
//   - s3opts.Bucket set: a client is built by getS3 for s3opts.Endpoint.
//   - s3opts.Bucket empty and s3opts.Endpoint set: rejected with an error.
//   - both empty: the package-level inMemoryS3 client is used, being created
//     through s3test.Client on the first such call; its endpoint and bucket
//     replace the (empty) values in s3opts.
//
// The returned KV has only Root populated. Failures to build the client or
// to open the database are wrapped with "s3 client" and "open" respectively.
func OpenKV(ctx context.Context, s3opts S3Options, subdir string) (*KV, error) {
	var client kv.S3Interface
	switch {
	case s3opts.Bucket != "":
		remote, err := getS3(s3opts.Endpoint)
		if err != nil {
			return nil, fmt.Errorf("s3 client: %w", err)
		}
		client = remote
	case s3opts.Endpoint != "":
		return nil, fmt.Errorf("s3_endpoint specified without s3_bucket")
	default:
		// The deferred unlock fires when OpenKV returns, so the lock stays
		// held for the remainder of the function, including kv.Open.
		inMemoryS3Lock.Lock()
		defer inMemoryS3Lock.Unlock()
		if inMemoryS3 == nil {
			// The third result of s3test.Client is discarded.
			inMemoryS3, inMemoryBucket, _ = s3test.Client()
		}
		s3opts.Endpoint = inMemoryS3.Endpoint
		s3opts.Bucket = inMemoryBucket
		client = inMemoryS3
	}

	// Join the configured prefix and subdir with exactly one "/" between
	// them, then drop a leading "/" (which appears when the prefix is empty).
	base := strings.TrimPrefix(strings.TrimSuffix(s3opts.Prefix, "/"), "/")
	leaf := strings.TrimPrefix(subdir, "/")
	prefix := strings.TrimPrefix(base+"/"+leaf, "/")

	storage := &kv.S3BucketInfo{
		EndpointURL: s3opts.Endpoint,
		BucketName:  s3opts.Bucket,
		Prefix:      prefix,
	}
	cfg := kv.Config{
		Storage:                      storage,
		KeysLike:                     &Key{},
		ValuesLike:                   &v1proto.Row{},
		CustomMerge:                  mergeValues,
		CustomMarshal:                marshalProto,
		CustomUnmarshal:              unmarshalProto,
		MastNodeFormat:               string(mast.V1Marshaler),
		UnmarshalUsesRegisteredTypes: true,
	}
	// Non-positive tuning values leave the corresponding kv.Config field unset.
	if n := s3opts.NodeCacheEntries; n > 0 {
		cfg.NodeCache = mast.NewNodeCache(n)
	}
	if n := s3opts.EntriesPerNode; n > 0 {
		cfg.BranchFactor = uint(n)
	}

	db, err := kv.Open(ctx, client, cfg, kv.OpenOptions{
		ReadOnly:     s3opts.ReadOnly,
		OnlyVersions: s3opts.OnlyVersions,
	}, time.Now())
	if err != nil {
		return nil, fmt.Errorf("open: %w", err)
	}
	dbg("%s size:%d\n", subdir, db.Size())
	return &KV{Root: db}, nil
}
// S3Options carries the storage location and tuning knobs consumed by OpenKV.
type S3Options struct {
// Bucket is the bucket name. When empty, OpenKV uses the shared
// inMemoryS3 client and its bucket instead.
Bucket string
// Endpoint is passed to getS3; a non-empty value also enables path-style
// addressing there. OpenKV rejects an Endpoint given without a Bucket.
Endpoint string
// Prefix is joined with OpenKV's subdir argument to form the storage prefix;
// one leading and one trailing "/" are trimmed.
Prefix string
// EntriesPerNode, when > 0, is used as kv.Config.BranchFactor.
EntriesPerNode int
// NodeCacheEntries, when > 0, is the size given to mast.NewNodeCache.
NodeCacheEntries int
// ReadOnly is forwarded to kv.OpenOptions.ReadOnly.
ReadOnly bool
// OnlyVersions is forwarded to kv.OpenOptions.OnlyVersions.
OnlyVersions []string
}
// getS3 builds an S3 client from a new AWS session.
//
// An empty endpoint leaves the aws.Config at its zero value. A non-empty
// endpoint is set as the config's Endpoint and S3ForcePathStyle is turned on.
// A session creation failure is returned wrapped with "session".
func getS3(endpoint string) (*s3.S3, error) {
	var cfg aws.Config
	if endpoint != "" {
		cfg.S3ForcePathStyle = aws.Bool(true)
		cfg.Endpoint = &endpoint
	}

	awsSession, err := session.NewSession(&cfg)
	if err != nil {
		return nil, fmt.Errorf("session: %w", err)
	}
	client := s3.New(awsSession)
	return client, nil
}