Skip to content

Commit

Permalink
#736 feat(kv): clear RPC method which completely cleans storage
Browse files Browse the repository at this point in the history
#736 feat(kv): `clear` RPC method which completely cleans storage
  • Loading branch information
rustatian authored Jun 24, 2021
2 parents ce53a8e + 60001db commit e9249c7
Show file tree
Hide file tree
Showing 12 changed files with 345 additions and 161 deletions.
96 changes: 0 additions & 96 deletions .github/workflows/windows.yml

This file was deleted.

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ v2.3.1 (_.06.2021)
## 👀 New:

- ✏️ Rework `broadcast` plugin. Add architecture diagrams to the `doc` folder. [PR](https://github.com/spiral/roadrunner/pull/732)
- ✏️ Add `Clear` method to the KV plugin RPC. [PR](https://github.com/spiral/roadrunner/pull/736)

## 🩹 Fixes:

Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
<a href="https://packagist.org/packages/spiral/roadrunner"><img src="https://poser.pugx.org/spiral/roadrunner/version"></a>
<a href="https://pkg.go.dev/github.com/spiral/roadrunner/v2?tab=doc"><img src="https://godoc.org/github.com/spiral/roadrunner/v2?status.svg"></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linux/badge.svg" alt=""></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Windows/badge.svg" alt=""></a>
<a href="https://github.com/spiral/roadrunner/actions"><img src="https://github.com/spiral/roadrunner/workflows/Linters/badge.svg" alt=""></a>
<a href="https://goreportcard.com/report/github.com/spiral/roadrunner"><img src="https://goreportcard.com/badge/github.com/spiral/roadrunner"></a>
<a href="https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master"><img src="https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png"></a>
Expand Down
34 changes: 34 additions & 0 deletions plugins/kv/drivers/boltdb/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

type Driver struct {
clearMu sync.RWMutex
// db instance
DB *bolt.DB
// name should be UTF-8
Expand Down Expand Up @@ -373,6 +374,35 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
return m, nil
}

func (d *Driver) Clear() error {
err := d.DB.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(d.bucket)
if err != nil {
d.log.Error("boltdb delete bucket", "error", err)
return err
}

_, err = tx.CreateBucket(d.bucket)
if err != nil {
d.log.Error("boltdb create bucket", "error", err)
return err
}

return nil
})

if err != nil {
d.log.Error("clear transaction failed", "error", err)
return err
}

d.clearMu.Lock()
d.gc = sync.Map{}
d.clearMu.Unlock()

return nil
}

// ========================= PRIVATE =================================

func (d *Driver) startGCLoop() { //nolint:gocognit
Expand All @@ -382,6 +412,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
for {
select {
case <-t.C:
d.clearMu.RLock()

// calculate current time before loop started to be fair
now := time.Now()
d.gc.Range(func(key, value interface{}) bool {
Expand Down Expand Up @@ -414,6 +446,8 @@ func (d *Driver) startGCLoop() { //nolint:gocognit
}
return true
})

d.clearMu.RUnlock()
case <-d.stop:
err := d.DB.Close()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions plugins/kv/drivers/memcached/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,13 @@ func (d *Driver) Delete(keys ...string) error {
}
return nil
}

func (d *Driver) Clear() error {
err := d.client.DeleteAll()
if err != nil {
d.log.Error("flush_all operation failed", "error", err)
return err
}

return nil
}
5 changes: 4 additions & 1 deletion plugins/kv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ type Storage interface {
MExpire(items ...*kvv1.Item) error

// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
// Not supported for the memcached
TTL(keys ...string) (map[string]string, error)

// Clear clean the entire storage
Clear() error

// Delete one or multiple keys.
Delete(keys ...string) error
}
Expand Down
16 changes: 16 additions & 0 deletions plugins/kv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,19 @@ func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error {

return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}

// Clear clean the storage
func (r *rpc) Clear(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rcp_delete")

if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Clear()
if err != nil {
return errors.E(op, err)
}

return nil
}

return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
16 changes: 15 additions & 1 deletion plugins/memory/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type Driver struct {
heap sync.Map
clearMu sync.RWMutex
heap sync.Map
// stop is used to stop keys GC and close boltdb connection
stop chan struct{}
log logger.Logger
Expand Down Expand Up @@ -203,6 +204,14 @@ func (s *Driver) Delete(keys ...string) error {
return nil
}

func (s *Driver) Clear() error {
s.clearMu.Lock()
s.heap = sync.Map{}
s.clearMu.Unlock()

return nil
}

// ================================== PRIVATE ======================================

func (s *Driver) gc() {
Expand All @@ -213,6 +222,9 @@ func (s *Driver) gc() {
ticker.Stop()
return
case now := <-ticker.C:
// mutes needed to clear the map
s.clearMu.RLock()

// check every second
s.heap.Range(func(key, value interface{}) bool {
v := value.(*kvv1.Item)
Expand All @@ -231,6 +243,8 @@ func (s *Driver) gc() {
}
return true
})

s.clearMu.RUnlock()
}
}
}
9 changes: 9 additions & 0 deletions plugins/redis/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,12 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) {
}
return m, nil
}

func (d *Driver) Clear() error {
fdb := d.universalClient.FlushDB(context.Background())
if fdb.Err() != nil {
return fdb.Err()
}

return nil
}
6 changes: 4 additions & 2 deletions tests/plugins/broadcast/broadcast_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func TestBroadcastSameSubscriber(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
mockLogger,
&logger.ZapLogger{},
// mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
Expand Down Expand Up @@ -314,7 +315,8 @@ func TestBroadcastSameSubscriberGlobal(t *testing.T) {
cfg,
&broadcast.Plugin{},
&rpcPlugin.Plugin{},
mockLogger,
&logger.ZapLogger{},
// mockLogger,
&server.Plugin{},
&redis.Plugin{},
&websockets.Plugin{},
Expand Down
Loading

0 comments on commit e9249c7

Please sign in to comment.