Skip to content

Commit

Permalink
feat: storage write buffer size (#3127)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Mar 15, 2024
1 parent 0e635e1 commit fe033b0
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 4 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/e2e-with-client-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
- "ipv6"
- "split-running-tasks"
- "cache-list-metadata"
- "write-buffer-size"
include:
- module: normal
charts-config: test/testdata/charts/config.yaml
Expand All @@ -60,6 +61,9 @@ jobs:
- module: cache-list-metadata
charts-config: test/testdata/charts/config-cache-list-metadata.yaml
skip: ""
- module: write-buffer-size
charts-config: test/testdata/charts/config-write-buffer-size.yaml
skip: ""
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
Expand Down
3 changes: 3 additions & 0 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,9 @@ type StorageOption struct {
// Multiplex indicates reusing underlying storage for same task id
Multiplex bool `mapstructure:"multiplex" yaml:"multiplex"`
StoreStrategy StoreStrategy `mapstructure:"strategy" yaml:"strategy"`
// WriteBufferSize indicates the buffer size when read from source, same usage with io.Copy
// for some resource plugins, bigger buffer size with better performance, on the other hand, bigger buffer size cost huge memory
WriteBufferSize unit.Bytes `mapstructure:"writeBufferSize" yaml:"writeBufferSize"`
}

type StoreStrategy string
Expand Down
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
}
dirMode := os.FileMode(opt.DataDirMode)
storageManager, err := storage.NewStorageManager(opt.Storage.StoreStrategy, &opt.Storage,
gcCallback, dirMode, storage.WithGCInterval(opt.GCInterval.Duration))
gcCallback, dirMode, storage.WithGCInterval(opt.GCInterval.Duration),
storage.WithWriteBufferSize(opt.Storage.WriteBufferSize.ToNumber()))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/storage/local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (t *localTaskStore) WritePiece(ctx context.Context, req *WritePieceRequest)
return 0, err
}

n, err = io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
n, err = tryWriteWithBuffer(file, req.Reader, req.Range.Length)
if err != nil {
return n, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/storage/local_storage_subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (t *localSubTaskStore) WritePiece(ctx context.Context, req *WritePieceReque
return 0, err
}

n, err = io.Copy(file, io.LimitReader(req.Reader, req.Range.Length))
n, err = tryWriteWithBuffer(file, req.Reader, req.Range.Length)
if err != nil {
return 0, err
}
Expand Down
28 changes: 27 additions & 1 deletion client/daemon/storage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ const (
GCName = "StorageManager"
)

var tracer trace.Tracer
var (
tracer trace.Tracer
writeBufferPool *sync.Pool
)

func init() {
tracer = otel.Tracer("dfget-daemon-gc")
Expand Down Expand Up @@ -222,6 +225,17 @@ func WithGCInterval(gcInterval time.Duration) func(*storageManager) error {
}
}

func WithWriteBufferSize(size int64) func(*storageManager) error {
return func(manager *storageManager) error {
if size > 0 {
writeBufferPool = &sync.Pool{New: func() any {
return make([]byte, size)
}}
}
return nil
}
}

func (s *storageManager) RegisterTask(ctx context.Context, req *RegisterTaskRequest) (TaskStorageDriver, error) {
ts, ok := s.LoadTask(
PeerTaskMetadata{
Expand Down Expand Up @@ -960,3 +974,15 @@ func (s *storageManager) diskUsageExceed() (exceed bool, bytes int64) {
usage.UsedPercent, s.storeOption.DiskGCThresholdPercent, int64(bs))
return true, int64(bs)
}

func tryWriteWithBuffer(writer io.Writer, reader io.Reader, readSize int64) (written int64, err error) {
if writeBufferPool != nil {
buf := writeBufferPool.Get().([]byte)
written, err = io.CopyBuffer(writer, io.LimitReader(reader, readSize), buf)
//nolint:all
writeBufferPool.Put(buf)
} else {
written, err = io.Copy(writer, io.LimitReader(reader, readSize))
}
return written, err
}
137 changes: 137 additions & 0 deletions test/testdata/charts/config-write-buffer-size.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
scheduler:
image: dragonflyoss/scheduler
tag: latest
replicas: 3
resources:
requests:
cpu: "0"
memory: "0"
limits:
cpu: "2"
memory: "4Gi"
extraVolumeMounts:
- name: logs
mountPath: "/var/log/"
- name: artifact
mountPath: /tmp/artifact
extraVolumes:
- name: logs
emptyDir: { }
- name: artifact
hostPath:
path: /tmp/artifact
metrics:
enable: true
enableHost: true
config:
verbose: true

seedPeer:
image: dragonflyoss/dfdaemon
tag: latest
replicas: 3
resources:
requests:
cpu: "0"
memory: "0"
limits:
cpu: "1"
memory: "1Gi"
extraVolumeMounts:
- name: logs
mountPath: "/var/log/dragonfly"
subPath: dragonfly
- name: artifact
mountPath: /tmp/artifact
extraVolumes:
- name: logs
emptyDir: { }
- name: artifact
hostPath:
path: /tmp/artifact
metrics:
enable: true
config:
verbose: true
storage:
writeBufferSize: 1Mi
download:
prefetch: true

dfdaemon:
image: dragonflyoss/dfdaemon
tag: latest
resources:
requests:
cpu: "0"
memory: "0"
limits:
cpu: "1"
memory: "1Gi"
extraVolumeMounts:
- name: logs
mountPath: "/var/log/"
- name: artifact
mountPath: /tmp/artifact
extraVolumes:
- name: logs
emptyDir: { }
- name: artifact
hostPath:
path: /tmp/artifact
metrics:
enable: true
config:
verbose: true
pprofPort: 9999
download:
prefetch: true
storage:
writeBufferSize: 1Mi
scheduler:
disableAutoBackSource: true
proxy:
defaultFilter: "Expires&Signature&ns"
security:
insecure: true
tcpListen:
namespace: /run/dragonfly/net
# if you want to change port, please update hostPort in $.Values.dfdaemon.hostPort
# port in configmap is generated from $.Values.dfdaemon.hostPort
# port: 65001
registryMirror:
url: https://index.docker.io
proxies:
- regx: blobs/sha256.*
- regx: file-server

manager:
image: dragonflyoss/manager
tag: latest
replicas: 1
resources:
requests:
cpu: "0"
memory: "0"
limits:
cpu: "1"
memory: "2Gi"
extraVolumeMounts:
- name: logs
mountPath: "/var/log/"
- name: artifact
mountPath: /tmp/artifact
extraVolumes:
- name: logs
emptyDir: { }
- name: artifact
hostPath:
path: /tmp/artifact
metrics:
enable: true
config:
verbose: true

containerRuntime:
containerd:
enable: true

0 comments on commit fe033b0

Please sign in to comment.