Skip to content

Commit

Permalink
perf(metarepos): add a pool for *mrpb.RaftEntry (#536)
Browse files Browse the repository at this point in the history
### What this PR does

This change adds the pool for *mrpb.RaftEntry to reuse it.
  • Loading branch information
ijsong authored Jul 28, 2023
2 parents 16b2181 + be9f121 commit 96ab5e2
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 43 deletions.
103 changes: 60 additions & 43 deletions internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,16 @@ Loop:
b, err := e.Marshal()
if err != nil {
mr.logger.Error(err.Error())
e.Release()
continue
}
// Release *mrpb.Reports only if e contains Reports.
e.GetRequest().Report.Release()
nodeIndex, requestIndex := e.NodeIndex, e.RequestIndex
e.Release()

select {
case mr.rnProposeC <- b:
case <-ctx.Done():
mr.sendAck(e.NodeIndex, e.RequestIndex, ctx.Err())
mr.sendAck(nodeIndex, requestIndex, ctx.Err())
}
case <-ctx.Done():
break Loop
Expand Down Expand Up @@ -1150,53 +1151,69 @@ func (mr *RaftMetadataRepository) proposeReport(snID types.StorageNodeID, ur []s
return nil
}

func (mr *RaftMetadataRepository) propose(ctx context.Context, r interface{}, guarantee bool) error {
e := &mrpb.RaftEntry{}
e.Request.SetValue(r)
e.NodeIndex = uint64(mr.nodeID)
e.RequestIndex = UnusedRequestIndex
func (mr *RaftMetadataRepository) leaseRaftEntry(request any, requestIndex uint64) *mrpb.RaftEntry {
re := mrpb.NewRaftEntry()
re.Request.SetValue(request)
re.NodeIndex = uint64(mr.nodeID)
re.RequestIndex = requestIndex
return re
}

if guarantee {
c := make(chan error, 1)
rIdx := mr.requestNum.Add(1)
func (mr *RaftMetadataRepository) proposeWithoutGuarantee(ctx context.Context, request any) (err error) {
re := mr.leaseRaftEntry(request, UnusedRequestIndex)
select {
case mr.proposeC <- re:
return nil
case <-ctx.Done():
err = ctx.Err()
default:
err = verrors.ErrIgnore
}
if err != nil {
re.Release()
}
return err
}

e.RequestIndex = rIdx
mr.requestMap.Store(rIdx, c)
defer mr.requestMap.Delete(rIdx)
func (mr *RaftMetadataRepository) proposeWithGuarantee(ctx context.Context, request any) error {
rIdx := mr.requestNum.Add(1)
c := make(chan error, 1)
mr.requestMap.Store(rIdx, c)
defer mr.requestMap.Delete(rIdx)

t := time.NewTimer(mr.raftProposeTimeout)
defer t.Stop()
t := time.NewTimer(mr.raftProposeTimeout)
defer t.Stop()

PROPOSE:
select {
case mr.proposeC <- e:
case <-t.C:
t.Reset(mr.raftProposeTimeout)
goto PROPOSE
case <-ctx.Done():
return ctx.Err()
}
re := mr.leaseRaftEntry(request, rIdx)

select {
case err := <-c:
return err
case <-t.C:
t.Reset(mr.raftProposeTimeout)
goto PROPOSE
case <-ctx.Done():
return ctx.Err()
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case mr.proposeC <- e:
default:
return verrors.ErrIgnore
}
Propose:
select {
case mr.proposeC <- re:
case <-t.C:
t.Reset(mr.raftProposeTimeout)
goto Propose
case <-ctx.Done():
re.Release()
return ctx.Err()
}

return nil
select {
case err := <-c:
return err
case <-t.C:
t.Reset(mr.raftProposeTimeout)
re = mr.leaseRaftEntry(request, rIdx)
goto Propose
case <-ctx.Done():
return ctx.Err()
}
}

func (mr *RaftMetadataRepository) propose(ctx context.Context, request interface{}, guarantee bool) error {
if !guarantee {
return mr.proposeWithoutGuarantee(ctx, request)
}
return mr.proposeWithGuarantee(ctx, request)
}

func (mr *RaftMetadataRepository) proposeConfChange(ctx context.Context, r raftpb.ConfChange) error {
Expand Down
18 changes: 18 additions & 0 deletions proto/mrpb/raft_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,21 @@ func (rq *ReportQueue) Release() {
reportQueuePool.Put(rq)
}
}

var raftEntryPool = sync.Pool{
New: func() any {
return &RaftEntry{}
},
}

func NewRaftEntry() *RaftEntry {
return raftEntryPool.Get().(*RaftEntry)
}

func (re *RaftEntry) Release() {
if re != nil {
re.Request.Report.Release()
re.Reset()
raftEntryPool.Put(re)
}
}

0 comments on commit 96ab5e2

Please sign in to comment.