From b16ac4dee151578bb4d577a72ad68badc07c7e38 Mon Sep 17 00:00:00 2001 From: VM <112189277+sysvm@users.noreply.github.com> Date: Mon, 19 Dec 2022 10:36:46 +0800 Subject: [PATCH 01/12] fix mock file (#3) Co-authored-by: DylanYong --- store/piecestore/storage/mock/interface_mock.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/store/piecestore/storage/mock/interface_mock.go b/store/piecestore/storage/mock/interface_mock.go index e9364070b..ebc16f480 100644 --- a/store/piecestore/storage/mock/interface_mock.go +++ b/store/piecestore/storage/mock/interface_mock.go @@ -10,7 +10,7 @@ import ( reflect "reflect" time "time" - store "github.com/bnb-chain/inscription-storage-provider/store/piecestore/store" + storage "github.com/bnb-chain/inscription-storage-provider/store/piecestore/storage" gomock "github.com/golang/mock/gomock" ) @@ -95,10 +95,10 @@ func (mr *MockObjectStorageMockRecorder) HeadBucket(ctx interface{}) *gomock.Cal } // HeadObject mocks base method. -func (m *MockObjectStorage) HeadObject(ctx context.Context, key string) (store.Object, error) { +func (m *MockObjectStorage) HeadObject(ctx context.Context, key string) (storage.Object, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "HeadObject", ctx, key) - ret0, _ := ret[0].(store.Object) + ret0, _ := ret[0].(storage.Object) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -110,10 +110,10 @@ func (mr *MockObjectStorageMockRecorder) HeadObject(ctx, key interface{}) *gomoc } // ListAllObjects mocks base method. -func (m *MockObjectStorage) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan store.Object, error) { +func (m *MockObjectStorage) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan storage.Object, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListAllObjects", ctx, prefix, marker) - ret0, _ := ret[0].(<-chan store.Object) + ret0, _ := ret[0].(<-chan storage.Object) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -125,10 +125,10 @@ func (mr *MockObjectStorageMockRecorder) ListAllObjects(ctx, prefix, marker inte } // ListObjects mocks base method. -func (m *MockObjectStorage) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]store.Object, error) { +func (m *MockObjectStorage) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]storage.Object, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListObjects", ctx, prefix, marker, delimiter, limit) - ret0, _ := ret[0].([]store.Object) + ret0, _ := ret[0].([]storage.Object) ret1, _ := ret[1].(error) return ret0, ret1 } From a7f8aa937be6eb61c8fd2ce44419378fb1f43012 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Wed, 21 Dec 2022 22:45:14 +0800 Subject: [PATCH 02/12] add service lifecycle module --- model/{ => errors}/errors.go | 2 +- pkg/lifecycle/component.go | 28 ++++ pkg/lifecycle/lifecycle.go | 154 +++++++++++++++++++++ store/piecestore/piece/piece_store.go | 4 +- store/piecestore/storage/disk_file.go | 4 +- store/piecestore/storage/disk_file_test.go | 6 +- store/piecestore/storage/memory.go | 12 +- store/piecestore/storage/memory_test.go | 14 +- store/piecestore/storage/object_storage.go | 6 +- store/piecestore/storage/s3.go | 6 +- store/piecestore/storage/s3_test.go | 4 +- 11 files changed, 211 insertions(+), 29 deletions(-) rename model/{ => errors}/errors.go (95%) create mode 100644 pkg/lifecycle/component.go create mode 100644 pkg/lifecycle/lifecycle.go diff --git a/model/errors.go b/model/errors/errors.go similarity index 95% rename from model/errors.go rename to model/errors/errors.go index e258c5cae..50e01851a 100644 --- a/model/errors.go +++ b/model/errors/errors.go @@ -1,4 +1,4 @@ -package model +package errors import "errors" diff --git a/pkg/lifecycle/component.go b/pkg/lifecycle/component.go new file mode 100644 index 000000000..11712d85b --- /dev/null +++ b/pkg/lifecycle/component.go @@ -0,0 +1,28 @@ +package lifecycle + +import "context" + +type Component interface { + Name() string + Init(ctx context.Context) error +} + +type component struct { + name string + fn func(ctx context.Context) error +} + +func NewComponent(name string, fn func(ctx context.Context) error) Component { + return &component{ + name: name, + fn: fn, + } +} + +func (c *component) Name() string { + return c.name +} + +func (c *component) Init(ctx context.Context) error { + return c.fn(ctx) +} diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go new file mode 100644 index 000000000..d0ad302fc --- /dev/null +++ b/pkg/lifecycle/lifecycle.go @@ -0,0 +1,154 @@ +package lifecycle + +import ( + "context" + "errors" + "os" + "os/signal" + "sync" + "time" + + "github.com/bnb-chain/inscription-storage-provider/util/log" +) + +type Service interface { + Name() string + Start(ctx context.Context) error + Stop(ctx context.Context) error +} + +type ServiceLifecycle struct { + innerCtx context.Context + innerCancel context.CancelFunc + services []Service + failure bool + timeout time.Duration +} + +// NewService returns an initialized service lifecycle +func NewService(timeout time.Duration) *ServiceLifecycle { + innerCtx, innerCancel := context.WithCancel(context.Background()) + return &ServiceLifecycle{ + innerCtx: innerCtx, + innerCancel: innerCancel, + timeout: timeout, + } +} + +// Init can be used to initialize components +func (s *ServiceLifecycle) Init(ctx context.Context, components ...Component) *ServiceLifecycle { + if s.failure { + return s + } + for _, c := range components { + select { + case <-s.innerCtx.Done(): + s.failure = true + return s + default: + } + if err := c.Init(ctx); err != nil { + log.Panicf("Init %s error: %v", c.Name(), err) + s.failure = true + return s + } + log.Infof("Init %s successfully!", c.Name()) + } + return s +} + +// StartServices starts running services +func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Service) *ServiceLifecycle { + if s.failure { + return s + } + s.services = append(s.services, services...) + for _, service := range services { + select { + case <-s.innerCtx.Done(): + s.failure = true + return s + default: + } + go s.start(ctx, service) + } + return s +} + +func (s *ServiceLifecycle) start(ctx context.Context, service Service) { + defer s.innerCancel() + if err := service.Start(ctx); err != nil { + log.Panicf("Service %s starts error: %v", service.Name(), err) + return + } + log.Infof("Service %s starts successfully", service.Name()) +} + +// Signals registers monitor signals +func (s *ServiceLifecycle) Signals(sigs ...os.Signal) *ServiceLifecycle { + if s.failure { + return s + } + go s.signals(sigs...) + return s +} + +func (s *ServiceLifecycle) signals(sigs ...os.Signal) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, sigs...) + for { + select { + case <-s.innerCtx.Done(): + return + case sig := <-sigCh: + for _, j := range sigs { + if j == sig { + s.innerCancel() + return + } + } + } + } +} + +// Wait blocks until context is done +func (s *ServiceLifecycle) Wait(ctx context.Context) { + <-s.innerCtx.Done() + s.GracefulShutdown(ctx) +} + +// GracefulShutdown can stop services when context is done or timeout +func (s *ServiceLifecycle) GracefulShutdown(ctx context.Context) { + gCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + go s.stopService(ctx, cancel) + + <-gCtx.Done() + if errors.Is(gCtx.Err(), context.Canceled) { + log.Infow("Service graceful shutdown, context canceled, service stops working", "service config timeout", + s.timeout) + } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { + log.Panic("Timeout while stopping service, killing instance manually") + } +} + +func (s *ServiceLifecycle) stopService(ctx context.Context, cancel context.CancelFunc) { + var wg sync.WaitGroup + for _, service := range s.services { + wg.Add(1) + go func(ctx context.Context, service Service) { + defer wg.Done() + if err := service.Stop(ctx); err != nil { + log.Panicf("Service %s stops failure: %v", service.Name(), err) + } else { + log.Infof("Service %s stops successfully!", service.Name()) + } + }(ctx, service) + } + wg.Wait() + cancel() +} + +// Done check context is done +func (s *ServiceLifecycle) Done() <-chan struct{} { + return s.innerCtx.Done() +} diff --git a/store/piecestore/piece/piece_store.go b/store/piecestore/piece/piece_store.go index 5cb6a624e..02f3ad247 100644 --- a/store/piecestore/piece/piece_store.go +++ b/store/piecestore/piece/piece_store.go @@ -10,7 +10,7 @@ import ( "runtime" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model" + errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/bnb-chain/inscription-storage-provider/store/piecestore/storage" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -93,7 +93,7 @@ func createStorage(cfg *config.PieceStoreConfig) (storage.ObjectStorage, error) func checkBucket(ctx context.Context, store storage.ObjectStorage) error { if err := store.HeadBucket(ctx); err != nil { log.Errorw("HeadBucket error", "error", err) - if errors.Is(err, model.BucketNotExisted) { + if errors.Is(err, errors2.BucketNotExisted) { if err2 := store.CreateBucket(ctx); err2 != nil { return fmt.Errorf("Failed to create bucket in %s: %s, previous err: %s", store, err2, err) } diff --git a/store/piecestore/storage/disk_file.go b/store/piecestore/storage/disk_file.go index d29cfc9cc..eeed9f8f6 100644 --- a/store/piecestore/storage/disk_file.go +++ b/store/piecestore/storage/disk_file.go @@ -13,7 +13,7 @@ import ( "strings" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -145,7 +145,7 @@ func (d *diskFileStore) DeleteObject(ctx context.Context, key string) error { func (d *diskFileStore) HeadBucket(ctx context.Context) error { if _, err := os.Stat(d.root); err != nil { if os.IsNotExist(err) { - return model.BucketNotExisted + return errors.BucketNotExisted } return err } diff --git a/store/piecestore/storage/disk_file_test.go b/store/piecestore/storage/disk_file_test.go index d2583d0ae..55fd8b2b0 100644 --- a/store/piecestore/storage/disk_file_test.go +++ b/store/piecestore/storage/disk_file_test.go @@ -11,7 +11,7 @@ import ( "strings" "testing" - "github.com/bnb-chain/inscription-storage-provider/model" + errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/stretchr/testify/assert" ) @@ -289,13 +289,13 @@ func TestDiskFile_HeadDirSuccess(t *testing.T) { func TestDiskFile_List(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListObjects(context.TODO(), emptyString, emptyString, emptyString, 0) - assert.Equal(t, model.NotSupportedMethod, err) + assert.Equal(t, errors2.NotSupportedMethod, err) } func TestDiskFile_ListAll(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, model.NotSupportedMethod, err) + assert.Equal(t, errors2.NotSupportedMethod, err) } func TestPath(t *testing.T) { diff --git a/store/piecestore/storage/memory.go b/store/piecestore/storage/memory.go index f7214dfb4..81fadebf7 100644 --- a/store/piecestore/storage/memory.go +++ b/store/piecestore/storage/memory.go @@ -12,7 +12,7 @@ import ( "time" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -43,11 +43,11 @@ func (m *memoryStore) GetObject(ctx context.Context, key string, offset, limit i defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, model.EmptyObjectKey + return nil, errors.EmptyObjectKey } d, ok := m.objects[key] if !ok { - return nil, model.EmptyMemoryObject + return nil, errors.EmptyMemoryObject } if offset > int64(len(d.data)) { @@ -65,7 +65,7 @@ func (m *memoryStore) PutObject(ctx context.Context, key string, reader io.Reade defer m.Unlock() // Minimum length is 1 if key == "" { - return model.EmptyObjectKey + return errors.EmptyObjectKey } if _, ok := m.objects[key]; ok { log.Info("overwrite key: ", key) @@ -95,7 +95,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, model.EmptyObjectKey + return nil, errors.EmptyObjectKey } o, ok := m.objects[key] if !ok { @@ -112,7 +112,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error func (m *memoryStore) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { if delimiter != "" { - return nil, model.NotSupportedDelimiter + return nil, errors.NotSupportedDelimiter } m.Lock() defer m.Unlock() diff --git a/store/piecestore/storage/memory_test.go b/store/piecestore/storage/memory_test.go index d0634cfdb..f1128739c 100644 --- a/store/piecestore/storage/memory_test.go +++ b/store/piecestore/storage/memory_test.go @@ -7,7 +7,7 @@ import ( "strings" "testing" - "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/stretchr/testify/assert" ) @@ -62,12 +62,12 @@ func TestMemory_GetError(t *testing.T) { { name: "memory_get_error_test1", key: emptyString, - wantedErr: model.EmptyObjectKey, + wantedErr: errors.EmptyObjectKey, }, { name: "memory_get_error_test2", key: mockKey, - wantedErr: model.EmptyMemoryObject, + wantedErr: errors.EmptyMemoryObject, }, } for _, tt := range cases { @@ -94,7 +94,7 @@ func TestMemory_Put(t *testing.T) { name: "memory_put_test1", key: emptyString, data: endPoint, - wantedErr: model.EmptyObjectKey, + wantedErr: errors.EmptyObjectKey, }, { name: "memory_put_test2", @@ -178,7 +178,7 @@ func TestMemory_HeadError(t *testing.T) { { name: "memory_head_error_test1", key: emptyString, - wantedErr: model.EmptyObjectKey, + wantedErr: errors.EmptyObjectKey, }, { name: "memory_head_error_test2", @@ -230,7 +230,7 @@ func TestMemory_ListError(t *testing.T) { { name: "memory_list_error_test1", delimiter: mockKey, - wantedErr: model.NotSupportedDelimiter, + wantedErr: errors.NotSupportedDelimiter, }, } for _, tt := range cases { @@ -246,5 +246,5 @@ func TestMemory_ListError(t *testing.T) { func TestMemory_ListAll(t *testing.T) { store := setupMemoryTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, model.NotSupportedMethod, err) + assert.Equal(t, errors.NotSupportedMethod, err) } diff --git a/store/piecestore/storage/object_storage.go b/store/piecestore/storage/object_storage.go index 976e65e6c..56ef4457d 100644 --- a/store/piecestore/storage/object_storage.go +++ b/store/piecestore/storage/object_storage.go @@ -8,7 +8,7 @@ import ( "sync" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/bnb-chain/inscription-storage-provider/model/piecestore" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -36,11 +36,11 @@ func (s DefaultObjectStorage) CreateBucket(ctx context.Context) error { } func (s DefaultObjectStorage) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { - return nil, model.NotSupportedMethod + return nil, errors.NotSupportedMethod } func (s DefaultObjectStorage) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, model.NotSupportedMethod + return nil, errors.NotSupportedMethod } type file struct { diff --git a/store/piecestore/storage/s3.go b/store/piecestore/storage/s3.go index 9a10ac726..49f36298a 100644 --- a/store/piecestore/storage/s3.go +++ b/store/piecestore/storage/s3.go @@ -17,7 +17,7 @@ import ( "time" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/bnb-chain/inscription-storage-provider/model/piecestore" "github.com/bnb-chain/inscription-storage-provider/util/log" @@ -148,7 +148,7 @@ func (s *s3Store) HeadBucket(ctx context.Context) error { log.Errorw("ObjectStorage S3 HeadBucket error", "error", err) if reqErr, ok := err.(awserr.RequestFailure); ok { if reqErr.StatusCode() == http.StatusNotFound { - return model.BucketNotExisted + return errors.BucketNotExisted } } return err @@ -216,7 +216,7 @@ func (s *s3Store) ListObjects(ctx context.Context, prefix, marker, delimiter str } func (s *s3Store) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, model.NotSupportedMethod + return nil, errors.NotSupportedMethod } // SessionCache holds session.Session according to model.ObjectStorage and it synchronizes access/modification diff --git a/store/piecestore/storage/s3_test.go b/store/piecestore/storage/s3_test.go index bf598da8e..760c749dd 100644 --- a/store/piecestore/storage/s3_test.go +++ b/store/piecestore/storage/s3_test.go @@ -8,7 +8,7 @@ import ( "testing" "time" - "github.com/bnb-chain/inscription-storage-provider/model" + errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" @@ -283,7 +283,7 @@ func TestS3_ListSuccess(t *testing.T) { func TestS3_ListAll(t *testing.T) { store := setupS3Test(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, model.NotSupportedMethod, err) + assert.Equal(t, errors2.NotSupportedMethod, err) } type mockS3ClientError struct { From 7ead554ca7772e1d1a5aefec3756c38b3e76285b Mon Sep 17 00:00:00 2001 From: DylanYong Date: Wed, 21 Dec 2022 22:54:52 +0800 Subject: [PATCH 03/12] fix --- model/{errors => }/errors.go | 2 +- pkg/lifecycle/lifecycle.go | 11 +++++------ store/piecestore/piece/piece_store.go | 2 +- store/piecestore/storage/disk_file.go | 4 ++-- store/piecestore/storage/disk_file_test.go | 3 +-- store/piecestore/storage/memory.go | 12 ++++++------ store/piecestore/storage/memory_test.go | 15 +++++++-------- store/piecestore/storage/object_storage.go | 6 +++--- store/piecestore/storage/s3.go | 6 +++--- store/piecestore/storage/s3_test.go | 3 +-- 10 files changed, 30 insertions(+), 34 deletions(-) rename model/{errors => }/errors.go (95%) diff --git a/model/errors/errors.go b/model/errors.go similarity index 95% rename from model/errors/errors.go rename to model/errors.go index 50e01851a..e258c5cae 100644 --- a/model/errors/errors.go +++ b/model/errors.go @@ -1,4 +1,4 @@ -package errors +package model import "errors" diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index d0ad302fc..deeb5b8df 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -50,9 +50,9 @@ func (s *ServiceLifecycle) Init(ctx context.Context, components ...Component) *S if err := c.Init(ctx); err != nil { log.Panicf("Init %s error: %v", c.Name(), err) s.failure = true - return s + } else { + log.Infof("Init %s successfully!", c.Name()) } - log.Infof("Init %s successfully!", c.Name()) } return s } @@ -79,9 +79,9 @@ func (s *ServiceLifecycle) start(ctx context.Context, service Service) { defer s.innerCancel() if err := service.Start(ctx); err != nil { log.Panicf("Service %s starts error: %v", service.Name(), err) - return + } else { + log.Infof("Service %s starts successfully", service.Name()) } - log.Infof("Service %s starts successfully", service.Name()) } // Signals registers monitor signals @@ -124,8 +124,7 @@ func (s *ServiceLifecycle) GracefulShutdown(ctx context.Context) { <-gCtx.Done() if errors.Is(gCtx.Err(), context.Canceled) { - log.Infow("Service graceful shutdown, context canceled, service stops working", "service config timeout", - s.timeout) + log.Infow("Service graceful shutdown", "service config timeout", s.timeout) } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { log.Panic("Timeout while stopping service, killing instance manually") } diff --git a/store/piecestore/piece/piece_store.go b/store/piecestore/piece/piece_store.go index 02f3ad247..e5f3c1818 100644 --- a/store/piecestore/piece/piece_store.go +++ b/store/piecestore/piece/piece_store.go @@ -10,7 +10,7 @@ import ( "runtime" "github.com/bnb-chain/inscription-storage-provider/config" - errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" + errors2 "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/store/piecestore/storage" "github.com/bnb-chain/inscription-storage-provider/util/log" ) diff --git a/store/piecestore/storage/disk_file.go b/store/piecestore/storage/disk_file.go index eeed9f8f6..d29cfc9cc 100644 --- a/store/piecestore/storage/disk_file.go +++ b/store/piecestore/storage/disk_file.go @@ -13,7 +13,7 @@ import ( "strings" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model/errors" + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -145,7 +145,7 @@ func (d *diskFileStore) DeleteObject(ctx context.Context, key string) error { func (d *diskFileStore) HeadBucket(ctx context.Context) error { if _, err := os.Stat(d.root); err != nil { if os.IsNotExist(err) { - return errors.BucketNotExisted + return model.BucketNotExisted } return err } diff --git a/store/piecestore/storage/disk_file_test.go b/store/piecestore/storage/disk_file_test.go index 55fd8b2b0..ff1148f27 100644 --- a/store/piecestore/storage/disk_file_test.go +++ b/store/piecestore/storage/disk_file_test.go @@ -11,8 +11,7 @@ import ( "strings" "testing" - errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" - + errors2 "github.com/bnb-chain/inscription-storage-provider/model" "github.com/stretchr/testify/assert" ) diff --git a/store/piecestore/storage/memory.go b/store/piecestore/storage/memory.go index 81fadebf7..f7214dfb4 100644 --- a/store/piecestore/storage/memory.go +++ b/store/piecestore/storage/memory.go @@ -12,7 +12,7 @@ import ( "time" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model/errors" + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -43,11 +43,11 @@ func (m *memoryStore) GetObject(ctx context.Context, key string, offset, limit i defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, errors.EmptyObjectKey + return nil, model.EmptyObjectKey } d, ok := m.objects[key] if !ok { - return nil, errors.EmptyMemoryObject + return nil, model.EmptyMemoryObject } if offset > int64(len(d.data)) { @@ -65,7 +65,7 @@ func (m *memoryStore) PutObject(ctx context.Context, key string, reader io.Reade defer m.Unlock() // Minimum length is 1 if key == "" { - return errors.EmptyObjectKey + return model.EmptyObjectKey } if _, ok := m.objects[key]; ok { log.Info("overwrite key: ", key) @@ -95,7 +95,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error defer m.Unlock() // Minimum length is 1 if key == "" { - return nil, errors.EmptyObjectKey + return nil, model.EmptyObjectKey } o, ok := m.objects[key] if !ok { @@ -112,7 +112,7 @@ func (m *memoryStore) HeadObject(ctx context.Context, key string) (Object, error func (m *memoryStore) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { if delimiter != "" { - return nil, errors.NotSupportedDelimiter + return nil, model.NotSupportedDelimiter } m.Lock() defer m.Unlock() diff --git a/store/piecestore/storage/memory_test.go b/store/piecestore/storage/memory_test.go index f1128739c..060e1343b 100644 --- a/store/piecestore/storage/memory_test.go +++ b/store/piecestore/storage/memory_test.go @@ -7,8 +7,7 @@ import ( "strings" "testing" - "github.com/bnb-chain/inscription-storage-provider/model/errors" - + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/stretchr/testify/assert" ) @@ -62,12 +61,12 @@ func TestMemory_GetError(t *testing.T) { { name: "memory_get_error_test1", key: emptyString, - wantedErr: errors.EmptyObjectKey, + wantedErr: model.EmptyObjectKey, }, { name: "memory_get_error_test2", key: mockKey, - wantedErr: errors.EmptyMemoryObject, + wantedErr: model.EmptyMemoryObject, }, } for _, tt := range cases { @@ -94,7 +93,7 @@ func TestMemory_Put(t *testing.T) { name: "memory_put_test1", key: emptyString, data: endPoint, - wantedErr: errors.EmptyObjectKey, + wantedErr: model.EmptyObjectKey, }, { name: "memory_put_test2", @@ -178,7 +177,7 @@ func TestMemory_HeadError(t *testing.T) { { name: "memory_head_error_test1", key: emptyString, - wantedErr: errors.EmptyObjectKey, + wantedErr: model.EmptyObjectKey, }, { name: "memory_head_error_test2", @@ -230,7 +229,7 @@ func TestMemory_ListError(t *testing.T) { { name: "memory_list_error_test1", delimiter: mockKey, - wantedErr: errors.NotSupportedDelimiter, + wantedErr: model.NotSupportedDelimiter, }, } for _, tt := range cases { @@ -246,5 +245,5 @@ func TestMemory_ListError(t *testing.T) { func TestMemory_ListAll(t *testing.T) { store := setupMemoryTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, errors.NotSupportedMethod, err) + assert.Equal(t, model.NotSupportedMethod, err) } diff --git a/store/piecestore/storage/object_storage.go b/store/piecestore/storage/object_storage.go index 56ef4457d..976e65e6c 100644 --- a/store/piecestore/storage/object_storage.go +++ b/store/piecestore/storage/object_storage.go @@ -8,7 +8,7 @@ import ( "sync" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model/errors" + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/model/piecestore" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -36,11 +36,11 @@ func (s DefaultObjectStorage) CreateBucket(ctx context.Context) error { } func (s DefaultObjectStorage) ListObjects(ctx context.Context, prefix, marker, delimiter string, limit int64) ([]Object, error) { - return nil, errors.NotSupportedMethod + return nil, model.NotSupportedMethod } func (s DefaultObjectStorage) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, errors.NotSupportedMethod + return nil, model.NotSupportedMethod } type file struct { diff --git a/store/piecestore/storage/s3.go b/store/piecestore/storage/s3.go index 49f36298a..9a10ac726 100644 --- a/store/piecestore/storage/s3.go +++ b/store/piecestore/storage/s3.go @@ -17,7 +17,7 @@ import ( "time" "github.com/bnb-chain/inscription-storage-provider/config" - "github.com/bnb-chain/inscription-storage-provider/model/errors" + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/model/piecestore" "github.com/bnb-chain/inscription-storage-provider/util/log" @@ -148,7 +148,7 @@ func (s *s3Store) HeadBucket(ctx context.Context) error { log.Errorw("ObjectStorage S3 HeadBucket error", "error", err) if reqErr, ok := err.(awserr.RequestFailure); ok { if reqErr.StatusCode() == http.StatusNotFound { - return errors.BucketNotExisted + return model.BucketNotExisted } } return err @@ -216,7 +216,7 @@ func (s *s3Store) ListObjects(ctx context.Context, prefix, marker, delimiter str } func (s *s3Store) ListAllObjects(ctx context.Context, prefix, marker string) (<-chan Object, error) { - return nil, errors.NotSupportedMethod + return nil, model.NotSupportedMethod } // SessionCache holds session.Session according to model.ObjectStorage and it synchronizes access/modification diff --git a/store/piecestore/storage/s3_test.go b/store/piecestore/storage/s3_test.go index 760c749dd..d8c619961 100644 --- a/store/piecestore/storage/s3_test.go +++ b/store/piecestore/storage/s3_test.go @@ -8,12 +8,11 @@ import ( "testing" "time" - errors2 "github.com/bnb-chain/inscription-storage-provider/model/errors" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + errors2 "github.com/bnb-chain/inscription-storage-provider/model" "github.com/stretchr/testify/assert" ) From 4ed160edaf04147723c81f2a86c28746b3cc4bd6 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Wed, 21 Dec 2022 22:57:40 +0800 Subject: [PATCH 04/12] fix --- store/piecestore/piece/piece_store.go | 4 ++-- store/piecestore/storage/disk_file_test.go | 7 ++++--- store/piecestore/storage/memory_test.go | 1 + store/piecestore/storage/s3_test.go | 5 +++-- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/store/piecestore/piece/piece_store.go b/store/piecestore/piece/piece_store.go index e5f3c1818..5cb6a624e 100644 --- a/store/piecestore/piece/piece_store.go +++ b/store/piecestore/piece/piece_store.go @@ -10,7 +10,7 @@ import ( "runtime" "github.com/bnb-chain/inscription-storage-provider/config" - errors2 "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model" "github.com/bnb-chain/inscription-storage-provider/store/piecestore/storage" "github.com/bnb-chain/inscription-storage-provider/util/log" ) @@ -93,7 +93,7 @@ func createStorage(cfg *config.PieceStoreConfig) (storage.ObjectStorage, error) func checkBucket(ctx context.Context, store storage.ObjectStorage) error { if err := store.HeadBucket(ctx); err != nil { log.Errorw("HeadBucket error", "error", err) - if errors.Is(err, errors2.BucketNotExisted) { + if errors.Is(err, model.BucketNotExisted) { if err2 := store.CreateBucket(ctx); err2 != nil { return fmt.Errorf("Failed to create bucket in %s: %s, previous err: %s", store, err2, err) } diff --git a/store/piecestore/storage/disk_file_test.go b/store/piecestore/storage/disk_file_test.go index ff1148f27..d2583d0ae 100644 --- a/store/piecestore/storage/disk_file_test.go +++ b/store/piecestore/storage/disk_file_test.go @@ -11,7 +11,8 @@ import ( "strings" "testing" - errors2 "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/stretchr/testify/assert" ) @@ -288,13 +289,13 @@ func TestDiskFile_HeadDirSuccess(t *testing.T) { func TestDiskFile_List(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListObjects(context.TODO(), emptyString, emptyString, emptyString, 0) - assert.Equal(t, errors2.NotSupportedMethod, err) + assert.Equal(t, model.NotSupportedMethod, err) } func TestDiskFile_ListAll(t *testing.T) { store := setupDiskFileTest(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, errors2.NotSupportedMethod, err) + assert.Equal(t, model.NotSupportedMethod, err) } func TestPath(t *testing.T) { diff --git a/store/piecestore/storage/memory_test.go b/store/piecestore/storage/memory_test.go index 060e1343b..d0634cfdb 100644 --- a/store/piecestore/storage/memory_test.go +++ b/store/piecestore/storage/memory_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/stretchr/testify/assert" ) diff --git a/store/piecestore/storage/s3_test.go b/store/piecestore/storage/s3_test.go index d8c619961..bf598da8e 100644 --- a/store/piecestore/storage/s3_test.go +++ b/store/piecestore/storage/s3_test.go @@ -8,11 +8,12 @@ import ( "testing" "time" + "github.com/bnb-chain/inscription-storage-provider/model" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" - errors2 "github.com/bnb-chain/inscription-storage-provider/model" "github.com/stretchr/testify/assert" ) @@ -282,7 +283,7 @@ func TestS3_ListSuccess(t *testing.T) { func TestS3_ListAll(t *testing.T) { store := setupS3Test(t) _, err := store.ListAllObjects(context.TODO(), emptyString, emptyString) - assert.Equal(t, errors2.NotSupportedMethod, err) + assert.Equal(t, model.NotSupportedMethod, err) } type mockS3ClientError struct { From da5ce610ecc9f64287c7b94077a40ad3a7adc855 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Thu, 22 Dec 2022 14:47:09 +0800 Subject: [PATCH 05/12] fix --- pkg/lifecycle/READEME.md | 98 ++++++++++++++++++++++++++++++++++++++ pkg/lifecycle/lifecycle.go | 14 +++--- 2 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 pkg/lifecycle/READEME.md diff --git a/pkg/lifecycle/READEME.md b/pkg/lifecycle/READEME.md new file mode 100644 index 000000000..e9bb3d8af --- /dev/null +++ b/pkg/lifecycle/READEME.md @@ -0,0 +1,98 @@ +# Service Lifecycle + +This package provides useful method to manage the lifecycle of a service with convenient initializations of components. + +## Interface + +```go +type Component interface { + Name() string + Init(ctx context.Context) error +} + +type Service interface { + Name() string + Start(ctx context.Context) error + Stop(ctx context.Context) error +} +``` + +## Feature + +- Start and stop a group of services +- Init function with components +- Monitor signals to stop services + +## Example + +```go +package main + +import ( + "context" + "fmt" + "net/http" + "syscall" + "time" + + "github.com/bnb-chain/inscription-storage-provider/pkg/lifecycle" +) + +func server1(w http.ResponseWriter, req *http.Request) { + fmt.Fprintln(w, "server1!") +} + +func server2(w http.ResponseWriter, req *http.Request) { + fmt.Fprintln(w, "server2!") +} + +type Server1 struct{} + +func (s Server1) Start(ctx context.Context) error { + fmt.Println("Server1 start") + http.HandleFunc("/server1", server1) + http.ListenAndServe("localhost:8080", nil) + return nil +} + +func (s Server1) Stop(ctx context.Context) error { + fmt.Println("Stop server1 service...") + return nil +} + +func (s Server1) Init(ctx context.Context) error { return nil } + +func (s Server1) Name() string { + return "Service: server1" +} + +type Server2 struct{} + +func (s Server2) Start(ctx context.Context) error { + fmt.Println("Server2 start") + http.HandleFunc("/server2", server2) + http.ListenAndServe("localhost:8081", nil) + return nil +} + +func (s Server2) Stop(ctx context.Context) error { + fmt.Println("Stop server2 service...") + return nil +} + +func (s Server2) Init(ctx context.Context) error { return nil } + +func (s Server2) Name() string { + return "Service: server2" +} + +func main() { + ctx := context.Background() + l := lifecycle.NewService(5 * time.Second) + var ( + s1 = Server1{} + s2 = Server2{} + ) + l.Signals(syscall.SIGINT, syscall.SIGTERM).Init(ctx, s1, s2).StartServices(ctx, s1, s2).Wait(ctx) +} +``` diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index deeb5b8df..b6d51e94f 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -11,12 +11,14 @@ import ( "github.com/bnb-chain/inscription-storage-provider/util/log" ) +// Service provides abstract methods to control the lifecycle of a service type Service interface { Name() string Start(ctx context.Context) error Stop(ctx context.Context) error } +// ServiceLifecycle is a lifecycle of one service type ServiceLifecycle struct { innerCtx context.Context innerCancel context.CancelFunc @@ -114,23 +116,23 @@ func (s *ServiceLifecycle) signals(sigs ...os.Signal) { // Wait blocks until context is done func (s *ServiceLifecycle) Wait(ctx context.Context) { <-s.innerCtx.Done() - s.GracefulShutdown(ctx) + s.StopServices(ctx) } -// GracefulShutdown can stop services when context is done or timeout -func (s *ServiceLifecycle) GracefulShutdown(ctx context.Context) { +// StopServices can stop services when context is done or timeout +func (s *ServiceLifecycle) StopServices(ctx context.Context) { gCtx, cancel := context.WithTimeout(context.Background(), s.timeout) - go s.stopService(ctx, cancel) + go s.stop(ctx, cancel) <-gCtx.Done() if errors.Is(gCtx.Err(), context.Canceled) { - log.Infow("Service graceful shutdown", "service config timeout", s.timeout) + log.Infow("Services stop working", "service config timeout", s.timeout) } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { log.Panic("Timeout while stopping service, killing instance manually") } } -func (s *ServiceLifecycle) stopService(ctx context.Context, cancel context.CancelFunc) { +func (s *ServiceLifecycle) stop(ctx context.Context, cancel context.CancelFunc) { var wg sync.WaitGroup for _, service := range s.services { wg.Add(1) From 0e8f6e21a4644efb186bad4a03cee9ac2218da0f Mon Sep 17 00:00:00 2001 From: DylanYong Date: Thu, 22 Dec 2022 14:51:55 +0800 Subject: [PATCH 06/12] fix --- pkg/lifecycle/{READEME.md => README.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pkg/lifecycle/{READEME.md => README.md} (100%) diff --git a/pkg/lifecycle/READEME.md b/pkg/lifecycle/README.md similarity index 100% rename from pkg/lifecycle/READEME.md rename to pkg/lifecycle/README.md From 18fdf9bfb91101e29b460c1ffe45e175779149d9 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Thu, 22 Dec 2022 14:55:07 +0800 Subject: [PATCH 07/12] fix --- pkg/lifecycle/README.md | 2 +- pkg/lifecycle/component.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/lifecycle/README.md b/pkg/lifecycle/README.md index e9bb3d8af..dae0d5376 100644 --- a/pkg/lifecycle/README.md +++ b/pkg/lifecycle/README.md @@ -1,6 +1,6 @@ # Service Lifecycle -This package provides useful method to manage the lifecycle of a service with convenient initializations of components. +This package provides useful method to manage the lifecycle of a service or a group of services with convenient initializations of components. ## Interface diff --git a/pkg/lifecycle/component.go b/pkg/lifecycle/component.go index 11712d85b..42b10ef46 100644 --- a/pkg/lifecycle/component.go +++ b/pkg/lifecycle/component.go @@ -2,6 +2,7 @@ package lifecycle import "context" +// Component provides two methods to implement a component type Component interface { Name() string Init(ctx context.Context) error @@ -12,6 +13,7 @@ type component struct { fn func(ctx context.Context) error } +// NewComponent returns an instance of one component func NewComponent(name string, fn func(ctx context.Context) error) Component { return &component{ name: name, @@ -19,10 +21,12 @@ func NewComponent(name string, fn func(ctx context.Context) error) Component { } } +// Name describes the name of one component func (c *component) Name() string { return c.name } +// Init initializes one component func (c *component) Init(ctx context.Context) error { return c.fn(ctx) } From 773de0bdb46fe64f47bb18c95e467b6d5c501c7f Mon Sep 17 00:00:00 2001 From: DylanYong Date: Thu, 22 Dec 2022 14:56:38 +0800 Subject: [PATCH 08/12] fix --- pkg/lifecycle/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/lifecycle/README.md b/pkg/lifecycle/README.md index dae0d5376..76352e3ca 100644 --- a/pkg/lifecycle/README.md +++ b/pkg/lifecycle/README.md @@ -6,8 +6,8 @@ This package provides useful method to manage the lifecycle of a service or a gr ```go type Component interface { - Name() string - Init(ctx context.Context) error + Name() string + Init(ctx context.Context) error } type Service interface { From ed955dbe902e309ad527f9c678a5f1c46d769782 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Thu, 22 Dec 2022 22:17:13 +0800 Subject: [PATCH 09/12] fix --- pkg/lifecycle/1.go | 125 ++++++++++++++++++++++++++++++++++ pkg/lifecycle/2.go | 134 +++++++++++++++++++++++++++++++++++++ pkg/lifecycle/component.go | 32 --------- pkg/lifecycle/lifecycle.go | 28 +------- test/e2e/lifecycle/1.go | 126 ++++++++++++++++++++++++++++++++++ test/e2e/lifecycle/main.go | 69 +++++++++++++++++++ 6 files changed, 457 insertions(+), 57 deletions(-) create mode 100644 pkg/lifecycle/1.go create mode 100644 pkg/lifecycle/2.go delete mode 100644 pkg/lifecycle/component.go create mode 100644 test/e2e/lifecycle/1.go create mode 100644 test/e2e/lifecycle/main.go diff --git a/pkg/lifecycle/1.go b/pkg/lifecycle/1.go new file mode 100644 index 000000000..7b3201f59 --- /dev/null +++ b/pkg/lifecycle/1.go @@ -0,0 +1,125 @@ +package lifecycle + +/* +// Service provides abstract methods to control the lifecycle of a service +type Service1 interface { + Name() string + Start(ctx context.Context) error + Stop(ctx context.Context) error +} + +// ServiceLifecycle is a lifecycle of one service +type ServiceLifecycle1 struct { + innerCtx context.Context + innerCancel context.CancelFunc + services []Service1 + failure bool + timeout time.Duration +} + +// NewService returns an initialized service lifecycle +func NewService1(timeout time.Duration) *ServiceLifecycle1 { + innerCtx, innerCancel := context.WithCancel(context.Background()) + return &ServiceLifecycle1{ + innerCtx: innerCtx, + innerCancel: innerCancel, + timeout: timeout, + } +} + +// StartServices starts running services +func (s *ServiceLifecycle1) StartServices(ctx context.Context, services ...Service1) *ServiceLifecycle1 { + if s.failure { + return s + } + s.services = append(s.services, services...) + for _, service := range services { + select { + case <-s.innerCtx.Done(): + s.failure = true + return s + default: + } + go s.start(ctx, service) + } + return s +} + +func (s *ServiceLifecycle1) start(ctx context.Context, service Service1) { + defer s.innerCancel() + if err := service.Start(ctx); err != nil { + log.Panicf("Service %s starts error: %v", service.Name(), err) + } else { + log.Printf("Service %s starts successfully", service.Name()) + } +} + +// Signals registers monitor signals +func (s *ServiceLifecycle1) Signals(sigs ...os.Signal) *ServiceLifecycle1 { + if s.failure { + return s + } + go s.signals(sigs...) + return s +} + +func (s *ServiceLifecycle1) signals(sigs ...os.Signal) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, sigs...) + for { + select { + case <-s.innerCtx.Done(): + return + case sig := <-sigCh: + for _, j := range sigs { + if j == sig { + s.innerCancel() + return + } + } + } + } +} + +// Wait blocks until context is done +func (s *ServiceLifecycle1) Wait(ctx context.Context) { + <-s.innerCtx.Done() + s.StopServices(ctx) +} + +// StopServices can stop services when context is done or timeout +func (s *ServiceLifecycle1) StopServices(ctx context.Context) { + gCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + go s.stop(ctx, cancel) + + <-gCtx.Done() + if errors.Is(gCtx.Err(), context.Canceled) { + log.Printf("Services stop working, service config timeout : %s", s.timeout) + } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { + log.Panic("Timeout while stopping service, killing instance manually") + } +} + +func (s *ServiceLifecycle1) stop(ctx context.Context, cancel context.CancelFunc) { + var wg sync.WaitGroup + for _, service := range s.services { + wg.Add(1) + go func(ctx context.Context, service Service1) { + defer wg.Done() + if err := service.Stop(ctx); err != nil { + log.Panicf("Service %s stops failure: %v", service.Name(), err) + } else { + log.Printf("Service %s stops successfully!", service.Name()) + } + }(ctx, service) + } + wg.Wait() + cancel() +} + +// Done check context is done +func (s *ServiceLifecycle1) Done() <-chan struct{} { + return s.innerCtx.Done() +} + +*/ diff --git a/pkg/lifecycle/2.go b/pkg/lifecycle/2.go new file mode 100644 index 000000000..0912e563d --- /dev/null +++ b/pkg/lifecycle/2.go @@ -0,0 +1,134 @@ +package lifecycle + +/* +// Service provides abstract methods to control the lifecycle of a service +type Service interface { + Name() string + Start() error + Stop() error +} + +// ServiceLifecycle is a lifecycle of one service +type ServiceLifecycle struct { + services []Service + serviceMap sync.Map + sigChan chan os.Signal + stopChan chan bool + timeout time.Duration +} + +// NewService returns an initialized service lifecycle +func NewService(timeout time.Duration) *ServiceLifecycle { + s := &ServiceLifecycle{ + stopChan: make(chan bool, 1), + sigChan: make(chan os.Signal, 1), + timeout: timeout, + } + s.configureSignals() + return s +} + +func (s *ServiceLifecycle) configureSignals() { + signal.Notify(s.sigChan, syscall.SIGUSR1) +} + +// RegisterServices +func (s *ServiceLifecycle) RegisterServices(services ...Service) *ServiceLifecycle { + for _, service := range services { + s.serviceMap.Store(service.Name(), service) + } + return s +} + +// StartServices starts running services +func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Service) { + go func() { + <-ctx.Done() + s.StopServices() + }() + s.services = append(s.services, services...) + s.start() + //s.serviceMap.Range(func(key, value any) bool { + // // name := key.(string) + // srv := value.(Service) + // s.start(srv) + // return true + //}) +} + +func (s *ServiceLifecycle) start() { + for _, service := range s.services { + go func(service Service) { + log.Infow("start func", "name", service.Name()) + if err := service.Start(); err != nil { + log.Errorf("Service %s starts error: %v", service.Name(), err) + if err1 := service.Stop(); err1 != nil { + log.Errorf("Service %s stops failure: %v", service.Name(), err1) + } + } else { + log.Infof("Service %s starts successfully", service.Name()) + } + }(service) + } +} + +// Wait blocks until service shutdown +func (s *ServiceLifecycle) Wait() { + <-s.stopChan +} + +// StopServices stops services +func (s *ServiceLifecycle) StopServices() { + defer log.Info("Services stopped") + //s.serviceMap.Range(func(key, value any) bool { + // srv := value.(Service) + // s.stop(srv) + // return true + //}) + go s.stop() + s.stopChan <- true +} + +func (s *ServiceLifecycle) stop() { + var wg sync.WaitGroup + for _, service := range s.services { + wg.Add(1) + go func(service Service) { + defer wg.Done() + if err := service.Stop(); err != nil { + log.Errorf("Service %s stops failure: %v", service.Name(), err) + } else { + log.Errorf("Service %s stops successfully!", service.Name()) + } + }(service) + } + wg.Wait() + //if err := service.Stop(); err != nil { + // log.Errorf("Service %s stops failure: %v", service.Name(), err) + //} else { + // log.Infof("Service %s stops successfully!", service.Name()) + //} +} + +// Close destroy services +func (s *ServiceLifecycle) Close() { + ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + go func() { + <-ctx.Done() + if errors.Is(ctx.Err(), context.Canceled) { + log.Infow("Services stop working", "service config timeout", s.timeout) + } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { + log.Panic("Timeout while closing service, killing instance manually") + } + }() + + log.Info("close services") + signal.Stop(s.sigChan) + close(s.sigChan) + close(s.stopChan) + + // this func would add some other stopFunc services such as stopMetricsClient + cancel() +} + +*/ diff --git a/pkg/lifecycle/component.go b/pkg/lifecycle/component.go deleted file mode 100644 index 42b10ef46..000000000 --- a/pkg/lifecycle/component.go +++ /dev/null @@ -1,32 +0,0 @@ -package lifecycle - -import "context" - -// Component provides two methods to implement a component -type Component interface { - Name() string - Init(ctx context.Context) error -} - -type component struct { - name string - fn func(ctx context.Context) error -} - -// NewComponent returns an instance of one component -func NewComponent(name string, fn func(ctx context.Context) error) Component { - return &component{ - name: name, - fn: fn, - } -} - -// Name describes the name of one component -func (c *component) Name() string { - return c.name -} - -// Init initializes one component -func (c *component) Init(ctx context.Context) error { - return c.fn(ctx) -} diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index b6d51e94f..62ce382c0 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -37,35 +37,13 @@ func NewService(timeout time.Duration) *ServiceLifecycle { } } -// Init can be used to initialize components -func (s *ServiceLifecycle) Init(ctx context.Context, components ...Component) *ServiceLifecycle { - if s.failure { - return s - } - for _, c := range components { - select { - case <-s.innerCtx.Done(): - s.failure = true - return s - default: - } - if err := c.Init(ctx); err != nil { - log.Panicf("Init %s error: %v", c.Name(), err) - s.failure = true - } else { - log.Infof("Init %s successfully!", c.Name()) - } - } - return s -} - // StartServices starts running services func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Service) *ServiceLifecycle { if s.failure { return s } s.services = append(s.services, services...) - for _, service := range services { + for _, service := range s.services { select { case <-s.innerCtx.Done(): s.failure = true @@ -80,7 +58,7 @@ func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Servic func (s *ServiceLifecycle) start(ctx context.Context, service Service) { defer s.innerCancel() if err := service.Start(ctx); err != nil { - log.Panicf("Service %s starts error: %v", service.Name(), err) + log.Errorf("Service %s starts error: %v", service.Name(), err) } else { log.Infof("Service %s starts successfully", service.Name()) } @@ -139,7 +117,7 @@ func (s *ServiceLifecycle) stop(ctx context.Context, cancel context.CancelFunc) go func(ctx context.Context, service Service) { defer wg.Done() if err := service.Stop(ctx); err != nil { - log.Panicf("Service %s stops failure: %v", service.Name(), err) + log.Errorf("Service %s stops failure: %v", service.Name(), err) } else { log.Infof("Service %s stops successfully!", service.Name()) } diff --git a/test/e2e/lifecycle/1.go b/test/e2e/lifecycle/1.go new file mode 100644 index 000000000..1282090c9 --- /dev/null +++ b/test/e2e/lifecycle/1.go @@ -0,0 +1,126 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "sync" + "time" +) + +// Service provides abstract methods to control the lifecycle of a service +type Service interface { + Name() string + Start(ctx context.Context) error + Stop(ctx context.Context) error +} + +type Starter struct { + context context.Context + cancel context.CancelFunc + services []Service + fail bool +} + +func New() (*Starter, error) { + ctx, cancel := context.WithCancel(context.Background()) + return &Starter{ + context: ctx, + cancel: cancel, + }, nil +} + +func (s *Starter) RunServices(ctx context.Context, services ...Service) *Starter { + if s.fail { + return s + } + s.services = append(s.services, services...) + for _, service := range services { + select { + case <-s.context.Done(): + log.Println("shutdown ...") + s.fail = true + return s + default: + } + go s.start(ctx, service) + } + return s +} + +func (s *Starter) start(ctx context.Context, service Service) { + defer s.cancel() + err := service.Start(ctx) + if err != nil { + log.Printf("service %s is done: %v", service.Name(), err) + } else { + log.Printf("service %s is done", service.Name()) + } +} + +func (s *Starter) Signals(signals ...os.Signal) *Starter { + if s.fail { + return s + } + go s.signals(signals...) + return s +} + +func (s *Starter) signals(signals ...os.Signal) { + sigs := make(chan os.Signal, 1) + signal.Notify( + sigs, + signals..., + ) + for { + select { + case <-s.context.Done(): + return + case event := <-sigs: + for _, item := range signals { + if item == event { + s.cancel() + return + } + } + } + } +} + +func (s *Starter) Done() <-chan struct{} { + return s.context.Done() +} + +func (s *Starter) Wait(ctx context.Context) { + <-s.context.Done() + s.GracefulStop(ctx) +} + +func (s *Starter) GracefulStop(conf context.Context) { + log.Printf("Graceful shutdown ...") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + go s.gracefulStop(conf, cancel) + <-ctx.Done() +} + +func (s *Starter) gracefulStop(conf context.Context, cancel context.CancelFunc) { + wg := &sync.WaitGroup{} + for _, service := range s.services { + wg.Add(1) + go s.stopService(conf, wg, service) + } + wg.Wait() + cancel() +} + +func (s *Starter) stopService(conf context.Context, wg *sync.WaitGroup, service Service) { + defer wg.Done() + err := service.Stop(conf) + if err != nil { + log.Printf("service %s is stopped: %v", service.Name(), err) + } else { + log.Printf("service %s is stopped", service.Name()) + } +} diff --git a/test/e2e/lifecycle/main.go b/test/e2e/lifecycle/main.go new file mode 100644 index 000000000..9cd65dfcb --- /dev/null +++ b/test/e2e/lifecycle/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "syscall" + "time" + + "github.com/bnb-chain/inscription-storage-provider/pkg/lifecycle" + "github.com/bnb-chain/inscription-storage-provider/util/log" +) + +func morning(w http.ResponseWriter, req *http.Request) { + fmt.Fprintln(w, "morning!") +} + +func evening(w http.ResponseWriter, req *http.Request) { + fmt.Fprintln(w, "evening!") +} + +type Morning struct{} + +func (m Morning) Start(ctx context.Context) error { + http.HandleFunc("/morning", morning) + http.ListenAndServe("localhost:8080", nil) + return nil +} + +func (m Morning) Stop(ctx context.Context) error { + log.Info("Stop morning service...") + return nil +} + +func (m Morning) Name() string { + return "Morning 123" +} + +type Evening struct{} + +func (e Evening) Start(ctx context.Context) error { + http.HandleFunc("/evening", evening) + http.ListenAndServe("localhost:8081", nil) + return nil +} + +func (e Evening) Stop(ctx context.Context) error { + log.Info("Stop evening service...") + return nil +} + +func (e Evening) Name() string { + return "Evening 456" +} + +func main() { + ctx := context.Background() + l := lifecycle.NewService(5 * time.Second) + var ( + m Morning + e Evening + ) + l.Signals(syscall.SIGINT, syscall.SIGTERM).StartServices(ctx, m, e).Wait(ctx) + //fmt.Println(syscall.Getpid()) + //ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + //s.StartServices(ctx, s1, s2) + //defer s.Close() + //s.Wait() +} From 7ec58bbf15e87a6925b446ff8567c3a7ad2742d3 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Fri, 23 Dec 2022 18:41:42 +0800 Subject: [PATCH 10/12] fix --- pkg/lifecycle/1.go | 125 ----------------------------- pkg/lifecycle/2.go | 134 -------------------------------- pkg/lifecycle/README.md | 66 ++-------------- pkg/lifecycle/lifecycle.go | 68 +++++++--------- pkg/lifecycle/lifecycle_test.go | 5 ++ test/e2e/lifecycle/1.go | 126 ------------------------------ test/e2e/lifecycle/main.go | 69 ---------------- 7 files changed, 38 insertions(+), 555 deletions(-) delete mode 100644 pkg/lifecycle/1.go delete mode 100644 pkg/lifecycle/2.go create mode 100644 pkg/lifecycle/lifecycle_test.go delete mode 100644 test/e2e/lifecycle/1.go delete mode 100644 test/e2e/lifecycle/main.go diff --git a/pkg/lifecycle/1.go b/pkg/lifecycle/1.go deleted file mode 100644 index 7b3201f59..000000000 --- a/pkg/lifecycle/1.go +++ /dev/null @@ -1,125 +0,0 @@ -package lifecycle - -/* -// Service provides abstract methods to control the lifecycle of a service -type Service1 interface { - Name() string - Start(ctx context.Context) error - Stop(ctx context.Context) error -} - -// ServiceLifecycle is a lifecycle of one service -type ServiceLifecycle1 struct { - innerCtx context.Context - innerCancel context.CancelFunc - services []Service1 - failure bool - timeout time.Duration -} - -// NewService returns an initialized service lifecycle -func NewService1(timeout time.Duration) *ServiceLifecycle1 { - innerCtx, innerCancel := context.WithCancel(context.Background()) - return &ServiceLifecycle1{ - innerCtx: innerCtx, - innerCancel: innerCancel, - timeout: timeout, - } -} - -// StartServices starts running services -func (s *ServiceLifecycle1) StartServices(ctx context.Context, services ...Service1) *ServiceLifecycle1 { - if s.failure { - return s - } - s.services = append(s.services, services...) - for _, service := range services { - select { - case <-s.innerCtx.Done(): - s.failure = true - return s - default: - } - go s.start(ctx, service) - } - return s -} - -func (s *ServiceLifecycle1) start(ctx context.Context, service Service1) { - defer s.innerCancel() - if err := service.Start(ctx); err != nil { - log.Panicf("Service %s starts error: %v", service.Name(), err) - } else { - log.Printf("Service %s starts successfully", service.Name()) - } -} - -// Signals registers monitor signals -func (s *ServiceLifecycle1) Signals(sigs ...os.Signal) *ServiceLifecycle1 { - if s.failure { - return s - } - go s.signals(sigs...) - return s -} - -func (s *ServiceLifecycle1) signals(sigs ...os.Signal) { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, sigs...) - for { - select { - case <-s.innerCtx.Done(): - return - case sig := <-sigCh: - for _, j := range sigs { - if j == sig { - s.innerCancel() - return - } - } - } - } -} - -// Wait blocks until context is done -func (s *ServiceLifecycle1) Wait(ctx context.Context) { - <-s.innerCtx.Done() - s.StopServices(ctx) -} - -// StopServices can stop services when context is done or timeout -func (s *ServiceLifecycle1) StopServices(ctx context.Context) { - gCtx, cancel := context.WithTimeout(context.Background(), s.timeout) - go s.stop(ctx, cancel) - - <-gCtx.Done() - if errors.Is(gCtx.Err(), context.Canceled) { - log.Printf("Services stop working, service config timeout : %s", s.timeout) - } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { - log.Panic("Timeout while stopping service, killing instance manually") - } -} - -func (s *ServiceLifecycle1) stop(ctx context.Context, cancel context.CancelFunc) { - var wg sync.WaitGroup - for _, service := range s.services { - wg.Add(1) - go func(ctx context.Context, service Service1) { - defer wg.Done() - if err := service.Stop(ctx); err != nil { - log.Panicf("Service %s stops failure: %v", service.Name(), err) - } else { - log.Printf("Service %s stops successfully!", service.Name()) - } - }(ctx, service) - } - wg.Wait() - cancel() -} - -// Done check context is done -func (s *ServiceLifecycle1) Done() <-chan struct{} { - return s.innerCtx.Done() -} - -*/ diff --git a/pkg/lifecycle/2.go b/pkg/lifecycle/2.go deleted file mode 100644 index 0912e563d..000000000 --- a/pkg/lifecycle/2.go +++ /dev/null @@ -1,134 +0,0 @@ -package lifecycle - -/* -// Service provides abstract methods to control the lifecycle of a service -type Service interface { - Name() string - Start() error - Stop() error -} - -// ServiceLifecycle is a lifecycle of one service -type ServiceLifecycle struct { - services []Service - serviceMap sync.Map - sigChan chan os.Signal - stopChan chan bool - timeout time.Duration -} - -// NewService returns an initialized service lifecycle -func NewService(timeout time.Duration) *ServiceLifecycle { - s := &ServiceLifecycle{ - stopChan: make(chan bool, 1), - sigChan: make(chan os.Signal, 1), - timeout: timeout, - } - s.configureSignals() - return s -} - -func (s *ServiceLifecycle) configureSignals() { - signal.Notify(s.sigChan, syscall.SIGUSR1) -} - -// RegisterServices -func (s *ServiceLifecycle) RegisterServices(services ...Service) *ServiceLifecycle { - for _, service := range services { - s.serviceMap.Store(service.Name(), service) - } - return s -} - -// StartServices starts running services -func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Service) { - go func() { - <-ctx.Done() - s.StopServices() - }() - s.services = append(s.services, services...) - s.start() - //s.serviceMap.Range(func(key, value any) bool { - // // name := key.(string) - // srv := value.(Service) - // s.start(srv) - // return true - //}) -} - -func (s *ServiceLifecycle) start() { - for _, service := range s.services { - go func(service Service) { - log.Infow("start func", "name", service.Name()) - if err := service.Start(); err != nil { - log.Errorf("Service %s starts error: %v", service.Name(), err) - if err1 := service.Stop(); err1 != nil { - log.Errorf("Service %s stops failure: %v", service.Name(), err1) - } - } else { - log.Infof("Service %s starts successfully", service.Name()) - } - }(service) - } -} - -// Wait blocks until service shutdown -func (s *ServiceLifecycle) Wait() { - <-s.stopChan -} - -// StopServices stops services -func (s *ServiceLifecycle) StopServices() { - defer log.Info("Services stopped") - //s.serviceMap.Range(func(key, value any) bool { - // srv := value.(Service) - // s.stop(srv) - // return true - //}) - go s.stop() - s.stopChan <- true -} - -func (s *ServiceLifecycle) stop() { - var wg sync.WaitGroup - for _, service := range s.services { - wg.Add(1) - go func(service Service) { - defer wg.Done() - if err := service.Stop(); err != nil { - log.Errorf("Service %s stops failure: %v", service.Name(), err) - } else { - log.Errorf("Service %s stops successfully!", service.Name()) - } - }(service) - } - wg.Wait() - //if err := service.Stop(); err != nil { - // log.Errorf("Service %s stops failure: %v", service.Name(), err) - //} else { - // log.Infof("Service %s stops successfully!", service.Name()) - //} -} - -// Close destroy services -func (s *ServiceLifecycle) Close() { - ctx, cancel := context.WithTimeout(context.Background(), s.timeout) - go func() { - <-ctx.Done() - if errors.Is(ctx.Err(), context.Canceled) { - log.Infow("Services stop working", "service config timeout", s.timeout) - } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { - log.Panic("Timeout while closing service, killing instance manually") - } - }() - - log.Info("close services") - signal.Stop(s.sigChan) - close(s.sigChan) - close(s.stopChan) - - // this func would add some other stopFunc services such as stopMetricsClient - cancel() -} - -*/ diff --git a/pkg/lifecycle/README.md b/pkg/lifecycle/README.md index 76352e3ca..3feee562b 100644 --- a/pkg/lifecycle/README.md +++ b/pkg/lifecycle/README.md @@ -5,11 +5,6 @@ This package provides useful method to manage the lifecycle of a service or a gr ## Interface ```go -type Component interface { - Name() string - Init(ctx context.Context) error -} - type Service interface { Name() string Start(ctx context.Context) error @@ -20,8 +15,8 @@ type Service interface { ## Feature - Start and stop a group of services -- Init function with components - Monitor signals to stop services +- Graceful shutdown ## Example @@ -30,69 +25,18 @@ package main import ( "context" - "fmt" - "net/http" "syscall" "time" "github.com/bnb-chain/inscription-storage-provider/pkg/lifecycle" + "http_server" + "rpc_server" ) -func server1(w http.ResponseWriter, req *http.Request) { - fmt.Fprintln(w, "server1!") -} - -func server2(w http.ResponseWriter, req *http.Request) { - fmt.Fprintln(w, "server2!") -} - -type Server1 struct{} - -func (s Server1) Start(ctx context.Context) error { - fmt.Println("Server1 start") - http.HandleFunc("/server1", server1) - http.ListenAndServe("localhost:8080", nil) - return nil -} - -func (s Server1) Stop(ctx context.Context) error { - fmt.Println("Stop server1 service...") - return nil -} - -func (s Server1) Init(ctx context.Context) error { return nil } - -func (s Server1) Name() string { - return "Service: server1" -} - -type Server2 struct{} - -func (s Server2) Start(ctx context.Context) error { - fmt.Println("Server2 start") - http.HandleFunc("/server2", server2) - http.ListenAndServe("localhost:8081", nil) - return nil -} - -func (s Server2) Stop(ctx context.Context) error { - fmt.Println("Stop server2 service...") - return nil -} - -func (s Server2) Init(ctx context.Context) error { return nil } - -func (s Server2) Name() string { - return "Service: server2" -} - func main() { ctx := context.Background() l := lifecycle.NewService(5 * time.Second) - var ( - s1 = Server1{} - s2 = Server2{} - ) - l.Signals(syscall.SIGINT, syscall.SIGTERM).Init(ctx, s1, s2).StartServices(ctx, s1, s2).Wait(ctx) + l.RegisterServices(http_server, rpc_server) + l.Signals(syscall.SIGINT, syscall.SIGTERM).Init(ctx).StartServices(ctx).Wait(ctx) } ``` diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index 62ce382c0..7190efdaf 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -5,7 +5,6 @@ import ( "errors" "os" "os/signal" - "sync" "time" "github.com/bnb-chain/inscription-storage-provider/util/log" @@ -13,17 +12,18 @@ import ( // Service provides abstract methods to control the lifecycle of a service type Service interface { + // Name describe service name Name() string + // Start and Stop should be used in non-block form Start(ctx context.Context) error Stop(ctx context.Context) error } -// ServiceLifecycle is a lifecycle of one service +// ServiceLifecycle manages services' lifecycle type ServiceLifecycle struct { innerCtx context.Context innerCancel context.CancelFunc services []Service - failure bool timeout time.Duration } @@ -37,38 +37,32 @@ func NewService(timeout time.Duration) *ServiceLifecycle { } } -// StartServices starts running services -func (s *ServiceLifecycle) StartServices(ctx context.Context, services ...Service) *ServiceLifecycle { - if s.failure { - return s - } +// RegisterServices register services of an application +func (s *ServiceLifecycle) RegisterServices(services ...Service) { s.services = append(s.services, services...) - for _, service := range s.services { - select { - case <-s.innerCtx.Done(): - s.failure = true - return s - default: - } - go s.start(ctx, service) - } +} + +// StartServices starts running services +func (s *ServiceLifecycle) StartServices(ctx context.Context) *ServiceLifecycle { + s.start(ctx) return s } -func (s *ServiceLifecycle) start(ctx context.Context, service Service) { - defer s.innerCancel() - if err := service.Start(ctx); err != nil { - log.Errorf("Service %s starts error: %v", service.Name(), err) - } else { - log.Infof("Service %s starts successfully", service.Name()) +func (s *ServiceLifecycle) start(ctx context.Context) { + for i, service := range s.services { + if err := service.Start(ctx); err != nil { + log.Errorf("Service %s starts error: %v", service.Name(), err) + s.services = s.services[:i] + s.innerCancel() + break + } else { + log.Infof("Service %s starts successfully", service.Name()) + } } } // Signals registers monitor signals func (s *ServiceLifecycle) Signals(sigs ...os.Signal) *ServiceLifecycle { - if s.failure { - return s - } go s.signals(sigs...) return s } @@ -97,33 +91,27 @@ func (s *ServiceLifecycle) Wait(ctx context.Context) { s.StopServices(ctx) } -// StopServices can stop services when context is done or timeout +// StopServices stop services when context is done or timeout func (s *ServiceLifecycle) StopServices(ctx context.Context) { gCtx, cancel := context.WithTimeout(context.Background(), s.timeout) - go s.stop(ctx, cancel) + s.stop(ctx, cancel) <-gCtx.Done() if errors.Is(gCtx.Err(), context.Canceled) { log.Infow("Services stop working", "service config timeout", s.timeout) } else if errors.Is(gCtx.Err(), context.DeadlineExceeded) { - log.Panic("Timeout while stopping service, killing instance manually") + log.Error("Timeout while stopping service, killing instance manually") } } func (s *ServiceLifecycle) stop(ctx context.Context, cancel context.CancelFunc) { - var wg sync.WaitGroup for _, service := range s.services { - wg.Add(1) - go func(ctx context.Context, service Service) { - defer wg.Done() - if err := service.Stop(ctx); err != nil { - log.Errorf("Service %s stops failure: %v", service.Name(), err) - } else { - log.Infof("Service %s stops successfully!", service.Name()) - } - }(ctx, service) + if err := service.Stop(ctx); err != nil { + log.Errorf("Service %s stops failure: %v", service.Name(), err) + } else { + log.Infof("Service %s stops successfully!", service.Name()) + } } - wg.Wait() cancel() } diff --git a/pkg/lifecycle/lifecycle_test.go b/pkg/lifecycle/lifecycle_test.go new file mode 100644 index 000000000..dce5a4aa1 --- /dev/null +++ b/pkg/lifecycle/lifecycle_test.go @@ -0,0 +1,5 @@ +package lifecycle + +func Test() { + +} diff --git a/test/e2e/lifecycle/1.go b/test/e2e/lifecycle/1.go deleted file mode 100644 index 1282090c9..000000000 --- a/test/e2e/lifecycle/1.go +++ /dev/null @@ -1,126 +0,0 @@ -package main - -import ( - "context" - "log" - "os" - "os/signal" - "sync" - "time" -) - -// Service provides abstract methods to control the lifecycle of a service -type Service interface { - Name() string - Start(ctx context.Context) error - Stop(ctx context.Context) error -} - -type Starter struct { - context context.Context - cancel context.CancelFunc - services []Service - fail bool -} - -func New() (*Starter, error) { - ctx, cancel := context.WithCancel(context.Background()) - return &Starter{ - context: ctx, - cancel: cancel, - }, nil -} - -func (s *Starter) RunServices(ctx context.Context, services ...Service) *Starter { - if s.fail { - return s - } - s.services = append(s.services, services...) - for _, service := range services { - select { - case <-s.context.Done(): - log.Println("shutdown ...") - s.fail = true - return s - default: - } - go s.start(ctx, service) - } - return s -} - -func (s *Starter) start(ctx context.Context, service Service) { - defer s.cancel() - err := service.Start(ctx) - if err != nil { - log.Printf("service %s is done: %v", service.Name(), err) - } else { - log.Printf("service %s is done", service.Name()) - } -} - -func (s *Starter) Signals(signals ...os.Signal) *Starter { - if s.fail { - return s - } - go s.signals(signals...) - return s -} - -func (s *Starter) signals(signals ...os.Signal) { - sigs := make(chan os.Signal, 1) - signal.Notify( - sigs, - signals..., - ) - for { - select { - case <-s.context.Done(): - return - case event := <-sigs: - for _, item := range signals { - if item == event { - s.cancel() - return - } - } - } - } -} - -func (s *Starter) Done() <-chan struct{} { - return s.context.Done() -} - -func (s *Starter) Wait(ctx context.Context) { - <-s.context.Done() - s.GracefulStop(ctx) -} - -func (s *Starter) GracefulStop(conf context.Context) { - log.Printf("Graceful shutdown ...") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - go s.gracefulStop(conf, cancel) - <-ctx.Done() -} - -func (s *Starter) gracefulStop(conf context.Context, cancel context.CancelFunc) { - wg := &sync.WaitGroup{} - for _, service := range s.services { - wg.Add(1) - go s.stopService(conf, wg, service) - } - wg.Wait() - cancel() -} - -func (s *Starter) stopService(conf context.Context, wg *sync.WaitGroup, service Service) { - defer wg.Done() - err := service.Stop(conf) - if err != nil { - log.Printf("service %s is stopped: %v", service.Name(), err) - } else { - log.Printf("service %s is stopped", service.Name()) - } -} diff --git a/test/e2e/lifecycle/main.go b/test/e2e/lifecycle/main.go deleted file mode 100644 index 9cd65dfcb..000000000 --- a/test/e2e/lifecycle/main.go +++ /dev/null @@ -1,69 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net/http" - "syscall" - "time" - - "github.com/bnb-chain/inscription-storage-provider/pkg/lifecycle" - "github.com/bnb-chain/inscription-storage-provider/util/log" -) - -func morning(w http.ResponseWriter, req *http.Request) { - fmt.Fprintln(w, "morning!") -} - -func evening(w http.ResponseWriter, req *http.Request) { - fmt.Fprintln(w, "evening!") -} - -type Morning struct{} - -func (m Morning) Start(ctx context.Context) error { - http.HandleFunc("/morning", morning) - http.ListenAndServe("localhost:8080", nil) - return nil -} - -func (m Morning) Stop(ctx context.Context) error { - log.Info("Stop morning service...") - return nil -} - -func (m Morning) Name() string { - return "Morning 123" -} - -type Evening struct{} - -func (e Evening) Start(ctx context.Context) error { - http.HandleFunc("/evening", evening) - http.ListenAndServe("localhost:8081", nil) - return nil -} - -func (e Evening) Stop(ctx context.Context) error { - log.Info("Stop evening service...") - return nil -} - -func (e Evening) Name() string { - return "Evening 456" -} - -func main() { - ctx := context.Background() - l := lifecycle.NewService(5 * time.Second) - var ( - m Morning - e Evening - ) - l.Signals(syscall.SIGINT, syscall.SIGTERM).StartServices(ctx, m, e).Wait(ctx) - //fmt.Println(syscall.Getpid()) - //ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - //s.StartServices(ctx, s1, s2) - //defer s.Close() - //s.Wait() -} From b1f42de06fd54f71cc68edeae858abdcc0723c9c Mon Sep 17 00:00:00 2001 From: DylanYong Date: Fri, 23 Dec 2022 18:43:54 +0800 Subject: [PATCH 11/12] fix --- pkg/lifecycle/lifecycle_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/lifecycle/lifecycle_test.go b/pkg/lifecycle/lifecycle_test.go index dce5a4aa1..3307bddde 100644 --- a/pkg/lifecycle/lifecycle_test.go +++ b/pkg/lifecycle/lifecycle_test.go @@ -1,5 +1,8 @@ package lifecycle -func Test() { +import "testing" + +// TODO(VM):Make up later +func Test(t *testing.T) { } From 2a91b42c124d582ea0d039ade6984f97cb694016 Mon Sep 17 00:00:00 2001 From: DylanYong Date: Mon, 26 Dec 2022 10:47:21 +0800 Subject: [PATCH 12/12] fix --- pkg/lifecycle/lifecycle.go | 5 +- pkg/lifecycle/mock/lifecycle_mock.go | 77 ++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 pkg/lifecycle/mock/lifecycle_mock.go diff --git a/pkg/lifecycle/lifecycle.go b/pkg/lifecycle/lifecycle.go index 7190efdaf..18cb2ec15 100644 --- a/pkg/lifecycle/lifecycle.go +++ b/pkg/lifecycle/lifecycle.go @@ -11,11 +11,14 @@ import ( ) // Service provides abstract methods to control the lifecycle of a service +// +//go:generate mockgen -source=./lifecycle.go -destination=./mock/lifecycle_mock.go -package=mock type Service interface { // Name describe service name Name() string - // Start and Stop should be used in non-block form + // Start a service, this method should be used in non-block form Start(ctx context.Context) error + // Stop a service, this method should be used in non-block form Stop(ctx context.Context) error } diff --git a/pkg/lifecycle/mock/lifecycle_mock.go b/pkg/lifecycle/mock/lifecycle_mock.go new file mode 100644 index 000000000..848e45d2d --- /dev/null +++ b/pkg/lifecycle/mock/lifecycle_mock.go @@ -0,0 +1,77 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./lifecycle.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockService is a mock of Service interface. +type MockService struct { + ctrl *gomock.Controller + recorder *MockServiceMockRecorder +} + +// MockServiceMockRecorder is the mock recorder for MockService. +type MockServiceMockRecorder struct { + mock *MockService +} + +// NewMockService creates a new mock instance. +func NewMockService(ctrl *gomock.Controller) *MockService { + mock := &MockService{ctrl: ctrl} + mock.recorder = &MockServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockService) EXPECT() *MockServiceMockRecorder { + return m.recorder +} + +// Name mocks base method. +func (m *MockService) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockServiceMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name)) +} + +// Start mocks base method. +func (m *MockService) Start(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockServiceMockRecorder) Start(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockService)(nil).Start), ctx) +} + +// Stop mocks base method. +func (m *MockService) Stop(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stop", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Stop indicates an expected call of Stop. +func (mr *MockServiceMockRecorder) Stop(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockService)(nil).Stop), ctx) +}