Skip to content

Commit

Permalink
Batch slashing protection registration (#5604)
Browse files Browse the repository at this point in the history
This PR brings down the time to send 100 attestations from ~1s to
~100ms, making it feasible to run 10k validators on a single node (which
regularly send 300 attestations / slot).

This is done by batching the slashing protection database writes into a
single transaction, thus avoiding a slow fsync for every signature - the
effect will be more pronounced on slow drives.

The benefit applies to both beacon and client validators.
  • Loading branch information
arnetheduck authored Nov 19, 2023
1 parent d8144c6 commit e1e809e
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 174 deletions.
182 changes: 98 additions & 84 deletions beacon_chain/validator_client/attestation_service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import
std/sets,
chronicles,
../validators/activity_metrics,
../validators/[activity_metrics, validator_duties],
"."/[common, api]

const
Expand All @@ -22,42 +22,22 @@ type
selection_proof: ValidatorSig
validator: AttachedValidator

proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: DutyAndProof): Future[bool] {.async.} =
let vc = service.client
let validator = vc.getValidatorForDuties(
duty.data.pubkey, adata.slot).valueOr:
return false
let fork = vc.forkAtEpoch(adata.slot.epoch)

doAssert(validator.index.isSome())
let vindex = validator.index.get()
proc serveAttestation(
service: AttestationServiceRef, registered: RegisteredAttestation):
Future[bool] {.async.} =
let
vc = service.client
fork = vc.forkAtEpoch(registered.data.slot.epoch)
validator = registered.validator

logScope:
validator = validatorLog(validator)

# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let signingRoot =
compute_attestation_signing_root(
fork, vc.beaconGenesis.genesis_validators_root, adata)

let notSlashable = vc.attachedValidators[].slashingProtection
.registerAttestation(vindex, validator.pubkey,
adata.source.epoch,
adata.target.epoch, signingRoot)
if notSlashable.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(adata),
signingRoot = shortLog(signingRoot),
badVoteDetails = $notSlashable.error
return false

let attestation = block:
let signature =
try:
let res = await validator.getAttestationSignature(
fork, vc.beaconGenesis.genesis_validators_root, adata)
fork, vc.beaconGenesis.genesis_validators_root, registered.data)
if res.isErr():
warn "Unable to sign attestation", reason = res.error()
return false
Expand All @@ -69,15 +49,11 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
error "An unexpected error occurred while signing attestation",
err_name = exc.name, err_msg = exc.msg
return false

Attestation.init(
[duty.data.validator_committee_index],
int(duty.data.committee_length), adata, signature).expect(
"data validity checked earlier")
registered.toAttestation(signature)

logScope:
attestation = shortLog(attestation)
delay = vc.getDelay(adata.slot.attestation_deadline())
delay = vc.getDelay(registered.data.slot.attestation_deadline())

debug "Sending attestation"

Expand All @@ -98,7 +74,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
return false

if res:
let delay = vc.getDelay(adata.slot.attestation_deadline())
let delay = vc.getDelay(attestation.data.slot.attestation_deadline())
beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
notice "Attestation published"
Expand Down Expand Up @@ -176,57 +152,94 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
): Future[AttestationData] {.
async.} =
doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int)))
let vc = service.client
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)

# This call could raise ValidatorApiError, but it is handled in
# publishAttestationsAndAggregates().
let ad = await vc.produceAttestationData(slot, committee_index,
ApiStrategyKind.Best)

let pendingAttestations =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving attestation duty", duty = duty.data, epoch = slot.epoch()
if (duty.data.slot != ad.slot) or
(uint64(duty.data.committee_index) != ad.index):
warn "Inconsistent validator duties during attestation signing",
validator = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = ad.slot, attestation_index = ad.index
continue
res.add(service.serveAttestation(ad, duty))
res

let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAttestations)
except CancelledError as exc:
let pending = pendingAttestations
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc

for future in pendingAttestations:
if future.completed():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let data = await vc.produceAttestationData(slot, committee_index,
ApiStrategyKind.Best)

let registeredRes = vc.attachedValidators[].slashingProtection.withContext:
var tmp: seq[RegisteredAttestation]
for duty in duties:
if (duty.data.slot != data.slot) or
(uint64(duty.data.committee_index) != data.index):
warn "Inconsistent validator duties during attestation signing",
validator = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = data.slot, attestation_index = data.index
continue

let validator = vc.getValidatorForDuties(
duty.data.pubkey, duty.data.slot).valueOr:
continue

doAssert(validator.index.isSome())
let validator_index = validator.index.get()

logScope:
validator = validatorLog(validator)

# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let
signingRoot = compute_attestation_signing_root(
fork, vc.beaconGenesis.genesis_validators_root, data)
registered = registerAttestationInContext(
validator_index, validator.pubkey, data.source.epoch,
data.target.epoch, signingRoot)
if registered.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
badVoteDetails = $registered.error()
continue

tmp.add(RegisteredAttestation(
validator: validator,
index_in_committee: duty.data.validator_committee_index,
committee_len: int duty.data.committee_length,
data: data
))
tmp

if registeredRes.isErr():
warn "Could not update slashing database, skipping attestation duties",
error = registeredRes.error()
else:
let
pendingAttestations = registeredRes[].mapIt(service.serveAttestation(it))
statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAttestations)
except CancelledError as exc:
let pending = pendingAttestations
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc

for future in pendingAttestations:
if future.completed():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)

let delay = vc.getDelay(slot.attestation_deadline())
debug "Attestation statistics", total = len(pendingAttestations),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committee_index, duties_count = len(duties)
let delay = vc.getDelay(slot.attestation_deadline())
debug "Attestation statistics", total = len(pendingAttestations),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committee_index, duties_count = len(duties)

return ad
return data

proc produceAndPublishAggregates(service: AttestationServiceRef,
adata: AttestationData,
Expand Down Expand Up @@ -329,8 +342,6 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
committee_index: CommitteeIndex,
duties: seq[DutyAndProof]) {.async.} =
let vc = service.client
# Waiting for blocks to be published before attesting.
await vc.waitForBlock(slot, attestationSlotOffset)

block:
let delay = vc.getDelay(slot.attestation_deadline())
Expand Down Expand Up @@ -378,6 +389,9 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
res.mgetOrPut(item.data.committee_index, default).add(item)
res

# Waiting for blocks to be published before attesting.
await vc.waitForBlock(slot, attestationSlotOffset)

var tasks: seq[Future[void]]
try:
for index, duties in dutiesByCommittee:
Expand Down
104 changes: 56 additions & 48 deletions beacon_chain/validators/beacon_validators.nim
Original file line number Diff line number Diff line change
Expand Up @@ -331,27 +331,23 @@ proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async.} =
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
genesis_validators_root: Eth2Digest,
validator: AttachedValidator,
data: AttestationData,
committeeLen: int,
indexInCommittee: int,
registered: RegisteredAttestation,
subnet_id: SubnetId) {.async.} =
try:
let
signature = block:
let res = await validator.getAttestationSignature(
fork, genesis_validators_root, data)
let res = await registered.validator.getAttestationSignature(
fork, genesis_validators_root, registered.data)
if res.isErr():
warn "Unable to sign attestation", validator = shortLog(validator),
attestationData = shortLog(data), error_msg = res.error()
warn "Unable to sign attestation",
validator = shortLog(registered.validator),
attestationData = shortLog(registered.data),
error_msg = res.error()
return
res.get()
attestation =
Attestation.init(
[uint64 indexInCommittee], committeeLen, data, signature).expect(
"valid data")
attestation = registered.toAttestation(signature)

validator.doppelgangerActivity(attestation.data.slot.epoch)
registered.validator.doppelgangerActivity(attestation.data.slot.epoch)

# Logged in the router
let res = await node.router.routeAttestation(
Expand All @@ -360,7 +356,9 @@ proc createAndSendAttestation(node: BeaconNode,
return

if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubkey)
dump(
node.config.dumpDirOutgoing, attestation.data,
registered.validator.pubkey)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
Expand Down Expand Up @@ -566,8 +564,8 @@ proc makeBeaconBlockForHeadAndSlot*(
PayloadType: type ForkyExecutionPayloadForSigning, node: BeaconNode, randao_reveal: ValidatorSig,
validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef,
slot: Slot):
Future[ForkedBlockResult] {.async.} =
return await makeBeaconBlockForHeadAndSlot(
Future[ForkedBlockResult] =
return makeBeaconBlockForHeadAndSlot(
PayloadType, node, randao_reveal, validator_index, graffiti, head, slot,
execution_payload = Opt.none(PayloadType),
transactions_root = Opt.none(Eth2Digest),
Expand Down Expand Up @@ -1245,41 +1243,51 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
committees_per_slot = get_committee_count_per_slot(epochRef.shufflingRef)
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
registeredRes = node.attachedValidators.slashingProtection.withContext:
var tmp: seq[(RegisteredAttestation, SubnetId)]

for committee_index in get_committee_indices(committees_per_slot):
let committee = get_beacon_committee(
epochRef.shufflingRef, slot, committee_index)
for committee_index in get_committee_indices(committees_per_slot):
let
committee = get_beacon_committee(
epochRef.shufflingRef, slot, committee_index)
subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, committee_index)

for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
continue
for index_in_committee, validator_index in committee:
let validator = node.getValidatorForDuties(validator_index, slot).valueOr:
continue

let
data = makeAttestationData(epochRef, attestationHead, committee_index)
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
signingRoot = compute_attestation_signing_root(
fork, genesis_validators_root, data)
registered = node.attachedValidators
.slashingProtection
.registerAttestation(
validator_index,
validator.pubkey,
data.source.epoch,
data.target.epoch,
signingRoot)
if registered.isOk():
let subnet_id = compute_subnet_for_attestation(
committees_per_slot, data.slot, committee_index)
asyncSpawn createAndSendAttestation(
node, fork, genesis_validators_root, validator, data,
committee.len(), index_in_committee, subnet_id)
else:
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
validator_index,
validator = shortLog(validator),
badVoteDetails = $registered.error()
let
data = makeAttestationData(epochRef, attestationHead, committee_index)
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
signingRoot = compute_attestation_signing_root(
fork, genesis_validators_root, data)
registered = registerAttestationInContext(
validator_index, validator.pubkey, data.source.epoch,
data.target.epoch, signingRoot)
if registered.isErr():
warn "Slashing protection activated for attestation",
attestationData = shortLog(data),
signingRoot = shortLog(signingRoot),
validator_index,
validator = shortLog(validator),
badVoteDetails = $registered.error()
continue

tmp.add((RegisteredAttestation(
validator: validator,
index_in_committee: uint64 index_in_committee,
committee_len: committee.len(), data: data), subnet_id
))
tmp

if registeredRes.isErr():
warn "Could not update slashing database, skipping attestation duties",
error = registeredRes.error()
else:
for attestation in registeredRes[]:
asyncSpawn createAndSendAttestation(
node, fork, genesis_validators_root, attestation[0], attestation[1])

proc createAndSendSyncCommitteeMessage(node: BeaconNode,
validator: AttachedValidator,
Expand Down
Loading

0 comments on commit e1e809e

Please sign in to comment.