Skip to content

Commit

Permalink
Merge pull request #36670 from nvanbenschoten/backport19.1-36392
Browse files Browse the repository at this point in the history
release-19.1: storage: avoid copying marshalled RaftCommand when encoding
  • Loading branch information
nvanbenschoten authored Apr 9, 2019
2 parents c03c307 + cb94d64 commit 76e8e78
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 41 deletions.
38 changes: 21 additions & 17 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ func (r *Replica) submitProposalLocked(p *ProposalData) error {
}

func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
data, err := protoutil.Marshal(p.command)
cmdSize := p.command.Size()
data := make([]byte, raftCommandPrefixLen+cmdSize)
_, err := protoutil.MarshalToWithoutFuzzing(p.command, data[raftCommandPrefixLen:])
if err != nil {
return err
}
Expand All @@ -369,7 +371,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
RaftCommand.ReplicatedEvalResult: %d
RaftCommand.ReplicatedEvalResult.Delta: %d
RaftCommand.WriteBatch: %d
`, p.Request.Summary(), len(data),
`, p.Request.Summary(), cmdSize,
p.command.ProposerReplica.Size(),
p.command.ReplicatedEvalResult.Size(),
p.command.ReplicatedEvalResult.Delta.Size(),
Expand All @@ -383,7 +385,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
// blips or worse, and it's good to be able to pick them from traces.
//
// TODO(tschottdorf): can we mark them so lightstep can group them?
if size := len(data); size > largeProposalEventThresholdBytes {
if size := cmdSize; size > largeProposalEventThresholdBytes {
log.Eventf(p.ctx, "proposal is large: %s", humanizeutil.IBytes(int64(size)))
}

Expand All @@ -406,7 +408,7 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {

confChangeCtx := ConfChangeContext{
CommandID: string(p.idKey),
Payload: data,
Payload: data[raftCommandPrefixLen:], // chop off prefix
Replica: crt.Replica,
}
encodedCtx, err := protoutil.Marshal(&confChangeCtx)
Expand All @@ -427,24 +429,26 @@ func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
})
}

return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
encode := encodeRaftCommandV1
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return false, errors.New("cannot sideload empty SSTable")
}
encode = encodeRaftCommandV2
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
}

if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
encodingVersion := raftVersionStandard
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return errors.New("cannot sideload empty SSTable")
}
encodingVersion = raftVersionSideloaded
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], encodingVersion, p.idKey)

return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
// We're proposing a command so there is no need to wake the leader if
// we're quiesced.
r.unquiesceLocked()
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(encode(p.idKey, data))
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(data)
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (r *Replica) unquiesceAndWakeLeaderLocked() {
r.store.unquiescedReplicas.Unlock()
r.maybeCampaignOnWakeLocked(ctx)
// Propose an empty command which will wake the leader.
_ = r.mu.internalRaftGroup.Propose(encodeRaftCommandV1(makeIDKey(), nil))
data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil)
_ = r.mu.internalRaftGroup.Propose(data)
}
}

Expand Down
35 changes: 18 additions & 17 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,6 @@ type raftCommandEncodingVersion byte
//
// TODO(bdarnell): is this commandID still appropriate for our needs?
const (
// The prescribed length for each command ID.
raftCommandIDLen = 8
// The initial Raft command version, used for all regular Raft traffic.
raftVersionStandard raftCommandEncodingVersion = 0
// A proposal containing an SSTable which preferably should be sideloaded
Expand All @@ -1040,6 +1038,10 @@ const (
// Raft log it necessary to inline the payload first as it has usually
// been sideloaded.
raftVersionSideloaded raftCommandEncodingVersion = 1
// The prescribed length for each command ID.
raftCommandIDLen = 8
// The prescribed length of each encoded command's prefix.
raftCommandPrefixLen = 1 + raftCommandIDLen
// The no-split bit is now unused, but we still apply the mask to the first
// byte of the command for backward compatibility.
//
Expand All @@ -1048,27 +1050,26 @@ const (
raftCommandNoSplitMask = raftCommandNoSplitBit - 1
)

func encodeRaftCommandV1(commandID storagebase.CmdIDKey, command []byte) []byte {
return encodeRaftCommand(raftVersionStandard, commandID, command)
}

func encodeRaftCommandV2(commandID storagebase.CmdIDKey, command []byte) []byte {
return encodeRaftCommand(raftVersionSideloaded, commandID, command)
}

// encode a command ID, an encoded storagebase.RaftCommand, and
// whether the command contains a split.
func encodeRaftCommand(
version raftCommandEncodingVersion, commandID storagebase.CmdIDKey, command []byte,
) []byte {
b := make([]byte, raftCommandPrefixLen+len(command))
encodeRaftCommandPrefix(b[:raftCommandPrefixLen], version, commandID)
copy(b[raftCommandPrefixLen:], command)
return b
}

func encodeRaftCommandPrefix(
b []byte, version raftCommandEncodingVersion, commandID storagebase.CmdIDKey,
) {
if len(commandID) != raftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), raftCommandIDLen))
}
x := make([]byte, 1, 1+raftCommandIDLen+len(command))
x[0] = byte(version)
x = append(x, []byte(commandID)...)
x = append(x, command...)
return x
if len(b) != raftCommandPrefixLen {
panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), raftCommandPrefixLen))
}
b[0] = byte(version)
copy(b[1:], []byte(commandID))
}

// DecodeRaftCommand splits a raftpb.Entry.Data into its commandID and
Expand Down
13 changes: 8 additions & 5 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,15 @@ func maybeSideloadEntriesImpl(
strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

{
var err error
data, err = protoutil.Marshal(&strippedCmd)
data = make([]byte, raftCommandPrefixLen+strippedCmd.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&strippedCmd, data[raftCommandPrefixLen:])
if err != nil {
return nil, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
}
ent.Data = data
}

ent.Data = encodeRaftCommandV2(cmdID, data)
log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
if err = sideloaded.Put(ctx, ent.Index, ent.Term, dataToSideload); err != nil {
return nil, 0, err
Expand Down Expand Up @@ -226,11 +227,13 @@ func maybeInlineSideloadedRaftCommand(
}
command.ReplicatedEvalResult.AddSSTable.Data = sideloadedData
{
data, err := protoutil.Marshal(&command)
data := make([]byte, raftCommandPrefixLen+command.Size())
encodeRaftCommandPrefix(data[:raftCommandPrefixLen], raftVersionSideloaded, cmdID)
_, err := protoutil.MarshalToWithoutFuzzing(&command, data[raftCommandPrefixLen:])
if err != nil {
return nil, err
}
ent.Data = encodeRaftCommandV2(cmdID, data)
ent.Data = data
}
return &ent, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/split_delay_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
_ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) {
// NB: intentionally ignore the error (which can be ErrProposalDropped
// when there's an SST inflight).
_ = rawNode.Propose(encodeRaftCommandV1(makeIDKey(), nil))
data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil)
_ = rawNode.Propose(data)
// NB: we need to unquiesce as the group might be quiesced.
return true /* unquiesceAndWakeLeader */, nil
})
Expand Down

0 comments on commit 76e8e78

Please sign in to comment.