[patch] [agent-NGT, sidecar] Improve S3 backup/recover behavior #556

Merged
merged 37 commits on Jul 22, 2020
Changes from 34 commits
Commits
7382a36
:recycle: not to overwrite existing backup files
rinx Jul 7, 2020
8741a1c
:white_check_mark: update tests for internal/compress
rinx Jul 7, 2020
b6bd4f8
:wrench: add default_pool_size option / add internal/json package
rinx Jul 8, 2020
ff6a2d0
:sparkles: Add metadata package to write agent metadata when the inde…
rinx Jul 8, 2020
2b25b77
:sparkles: revise file.Open interface
rinx Jul 8, 2020
814467f
:white_check_mark: fix file test
rinx Jul 8, 2020
1b61d09
:white_check_mark: fix json tests
rinx Jul 8, 2020
8d92ecc
:art: use filepath.Join
rinx Jul 8, 2020
74b5f2f
:sparkles: revise watching method for ngt index backup files
rinx Jul 8, 2020
1c3c9e6
:white_check_mark: remove dps from ngt_test
rinx Jul 8, 2020
f0a8d88
:sparkles: kill own process when NGT index cannot be loaded within ti…
rinx Jul 9, 2020
c2240f1
:rotating_light: fix deepsource issues
rinx Jul 9, 2020
839c13b
:truck: move internal/json -> internal/encoding/json
rinx Jul 9, 2020
7974dcf
[agent-NGT] refactor pkg/agent/core/service using FOP (#566)
rinx Jul 9, 2020
154f395
:recycle: refactor initNGT func
rinx Jul 9, 2020
d877f5e
:white_check_mark: update
rinx Jul 10, 2020
db4a6d4
:sparkles: add isValid flag to metadata
rinx Jul 13, 2020
9bd724d
:green_heart: revise fetch depth for run test workflow
rinx Jul 13, 2020
f82e27b
:recycle: use isInvalid because it should be default to false
rinx Jul 13, 2020
d2770b8
:art: fix tag format
rinx Jul 13, 2020
5c37eca
:recycle: fix poststop logic
rinx Jul 13, 2020
6bf1dec
:sparkles: add Escape() method to errgroup
rinx Jul 13, 2020
9c98268
:package: add *.containers field
rinx Jul 15, 2020
27801a3
Revert ":package: add *.containers field"
rinx Jul 15, 2020
3158cfe
:white_check_mark: update
rinx Jul 17, 2020
a32eae2
Revert ":white_check_mark: update"
rinx Jul 20, 2020
e83db86
Revert ":sparkles: add Escape() method to errgroup"
rinx Jul 20, 2020
30ae1f8
:recycle: exit safely using goroutine
rinx Jul 20, 2020
8c4ed90
:truck: move internal/metadata -> pkg/agent/internal/metadata
rinx Jul 20, 2020
fa8c6cb
:recycle: add defaultPoolSize const
rinx Jul 20, 2020
1554330
:recycle: pass cfg struct to agent service
rinx Jul 20, 2020
eac9b47
:whale: fix dockerfiles
rinx Jul 20, 2020
60a704d
:recycle: add watch_enabled & auto_backup_enabled options
rinx Jul 20, 2020
a8db9be
:white_check_mark: fix tests
rinx Jul 20, 2020
c1755e5
:recycle: revise agent-ngt codes based on suggestions
rinx Jul 20, 2020
bc77f2c
:white_check_mark: update
rinx Jul 21, 2020
25d315b
:recycle: revise observer poststop
rinx Jul 22, 2020
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -25,7 +25,7 @@ jobs:
- name: Check out code.
uses: actions/checkout@v1
with:
fetch-depth: 1
fetch-depth: 10
Collaborator

Could you please explain this change?

Contributor Author

Sometimes this workflow fails on the master branch because it cannot fetch the required commits.
https://github.com/vdaas/vald/actions/runs/164399448
To prevent this, it is better to fetch more than just the latest commit.

- name: Run tests for cmd packages / tparse
run: |
make test/cmd/tparse | tee tparse.txt || cat tparse.txt
27 changes: 23 additions & 4 deletions charts/vald/values.yaml
@@ -1080,7 +1080,7 @@ agent:
revisionHistoryLimit: 2
# @schema {"name": "agent.terminationGracePeriodSeconds", "type": "integer", "minimum": 0}
# agent.terminationGracePeriodSeconds -- duration in seconds pod needs to terminate gracefully
terminationGracePeriodSeconds: 120
terminationGracePeriodSeconds: 600
kpango marked this conversation as resolved.
# @schema {"name": "agent.podManagementPolicy", "type": "string", "enum": ["OrderedReady", "Parallel"]}
# agent.podManagementPolicy -- pod management policy: OrderedReady or Parallel
podManagementPolicy: OrderedReady
@@ -1266,6 +1266,19 @@ agent:
# @schema {"name": "agent.ngt.enable_in_memory_mode", "type": "boolean"}
# agent.ngt.enable_in_memory_mode -- in-memory mode enabled
enable_in_memory_mode: true
# @schema {"name": "agent.ngt.default_pool_size", "type": "integer"}
# agent.ngt.default_pool_size -- default create index batch pool size
default_pool_size: 10000
# @schema {"name": "agent.ngt.min_load_index_timeout", "type": "string"}
# agent.ngt.min_load_index_timeout -- minimum duration of load index timeout
min_load_index_timeout: 3m
# @schema {"name": "agent.ngt.max_load_index_timeout", "type": "string"}
# agent.ngt.max_load_index_timeout -- maximum duration of load index timeout
max_load_index_timeout: 10m
# @schema {"name": "agent.ngt.load_index_timeout_factor", "type": "string"}
# agent.ngt.load_index_timeout_factor -- a factor of load index timeout.
# timeout duration will be calculated by (index count to be loaded) * (factor).
load_index_timeout_factor: 1ms
# @schema {"name": "agent.sidecar", "type": "object"}
sidecar:
# @schema {"name": "agent.sidecar.enabled", "type": "boolean"}
@@ -1372,12 +1385,18 @@ agent:
memory: 100Mi
# @schema {"name": "agent.sidecar.config", "type": "object"}
config:
# @schema {"name": "agent.sidecar.config.watch_enabled", "type": "boolean"}
# agent.sidecar.config.watch_enabled -- auto backup triggered by file changes is enabled
watch_enabled: true
# @schema {"name": "agent.sidecar.config.auto_backup_enabled", "type": "boolean"}
# agent.sidecar.config.auto_backup_enabled -- auto backup triggered by timer is enabled
auto_backup_enabled: true
# @schema {"name": "agent.sidecar.config.auto_backup_duration", "type": "string"}
# agent.sidecar.config.auto_backup_duration -- auto backup duration
auto_backup_duration: 10m
auto_backup_duration: 24h
# @schema {"name": "agent.sidecar.config.post_stop_timeout", "type": "string"}
# agent.sidecar.config.post_stop_timeout -- timeout duration for file changing during post stop
post_stop_timeout: 20s
# agent.sidecar.config.post_stop_timeout -- timeout for observing file changes during post stop
post_stop_timeout: 2m
# @schema {"name": "agent.sidecar.config.filename", "type": "string"}
# agent.sidecar.config.filename -- backup filename
filename: _MY_POD_NAME_
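The three load-index timeout options added above combine into a single deadline: the factor is multiplied by the number of vectors to be loaded, and the result is clamped between the minimum and the maximum. A minimal sketch of that calculation (hypothetical names; the actual agent code may differ):

```go
package main

import (
	"fmt"
	"time"
)

// loadIndexTimeout sketches how the chart options could combine:
// (index count to be loaded) * load_index_timeout_factor, clamped between
// min_load_index_timeout and max_load_index_timeout.
func loadIndexTimeout(indexCount int, factor, min, max time.Duration) time.Duration {
	timeout := time.Duration(indexCount) * factor
	if timeout < min {
		return min
	}
	if timeout > max {
		return max
	}
	return timeout
}

func main() {
	// With the chart defaults (factor 1ms, min 3m, max 10m), loading
	// 2,000,000 vectors would give roughly 33m, clamped down to 10m.
	fmt.Println(loadIndexTimeout(2000000, time.Millisecond, 3*time.Minute, 10*time.Minute))
}
```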
4 changes: 4 additions & 0 deletions dockers/agent/core/ngt/Dockerfile
@@ -19,6 +19,7 @@ FROM vdaas/vald-base:latest AS builder
ENV ORG vdaas
ENV REPO vald
ENV PKG agent/core/ngt
ENV PKG_INTERNAL agent/internal
ENV APP_NAME ngt

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal
@@ -30,6 +31,9 @@ COPY apis/grpc .
WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG}
COPY pkg/${PKG} .

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG_INTERNAL}
COPY pkg/${PKG_INTERNAL} .

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG}
COPY cmd/${PKG} .

4 changes: 4 additions & 0 deletions dockers/agent/sidecar/Dockerfile
@@ -19,6 +19,7 @@ FROM vdaas/vald-base:latest AS builder
ENV ORG vdaas
ENV REPO vald
ENV PKG agent/sidecar
ENV PKG_INTERNAL agent/internal
ENV APP_NAME sidecar

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal
@@ -30,6 +31,9 @@ COPY apis/grpc .
WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG}
COPY pkg/${PKG} .

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG_INTERNAL}
COPY pkg/${PKG_INTERNAL} .

WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG}
COPY cmd/${PKG} .

2 changes: 1 addition & 1 deletion internal/compress/compress.go
@@ -22,6 +22,6 @@ import "io"
type Compressor interface {
CompressVector(vector []float32) (bytes []byte, err error)
DecompressVector(bytes []byte) (vector []float32, err error)
Reader(src io.Reader) (io.Reader, error)
Reader(src io.ReadCloser) (io.ReadCloser, error)
Writer(dst io.WriteCloser) (io.WriteCloser, error)
}
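Since Reader now takes and returns an io.ReadCloser, a caller can release the whole chain (the decompressor plus the underlying stream, e.g. a file or an S3 object body) with a single Close call. A rough usage sketch that relies only on the interface above; decompressTo is a hypothetical helper, and the import path is assumed from the repository layout (internal packages are only importable inside the vald module):

```go
package example

import (
	"io"
	"os"

	"github.com/vdaas/vald/internal/compress"
)

// decompressTo streams a compressed backup file through any Compressor
// implementation and writes the decompressed bytes to dst.
func decompressTo(c compress.Compressor, path string, dst io.Writer) error {
	f, err := os.Open(path) // *os.File satisfies io.ReadCloser
	if err != nil {
		return err
	}

	r, err := c.Reader(f) // the returned ReadCloser wraps and owns f
	if err != nil {
		f.Close()
		return err
	}
	// Closing r is enough: implementations close both the decompressor
	// and the underlying source.
	defer r.Close()

	_, err = io.Copy(dst, r)
	return err
}
```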
8 changes: 6 additions & 2 deletions internal/compress/gob.go
@@ -60,7 +60,7 @@ func (g *gobCompressor) DecompressVector(bs []byte) ([]float32, error) {
return vector, nil
}

func (g *gobCompressor) Reader(src io.Reader) (io.Reader, error) {
func (g *gobCompressor) Reader(src io.ReadCloser) (io.ReadCloser, error) {
return &gobReader{
src: src,
decoder: gob.NewDecoder(src),
@@ -75,7 +75,7 @@ func (g *gobCompressor) Writer(dst io.WriteCloser) (io.WriteCloser, error) {
}

type gobReader struct {
src io.Reader
src io.ReadCloser
decoder *gob.Decoder
}

@@ -88,6 +88,10 @@ func (gr *gobReader) Read(p []byte) (n int, err error) {
return len(p), nil
}

func (gr *gobReader) Close() error {
return gr.src.Close()
}

type gobWriter struct {
dst io.WriteCloser
encoder *gob.Encoder
104 changes: 91 additions & 13 deletions internal/compress/gob_test.go
@@ -121,7 +121,7 @@ func TestNewGob(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -197,7 +197,7 @@ func Test_gobCompressor_CompressVector(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -274,7 +274,7 @@ func Test_gobCompressor_DecompressVector(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -297,22 +297,22 @@

func Test_gobCompressor_Reader(t *testing.T) {
type args struct {
src io.Reader
src io.ReadCloser
}
type want struct {
want io.Reader
want io.ReadCloser
err error
}
type test struct {
name string
args args
g *gobCompressor
want want
checkFunc func(want, io.Reader, error) error
checkFunc func(want, io.ReadCloser, error) error
beforeFunc func(args)
afterFunc func(args)
}
defaultCheckFunc := func(w want, got io.Reader, err error) error {
defaultCheckFunc := func(w want, got io.ReadCloser, err error) error {
if !errors.Is(err, w.err) {
return errors.Errorf("got error = %v, want %v", err, w.err)
}
@@ -351,7 +351,7 @@

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -428,7 +428,7 @@ func Test_gobCompressor_Writer(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -454,7 +454,7 @@ func Test_gobReader_Read(t *testing.T) {
p []byte
}
type fields struct {
src io.Reader
src io.ReadCloser
decoder *gob.Decoder
}
type want struct {
@@ -517,7 +517,7 @@

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -541,6 +541,84 @@
}
}

func Test_gobReader_Close(t *testing.T) {
type fields struct {
src io.ReadCloser
decoder *gob.Decoder
}
type want struct {
err error
}
type test struct {
name string
fields fields
want want
checkFunc func(want, error) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, err error) error {
if !errors.Is(err, w.err) {
return errors.Errorf("got error = %v, want %v", err, w.err)
}
return nil
}
tests := []test{
// TODO test cases
/*
{
name: "test_case_1",
fields: fields {
src: nil,
decoder: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
},
*/

// TODO test cases
/*
func() test {
return test {
name: "test_case_2",
fields: fields {
src: nil,
decoder: nil,
},
want: want{},
checkFunc: defaultCheckFunc,
}
}(),
*/
}

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc()
}
if test.afterFunc != nil {
defer test.afterFunc()
}
if test.checkFunc == nil {
test.checkFunc = defaultCheckFunc
}
gr := &gobReader{
src: test.fields.src,
decoder: test.fields.decoder,
}

err := gr.Close()
if err := test.checkFunc(test.want, err); err != nil {
tt.Errorf("error = %v", err)
}

})
}
}

func Test_gobWriter_Write(t *testing.T) {
type args struct {
p []byte
@@ -609,7 +687,7 @@ func Test_gobWriter_Write(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc(test.args)
}
@@ -687,7 +765,7 @@ func Test_gobWriter_Close(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {
defer goleak.VerifyNone(t)
defer goleak.VerifyNone(tt)
if test.beforeFunc != nil {
test.beforeFunc()
}
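The table entries for Test_gobReader_Close above are left as TODOs. One way a concrete case might look, written here as a standalone test with a tiny io.ReadCloser stub (illustrative only, not part of this PR; the test name is hypothetical):

```go
package compress

import (
	"encoding/gob"
	"io"
	"testing"
)

// nopReadCloser is a trivial io.ReadCloser stub used only in this sketch.
type nopReadCloser struct{}

func (nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
func (nopReadCloser) Close() error               { return nil }

// Close simply forwards to src.Close, so a source whose Close returns nil
// must yield a nil error.
func Test_gobReader_Close_forwardsSourceClose(t *testing.T) {
	gr := &gobReader{
		src:     nopReadCloser{},
		decoder: gob.NewDecoder(nopReadCloser{}),
	}
	if err := gr.Close(); err != nil {
		t.Errorf("error = %v, want nil", err)
	}
}
```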
30 changes: 28 additions & 2 deletions internal/compress/gzip.go
@@ -87,8 +87,16 @@ func (g *gzipCompressor) DecompressVector(bs []byte) ([]float32, error) {
return vec, nil
}

func (g *gzipCompressor) Reader(src io.Reader) (io.Reader, error) {
return gzip.NewReader(src)
func (g *gzipCompressor) Reader(src io.ReadCloser) (io.ReadCloser, error) {
r, err := gzip.NewReader(src)
if err != nil {
return nil, err
}

return &gzipReader{
src: src,
r: r,
}, nil
}

func (g *gzipCompressor) Writer(dst io.WriteCloser) (io.WriteCloser, error) {
@@ -103,6 +111,24 @@ func (g *gzipCompressor) Writer(dst io.WriteCloser) (io.WriteCloser, error) {
}, nil
}

type gzipReader struct {
src io.ReadCloser
r io.ReadCloser
}

func (g *gzipReader) Read(p []byte) (n int, err error) {
return g.r.Read(p)
}

func (g *gzipReader) Close() (err error) {
err = g.r.Close()
if err != nil {
return errors.Wrap(g.src.Close(), err.Error())
}

return g.src.Close()
}

type gzipWriter struct {
dst io.WriteCloser
w io.WriteCloser
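gzipReader.Close closes the gzip stream first and closes the underlying source in either case, wrapping both errors when the first Close fails. A sketch of a test (not part of this PR; the test and stub names are made up) that checks the source really is closed, relying only on what the diff above shows:

```go
package compress

import (
	"bytes"
	"compress/gzip"
	"io"
	"testing"
)

// recordCloser wraps a reader and records whether Close was called.
type recordCloser struct {
	io.Reader
	closed bool
}

func (r *recordCloser) Close() error {
	r.closed = true
	return nil
}

func Test_gzipReader_Close_closesSource(t *testing.T) {
	// Build a valid gzip stream so gzip.NewReader succeeds.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("vald")); err != nil {
		t.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		t.Fatal(err)
	}

	src := &recordCloser{Reader: &buf}
	r, err := new(gzipCompressor).Reader(src)
	if err != nil {
		t.Fatal(err)
	}
	if err := r.Close(); err != nil {
		t.Fatal(err)
	}
	if !src.closed {
		t.Error("underlying source was not closed")
	}
}
```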