Skip to content

Commit

Permalink
[patch] optimize foreach goroutines (#9)
Browse files Browse the repository at this point in the history
* [patch] optimize foreach goroutines

* fix
  • Loading branch information
Yusuke Kato authored Feb 5, 2019
1 parent 4d9c8e7 commit 2c02269
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 127 deletions.
124 changes: 124 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
alias:
default: &default
working_directory: /go/src/github.com/kpango/gache
docker:
- image: circleci/golang:1.11.5
environment:
GOPATH: "/go"
GO111MODULE: "on"
REPO_NAME: "kpango"
IMAGE_NAME: "gache"
GITHUB_API: "https://api.github.com/"
DOCKER_USER: "kpango"
setup_remote_docker: &setup_remote_docker
version: 18.06.0-ce
docker_layer_caching: true

version: 2
jobs:
test:
<<: *default
steps:
- checkout
- restore_cache:
key: gosum-{{ .Branch }}-{{ checksum "go.sum" }}
- run:
name: preparation
command: |
go mod vendor
- run:
name: run tests
command: |
go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html
bash <(curl -s https://codecov.io/bash)
- store_artifacts:
path: ./coverage.html
- save_cache:
key: gosum-{{ .Branch }}-{{ checksum "go.sum" }}
paths:
- ./vendor
versioning:
<<: *default
steps:
- checkout
- run:
name: check
command: |
mkdir -p $HOME/.ssh/ && echo -e "Host github.com\n\tStrictHostKeyChecking no\n" > ~/.ssh/config
LAST_COMMIT=`git log -1 --pretty=%B`
VERSION=`git describe --abbrev=0 --tags`
touch ./.tag
if [ ! -z "`git diff $VERSION`" -o -z "$VERSION" ]; then
VERSION=${VERSION:-'0.0.0'}
MAJOR="${VERSION%%.*}"; VERSION="${VERSION#*.}"
MINOR="${VERSION%%.*}"; VERSION="${VERSION#*.}"
PATCH="${VERSION%%.*}"; VERSION="${VERSION#*.}"
if echo $LAST_COMMIT | grep "\[\(major\|MAJOR\)\]" > /dev/null; then
MAJOR=$((MAJOR+1))
echo "$MAJOR.0.0" > ./.tag
elif echo $LAST_COMMIT | grep "\[\(minor\|MINOR\)\]" > /dev/null; then
MINOR=$((MINOR+1))
echo "$MAJOR.$MINOR.0" > ./.tag
elif echo $LAST_COMMIT | grep "\[\(patch\|PATCH\)\]" > /dev/null; then
PATCH=$((PATCH+1))
echo "$MAJOR.$MINOR.$PATCH" > ./.tag
fi
fi
- persist_to_workspace:
root: .
paths:
- .
push:
<<: *default
steps:
- attach_workspace:
at: .
- run:
name: push tag and check PR body
command: |
mkdir -p $HOME/.ssh/ && echo -e "Host github.com\n\tStrictHostKeyChecking no\n" > ~/.ssh/config
TAG=`cat ./.tag`
if [ ! -z "$TAG" ]; then
echo $TAG
git tag $TAG
git push https://${GITHUB_ACCESS_TOKEN}:x-oauth-basic@github.com/${REPO_NAME}/${IMAGE_NAME} --tags
fi
- persist_to_workspace:
root: .
paths:
- .
gh_release:
<<: *default
steps:
- attach_workspace:
at: .
- run:
name: release
command: |
mkdir -p $HOME/.ssh/ && echo -e "Host github.com\n\tStrictHostKeyChecking no\n" > ~/.ssh/config
TAG=`cat ./.tag`
if [ ! -z "$TAG" ]; then
echo "Create release: ${TAG}"
curl -H "Authorization: token ${GITHUB_ACCESS_TOKEN}" \
-X POST \
-d "{\"tag_name\": \"${TAG}\"}" \
${GITHUB_API}repos/${REPO_NAME}/${IMAGE_NAME}/releases
fi
workflows:
version: 2
build:
jobs:
- test
- versioning:
filters:
branches:
only:
- master
- push:
requires:
- versioning
- gh_release:
requires:
- push
37 changes: 0 additions & 37 deletions circle.yml

This file was deleted.

13 changes: 0 additions & 13 deletions coverage.sh

This file was deleted.

106 changes: 49 additions & 57 deletions gache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/kpango/fastime"
"golang.org/x/sync/singleflight"
)
Expand All @@ -16,7 +16,7 @@ type (
Gache interface {
Clear()
Delete(string)
DeleteExpired(ctx context.Context) <-chan uint64
DeleteExpired(ctx context.Context) uint64
Foreach(context.Context, func(string, interface{}, int64) bool) Gache
Get(string) (interface{}, bool)
Set(string, interface{})
Expand Down Expand Up @@ -88,7 +88,7 @@ func GetGache() Gache {

// isValid checks expiration of value
func (v *value) isValid() bool {
return v.expire == 0 || fastime.Now().UnixNano() < v.expire
return v.expire == 0 || fastime.UnixNanoNow() < v.expire
}

// SetDefaultExpire set expire duration
Expand Down Expand Up @@ -172,8 +172,7 @@ func ToMap(ctx context.Context) *sync.Map {

// get returns value & exists from key
func (g *gache) get(key string) (interface{}, bool) {
shard := g.getShard(key)
v, ok := shard.Load(key)
v, ok := g.getShard(key).Load(key)

if !ok {
return nil, false
Expand Down Expand Up @@ -203,7 +202,7 @@ func Get(key string) (interface{}, bool) {
func (g *gache) set(key string, val interface{}, expire time.Duration) {
var exp int64
if expire > 0 {
exp = fastime.Now().Add(expire).UnixNano()
exp = fastime.UnixNanoNow() + int64(expire)
}
g.getShard(key).Store(key, &value{
expire: exp,
Expand Down Expand Up @@ -252,71 +251,64 @@ func (g *gache) expiration(key string) {
}

// DeleteExpired deletes expired value from Gache it can be cancel using context
func (g *gache) DeleteExpired(ctx context.Context) <-chan uint64 {
ch := make(chan uint64)
go func() {
wg := new(sync.WaitGroup)
rows := new(uint64)
for i := range g.shards {
g.shards[i].data.Range(func(k, v interface{}) bool {
result := make(chan bool)
wg.Add(1)
go func(c context.Context) {
select {
case <-c.Done():
wg.Done()
result <- false
return
default:
d, ok := v.(*value)
if ok {
if !d.isValid() {
g.expiration(k.(string))
atomic.AddUint64(rows, 1)
}
result <- true
wg.Done()
return
func (g *gache) DeleteExpired(ctx context.Context) uint64 {
wg := new(sync.WaitGroup)
var rows uint64
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
defer wg.Done()
g.shards[idx].data.Range(func(k, v interface{}) bool {
select {
case <-c.Done():
return false
default:
d, ok := v.(*value)
if ok {
if !d.isValid() {
g.expiration(k.(string))
atomic.AddUint64(&rows, 1)
}
result <- false
wg.Done()
return
return true
}
}(ctx)
return <-result
return false
}
})
}
wg.Wait()
ch <- atomic.LoadUint64(rows)
}()
return ch
}(ctx, i)
}
wg.Wait()
return atomic.LoadUint64(&rows)
}

// DeleteExpired deletes expired value from Gache it can be cancel using context
func DeleteExpired(ctx context.Context) <-chan uint64 {
func DeleteExpired(ctx context.Context) uint64 {
return instance.DeleteExpired(ctx)
}

// Foreach calls f sequentially for each key and value present in the Gache.
func (g *gache) Foreach(ctx context.Context, f func(string, interface{}, int64) bool) Gache {
wg := new(sync.WaitGroup)
for _, shard := range g.shards {
shard.data.Range(func(k, v interface{}) bool {
select {
case <-ctx.Done():
return false
default:
d, ok := v.(*value)
if ok {
if d.isValid() {
return f(k.(string), *d.val, d.expire)
for i := range g.shards {
wg.Add(1)
go func(c context.Context, idx int) {
defer wg.Done()
g.shards[idx].data.Range(func(k, v interface{}) bool {
select {
case <-c.Done():
return false
default:
d, ok := v.(*value)
if ok {
if d.isValid() {
return f(k.(string), *d.val, d.expire)
}
g.expiration(k.(string))
return true
}
g.expiration(k.(string))
return true
return false
}
return false
}
})
})
}(ctx, i)
}
wg.Wait()
return g
Expand Down
2 changes: 0 additions & 2 deletions gache_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func randStr(n int) string {
}

func BenchmarkGacheWithSmallDataset(b *testing.B) {
New()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -104,7 +103,6 @@ func BenchmarkGacheWithSmallDataset(b *testing.B) {
}

func BenchmarkGacheWithBigDataset(b *testing.B) {
New()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
Expand Down
8 changes: 4 additions & 4 deletions gache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func Test_gache_DeleteExpired(t *testing.T) {
name string
fields fields
args args
want <-chan uint64
want uint64
}{
// TODO: Add test cases.
}
Expand All @@ -787,7 +787,7 @@ func Test_gache_DeleteExpired(t *testing.T) {
expChan: tt.fields.expChan,
expGroup: tt.fields.expGroup,
}
if got := g.DeleteExpired(tt.args.ctx); !reflect.DeepEqual(got, tt.want) {
if got := g.DeleteExpired(tt.args.ctx); got != tt.want {
t.Errorf("gache.DeleteExpired() = %v, want %v", got, tt.want)
}
})
Expand All @@ -801,13 +801,13 @@ func TestDeleteExpired(t *testing.T) {
tests := []struct {
name string
args args
want <-chan uint64
want uint64
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := DeleteExpired(tt.args.ctx); !reflect.DeepEqual(got, tt.want) {
if got := DeleteExpired(tt.args.ctx); got != tt.want {
t.Errorf("DeleteExpired() = %v, want %v", got, tt.want)
}
})
Expand Down
Loading

0 comments on commit 2c02269

Please sign in to comment.