Skip to content

Commit

Permalink
Merge pull request nsqio#100 from youzan/add-more-grpc
Browse files Browse the repository at this point in the history
optimize commit log and disk queue read
  • Loading branch information
absolute8511 authored Aug 21, 2019
2 parents d9a508c + 2c1eca3 commit 2ecb5ca
Show file tree
Hide file tree
Showing 15 changed files with 3,363 additions and 461 deletions.
13 changes: 13 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 13 additions & 17 deletions consistence/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ var LOGROTATE_NUM = 500000
var MIN_KEEP_LOG_ITEM = 1000

var bp sync.Pool
var emptyLogData CommitLogData
var logDataSize int

func init() {
bp.New = func() interface{} {
return &bytes.Buffer{}
}
logDataSize = binary.Size(emptyLogData)
}

func bufferPoolGet() *bytes.Buffer {
Expand All @@ -73,10 +76,8 @@ type CommitLogData struct {
MsgNum int32
}

var emptyLogData CommitLogData

func GetLogDataSize() int {
return binary.Size(emptyLogData)
return logDataSize
}

func GetPrevLogOffset(cur int64) int64 {
Expand Down Expand Up @@ -121,26 +122,21 @@ func getCommitLogCountFromFile(path string, start int64) (int64, error) {
}

func getCommitLogFromFile(file *os.File, offset int64) (*CommitLogData, error) {
f, err := file.Stat()
if err != nil {
return nil, err
}
fsize := f.Size()
if offset == fsize {
return nil, ErrCommitLogEOF
}

if offset > fsize-int64(GetLogDataSize()) {
return nil, ErrCommitLogOutofBound
}

if (offset % int64(GetLogDataSize())) != 0 {
return nil, ErrCommitLogOffsetInvalid
}
b := bytes.NewBuffer(make([]byte, GetLogDataSize()))
n, err := file.ReadAt(b.Bytes(), offset)
if err != nil {
return nil, err
if err == io.EOF {
if n == 0 {
return nil, ErrCommitLogEOF
} else if n < int(GetLogDataSize()) {
return nil, ErrCommitLogOutofBound
}
} else {
return nil, err
}
}
if n != GetLogDataSize() {
return nil, ErrCommitLogOffsetInvalid
Expand Down
108 changes: 89 additions & 19 deletions consistence/coord_grpc_server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package consistence

import (
"net"
"os"
"time"

pb "github.com/youzan/nsq/consistence/coordgrpc"
"github.com/youzan/nsq/internal/levellogger"
"github.com/youzan/nsq/nsqd"
"golang.org/x/net/context"
"google.golang.org/grpc"
"net"
"os"
"time"
)

// grpc is not used anymore
Expand Down Expand Up @@ -80,7 +81,16 @@ func (s *nsqdCoordGRpcServer) UpdateChannelOffset(ctx context.Context, req *pb.R
var chOffset ChannelConsumerOffset
chOffset.Flush = req.ChannelOffset.Flush
chOffset.VOffset = req.ChannelOffset.Voffset
chOffset.VCnt = req.ChannelOffset.Vcnt
chOffset.AllowBackward = req.ChannelOffset.AllowBackward
chOffset.NeedUpdateConfirmed = req.ChannelOffset.NeedUpdateConfirmed
for _, interval := range req.ChannelOffset.ConfirmedIntervals {
chOffset.ConfirmedInterval = append(chOffset.ConfirmedInterval, nsqd.MsgQueueInterval{
Start: interval.Start,
End: interval.End,
EndCnt: interval.EndCnt,
})
}
err = s.nsqdCoord.updateChannelOffsetOnSlave(tc.GetData(), req.Channel, chOffset)
if err != nil {
coordErr.ErrMsg = err.ErrMsg
Expand Down Expand Up @@ -110,14 +120,7 @@ func (s *nsqdCoordGRpcServer) PutMessage(ctx context.Context, req *pb.RpcPutMess
return &coordErr, nil
}
// do local pub message
var commitData CommitLogData
commitData.Epoch = EpochType(req.LogData.Epoch)
commitData.LogID = req.LogData.LogID
commitData.MsgNum = req.LogData.MsgNum
commitData.MsgCnt = req.LogData.MsgCnt
commitData.MsgSize = req.LogData.MsgSize
commitData.MsgOffset = req.LogData.MsgOffset
commitData.LastMsgLogID = req.LogData.LastMsgLogID
commitData := fromPbCommitLogData(req.LogData)
var msg nsqd.Message
msg.ID = nsqd.MessageID(req.TopicMessage.ID)
msg.TraceID = req.TopicMessage.Trace_ID
Expand Down Expand Up @@ -154,14 +157,7 @@ func (s *nsqdCoordGRpcServer) PutMessages(ctx context.Context, req *pb.RpcPutMes
return &coordErr, nil
}
// do local pub message
var commitData CommitLogData
commitData.Epoch = EpochType(req.LogData.Epoch)
commitData.LogID = req.LogData.LogID
commitData.MsgNum = req.LogData.MsgNum
commitData.MsgCnt = req.LogData.MsgCnt
commitData.MsgSize = req.LogData.MsgSize
commitData.MsgOffset = req.LogData.MsgOffset
commitData.LastMsgLogID = req.LogData.LastMsgLogID
commitData := fromPbCommitLogData(req.LogData)
var msgs []*nsqd.Message
for _, pbm := range req.TopicMessage {
var msg nsqd.Message
Expand All @@ -181,3 +177,77 @@ func (s *nsqdCoordGRpcServer) PutMessages(ctx context.Context, req *pb.RpcPutMes
}
return &coordErr, nil
}

func (s *nsqdCoordGRpcServer) PullCommitLogsAndData(ctx context.Context, req *pb.PullCommitLogsReq) (*pb.PullCommitLogsRsp, error) {
rsp := &pb.PullCommitLogsRsp{}
rreq := fromPbPullCommitLogsReq(req)
rrsp, err := s.nsqdCoord.pullCommitLogsAndData(rreq, false)
if err != nil {
return rsp, err
}
rsp = toPbPullCommitLogRsp(rrsp)
return rsp, nil
}

func toPbCommitLogData(l CommitLogData) pb.CommitLogData {
var commitData pb.CommitLogData
commitData.Epoch = int64(l.Epoch)
commitData.LogID = l.LogID
commitData.MsgNum = l.MsgNum
commitData.MsgCnt = l.MsgCnt
commitData.MsgSize = l.MsgSize
commitData.MsgOffset = l.MsgOffset
commitData.LastMsgLogID = l.LastMsgLogID
return commitData
}

func fromPbCommitLogData(l *pb.CommitLogData) CommitLogData {
var commitData CommitLogData
commitData.Epoch = EpochType(l.Epoch)
commitData.LogID = l.LogID
commitData.MsgNum = l.MsgNum
commitData.MsgCnt = l.MsgCnt
commitData.MsgSize = l.MsgSize
commitData.MsgOffset = l.MsgOffset
commitData.LastMsgLogID = l.LastMsgLogID
return commitData
}

func toPbPullCommitLogRsp(rrsp *RpcPullCommitLogsRsp) *pb.PullCommitLogsRsp {
rsp := &pb.PullCommitLogsRsp{}
rsp.DataList = rrsp.DataList
rsp.Logs = make([]pb.CommitLogData, 0, len(rrsp.Logs))
for _, l := range rrsp.Logs {
rsp.Logs = append(rsp.Logs, toPbCommitLogData(l))
}
return rsp
}

func fromPbPullCommitLogsReq(req *pb.PullCommitLogsReq) *RpcPullCommitLogsReq {
rreq := &RpcPullCommitLogsReq{
StartLogOffset: req.StartLogOffset,
LogMaxNum: int(req.LogMaxNum),
StartIndexCnt: req.StartIndexCnt,
LogCountNumIndex: req.LogCountNumIndex,
UseCountIndex: req.UseCountIndex,
}
rreq.TopicName = req.TopicData.TopicName
rreq.TopicPartition = int(req.TopicData.TopicPartition)
rreq.Epoch = EpochType(req.TopicData.Epoch)
rreq.TopicWriteEpoch = EpochType(req.TopicData.TopicWriteEpoch)
rreq.TopicLeaderSessionEpoch = EpochType(req.TopicData.TopicLeaderSessionEpoch)
rreq.TopicLeaderSession = req.TopicData.TopicLeaderSession
rreq.TopicLeader = req.TopicData.TopicLeader
return rreq
}

func (s *nsqdCoordGRpcServer) PullDelayedQueueCommitLogsAndData(ctx context.Context, req *pb.PullCommitLogsReq) (*pb.PullCommitLogsRsp, error) {
rsp := &pb.PullCommitLogsRsp{}
rreq := fromPbPullCommitLogsReq(req)
rrsp, err := s.nsqdCoord.pullCommitLogsAndData(rreq, true)
if err != nil {
return rsp, err
}
rsp = toPbPullCommitLogRsp(rrsp)
return rsp, nil
}
Loading

0 comments on commit 2ecb5ca

Please sign in to comment.