-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
replica_sst_snapshot_storage.go
147 lines (134 loc) · 4.12 KB
/
replica_sst_snapshot_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
// SSTSnapshotStorage keeps track of the SST files created when receiving a
// snapshot with the SST strategy.
type SSTSnapshotStorage struct {
st *cluster.Settings
limiter *rate.Limiter
activeFile bool
currFile *os.File
ssts []string
rangeDir string
snapDir string
dirCreated bool
eng engine.Engine
}
func newSSTSnapshotStorage(
st *cluster.Settings,
rangeID roachpb.RangeID,
snapUUID uuid.UUID,
baseDir string,
limiter *rate.Limiter,
eng engine.Engine,
) (*SSTSnapshotStorage, error) {
rangeDir := filepath.Join(baseDir, "sstsnapshot")
snapDir := filepath.Join(rangeDir, snapUUID.String())
var ssts []string
matches, err := filepath.Glob(filepath.Join(snapDir, "*.sst"))
if err != nil {
return nil, err
}
ssts = append(ssts, matches...)
sss := &SSTSnapshotStorage{
st: st,
limiter: limiter,
activeFile: false,
currFile: nil,
ssts: ssts,
rangeDir: rangeDir,
snapDir: snapDir,
eng: eng,
}
return sss, nil
}
func (sss *SSTSnapshotStorage) filename(index int) string {
return filepath.Join(sss.snapDir, fmt.Sprintf("%d.sst", index))
}
func (sss *SSTSnapshotStorage) createDir() error {
err := os.MkdirAll(sss.snapDir, 0755)
sss.dirCreated = sss.dirCreated || err == nil
return err
}
func (sss *SSTSnapshotStorage) createFile(index int) error {
if !sss.dirCreated {
if err := sss.createDir(); err != nil {
return err
}
}
filename := sss.filename(index)
// Use 0644 since that's what RocksDB uses:
// https://github.com/facebook/rocksdb/blob/56656e12d67d8a63f1e4c4214da9feeec2bd442b/env/env_posix.cc#L171
currFile, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
sss.currFile = currFile
return err
}
// NewFile adds another file to SSSSnapshotStorage. This file is lazily created
// when the file is written to the first time. This file must be closed before
// NewFile is called again.
func (sss *SSTSnapshotStorage) NewFile() error {
if sss.activeFile {
return errors.New("exists an active file that hasn't been closed")
}
sss.ssts = append(sss.ssts, sss.filename(len(sss.ssts)))
sss.activeFile = true
return nil
}
// Write writes contents to the current file while respecting the limiter
// passed into SSTSnapshotStorage.
func (sss *SSTSnapshotStorage) Write(ctx context.Context, contents []byte) error {
if !sss.activeFile {
return errors.New("no active file")
}
if sss.currFile == nil {
if err := sss.createFile(len(sss.ssts) - 1); err != nil {
return err
}
}
// TODO(jeffreyxiao): We should limit the size of a single SST, but right now
// we won't need this because the size of ranges aren't large enough.
limitBulkIOWrite(ctx, sss.limiter, len(contents))
_, err := sss.currFile.Write(contents)
return err
}
// SSTs returns the names of the files created.
func (sss *SSTSnapshotStorage) SSTs() []string {
return sss.ssts
}
// Close closes the current file, if any. Calling this function multiple times
// is idempotent.
func (sss *SSTSnapshotStorage) Close() error {
// We throw an error for empty files because it would be an error to ingest
// an empty SST so catch this error earlier.
if sss.activeFile && sss.currFile == nil {
return errors.New("closing an empty file")
}
if sss.currFile != nil {
return sss.currFile.Close()
}
sss.activeFile = false
return nil
}
// Clear removes the directory and all SST files created.
func (sss *SSTSnapshotStorage) Clear() error {
return os.RemoveAll(sss.rangeDir)
}