Gossip sub attestation subnet scoring #3029

Draft · wants to merge 7 commits into base: unstable
Changes from 4 commits
215 changes: 205 additions & 10 deletions beacon_chain/networking/eth2_network.nim
@@ -1865,17 +1865,17 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
historyGossip: 3,
fanoutTTL: 60.seconds,
seenTTL: 385.seconds,
gossipThreshold: -4000,
publishThreshold: -8000,
graylistThreshold: -16000, # also disconnect threshold
opportunisticGraftThreshold: 0,
decayInterval: 12.seconds,
gossipThreshold: -40,
publishThreshold: -80,
graylistThreshold: -160, # also disconnect threshold
opportunisticGraftThreshold: 1,
decayInterval: 12.seconds, #TODO this is not used in libp2p
decayToZero: 0.01,
retainScore: 385.seconds,
appSpecificWeight: 0.0,
ipColocationFactorWeight: -53.75,
ipColocationFactorWeight: -5,
ipColocationFactorThreshold: 3.0,
behaviourPenaltyWeight: -15.9,
behaviourPenaltyWeight: -5,
behaviourPenaltyDecay: 0.986,
disconnectBadPeers: true,
directPeers:
@@ -1920,13 +1920,208 @@ proc announcedENR*(node: Eth2Node): enr.Record =
proc shortForm*(id: NetKeyPair): string =
$PeerID.init(id.pubkey)


# Gossipsub scoring explained:
# The score of a peer in a topic is the sum of 5 different scores:
#
# `timeInMesh`: for each `Quantum` spent in a mesh,
# the peer earns `Weight` points, up to `(Cap * Weight)`
#
# Each of the following scores decays: it is multiplied by `Decay` on every
# heartbeat until it reaches `decayToZero` (0.1 by default)
#
# `firstMessageDelivery`: for each message delivered first,
# the peer earns `Weight` points, up to `(Cap * Weight)`
#
# `meshMessageDeliveries`: The most convoluted way possible to punish
# peers not sending enough traffic in a topic.
#
# For each message (duplicate or first) received in a topic, the score is incremented, up to `Cap`.
# If the score in the topic drops below `Threshold`, a peer that has been in the
# mesh for at least `Activation` time gets: `score += (Threshold - Score)² * Weight`
# (`Weight` should be negative to punish it)
#
# `meshFailurePenalty`: same as meshMessageDeliveries, but only happens on prune
# to avoid peers constantly unsubbing-resubbing
#
# `invalidMessageDeliveries`: for each received message that fails validation, the peer's
# score is incremented. Final score = `Score * Score * Weight`
#
#
# Once we have the 5 scores for each peer/topic, we sum them up per peer
# using the topicWeight of each topic.
#
# Nimbus strategy:
# Try to make roughly 100 points achievable in each topic before weighting,
# and then weight each topic so that the total maxes out at about 100 points
#
# In order of priority:
# - A badly behaving peer (e.g. one sending bad messages) will be heavily sanctioned
# - A good peer will earn a good score
# - Inactive/slow peers will be mildly sanctionned.
Member
Suggested change
# - Inactive/slow peers will be mildly sanctionned.
# - Inactive/slow peers will be mildly sanctioned.

#
# Since a "slow" topic will punish everyone in it, we don't want to punish
# good peers which are unlucky enough to be part of a slow topic. So, we give more points to
# good peers than we take from slow peers
#
# Global topics are good for checking the stability of peers, but since we can only
# have ~20 peers in global topics, we need to score on subnets to have data
# about as many peers as possible, even if the subnets are less stable
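
For a concrete picture of that budget, using the per-topic weights chosen further down (0.1 for the block topic, 0.1 for the aggregate topic, 0.8 spread evenly across the attestation subnets), and assuming the mainnet value ATTESTATION_SUBNET_COUNT = 64 and ~100 achievable points per topic before weighting:

  block topic:       0.1             * 100 ≈ 10 points
  aggregate topic:   0.1             * 100 ≈ 10 points
  64 subnet topics:  64 * (0.8 / 64) * 100 ≈ 80 points
  total                                    ≈ 100 points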
func computeDecay(
startValue: float,
endValue: float,
timeToEndValue: Duration,
heartbeatTime: Duration
): float =
# startValue will decay to endValue in timeToEndValue
# given the returned decay

let heartbeatsToZero = timeToEndValue.milliseconds.float / heartbeatTime.milliseconds.float
pow(endValue / startValue, 1 / heartbeatsToZero)
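
As a quick illustration, here is a minimal standalone sketch of the same computation, using plain floats in place of chronos Durations; the name computeDecayMs is a hypothetical stand-in, and the 700 ms heartbeat is taken from the compile-time check at the bottom of this file:

import std/math

func computeDecayMs(startValue, endValue, timeToEndValueMs, heartbeatMs: float): float =
  # Same formula as computeDecay above, with durations expressed in milliseconds
  let heartbeats = timeToEndValueMs / heartbeatMs
  pow(endValue / startValue, 1 / heartbeats)

# e.g. the invalidMessageDecay below: decaying from 1.0 to 0.1 over one minute,
# with a 700 ms heartbeat, gives a per-heartbeat decay of roughly 0.973
echo computeDecayMs(1.0, 0.1, 60_000.0, 700.0)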

func computeMessageDeliveriesWeight(
messagesThreshold: float,
maxLostPoints: float): float =

let maxDeficit = messagesThreshold
-maxLostPoints / (maxDeficit * maxDeficit)
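
This weight is chosen so that, with the deficit penalty described in the comment block above, a peer sitting at a deficit equal to the full threshold loses exactly maxLostPoints in that topic: the penalty is threshold² * (-maxLostPoints / threshold²) = -maxLostPoints, while a peer at half the threshold loses only a quarter of that, (threshold / 2)² * weight = -maxLostPoints / 4 (both before topic weighting).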

type TopicType* = enum
BlockTopic,
AggregateTopic,
SubnetTopic,
OtherTopic

proc getTopicParams(
topicWeight: float,
heartbeatPeriod: Duration,
period: Duration,
averageOverNPeriods: float,
peersPerTopic: int,
expectedMessagesPerPeriod: int,
timeInMeshQuantum: Duration
): TopicParams =

let
# Statistically, a peer will be the first to deliver about `receivedMessages / d` of the messages
shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic
shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods

# A peer being first for 1/d of the messages will reach a score of
# `shouldBeFirstOverNPeriod`
firstMessageDecay =
computeDecay(
startValue = shouldBeFirstOverNPeriod,
endValue = 0.1,
timeToEndValue = period * averageOverNPeriods.int * 2,
heartbeatPeriod)

# Start removing up to 30 points when a peer sends less
Member
this can bring a peer into negative score territory?

Contributor Author
I'm still tweaking this, but my intuition is that it should be
FirstMessageDeliveriesBonus > InactivePenalty > TimeInMeshBonus

Currently, it's 80 > -30 > 20
A perfect peer would have 100 per topic. If it suddenly becomes inactive, it would gradually (over averageOverNPeriods) drop to -10 (InactivePenalty + TimeInMeshBonus), so it would be negative, meaning you can't grind for points and then become inactive.

So yeah, being inactive could bring you to a negative score if you don't do good things elsewhere. Now, what we do when peers are in negative score territory is a different question. At the moment, this PR is fairly conservative on actions (to reach -40 you would need to be inactive on a lot of topics, or more probably, send a lot of invalid messages), since the primary goal as of now is to restore opportunistic grafting, which helps a lot with inactive peers.

ATM I'm not very happy with the topicWeight computation; I need to rework that

# than half of the expected messages
shouldSendAtLeastPerPeriod = expectedMessagesPerPeriod / 2
shouldSendAtLeastOverNPeriod = shouldSendAtLeastPerPeriod * averageOverNPeriods

messageDeliveryThreshold = shouldSendAtLeastOverNPeriod
messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0)
messageDeliveryDecay =
computeDecay(
startValue = expectedMessagesPerPeriod.float * averageOverNPeriods,
endValue = 0,
timeToEndValue = period * averageOverNPeriods.int,
heartbeatPeriod)

# Invalid messages should be remembered for a long time
invalidMessageDecay = computeDecay(
startValue = 1,
endValue = 0.1,
timeToEndValue = chronos.minutes(1),
heartbeatPeriod)

let topicParams = TopicParams(
topicWeight: topicWeight,
timeInMeshWeight: 0.1,
timeInMeshQuantum: timeInMeshQuantum,
timeInMeshCap: 300, # 30 points after timeInMeshQuantum * 300
firstMessageDeliveriesWeight: 35.0 / shouldBeFirstOverNPeriod,
firstMessageDeliveriesDecay: firstMessageDecay,
firstMessageDeliveriesCap: shouldBeFirstOverNPeriod, # Max points: 70
meshMessageDeliveriesWeight: messageDeliveryWeight,
meshMessageDeliveriesDecay: messageDeliveryDecay,
meshMessageDeliveriesThreshold: messageDeliveryThreshold,
meshMessageDeliveriesCap: expectedMessagesPerPeriod.float * averageOverNPeriods,
meshMessageDeliveriesActivation: period * averageOverNPeriods.int,
meshMessageDeliveriesWindow: chronos.milliseconds(10),
meshFailurePenaltyWeight: messageDeliveryWeight,
meshFailurePenaltyDecay: messageDeliveryDecay,
invalidMessageDeliveriesWeight: -1, # 10 invalid messages = -100 points
invalidMessageDeliveriesDecay: invalidMessageDecay
)
topicParams
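
As a sanity check, plugging the BlockTopic call below into this proc (1 proposal per slot, 8 mesh peers, averaging over 10 slots, and assuming the 700 ms heartbeat from the compile-time check further down) gives roughly:

  shouldBeFirstPerPeriod          = 1 / 8        = 0.125
  shouldBeFirstOverNPeriod        = 0.125 * 10   = 1.25   (also the firstMessageDeliveriesCap)
  firstMessageDeliveriesWeight    = 35.0 / 1.25  = 28     (cap * weight = 35 points)
  firstMessageDecay               ≈ 0.993 per heartbeat   (1.25 decaying to 0.1 over 240 s)
  messageDeliveryThreshold        = (1 / 2) * 10 = 5
  messageDeliveryWeight           = -30 / 5²     = -1.2   (up to -30 points when silent)
  meshMessageDeliveriesActivation = 12 s * 10    = 120 s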

proc getTopicParams(
heartbeatPeriod: Duration,
slotPeriod: Duration,
peersPerTopic: int,
topicType: TopicType): TopicParams =
case topicType:
of BlockTopic:
getTopicParams(
topicWeight = 0.1,
heartbeatPeriod = heartbeatPeriod,
period = slotPeriod,
averageOverNPeriods = 10, # Average over 10 slots, to smooth missing proposals
peersPerTopic = peersPerTopic,
expectedMessagesPerPeriod = 1, # One proposal per slot
timeInMeshQuantum = chronos.seconds(15) # Stable topic
)
of AggregateTopic:
getTopicParams(
topicWeight = 0.1,
heartbeatPeriod = heartbeatPeriod,
period = slotPeriod,
averageOverNPeriods = 2,
peersPerTopic = peersPerTopic,
expectedMessagesPerPeriod = (TARGET_AGGREGATORS_PER_COMMITTEE * ATTESTATION_SUBNET_COUNT).int,
timeInMeshQuantum = chronos.seconds(15) # Stable topic
)
of SubnetTopic:
getTopicParams(
topicWeight = 0.8 / ATTESTATION_SUBNET_COUNT,
heartbeatPeriod = heartbeatPeriod,
period = slotPeriod,
averageOverNPeriods = ATTESTATION_SUBNET_COUNT, # Smooth out empty committees
peersPerTopic = peersPerTopic,
expectedMessagesPerPeriod = TARGET_COMMITTEE_SIZE.int, #TODO use current number
Contributor Author (@Menduist, Oct 26, 2021)
If someone has a ~1-liner to get the current average committee size (or number of active validators) from Eth2Node, I'll take it

Member
we don't have it in here (the network doesn't know about the dag), but it changes only once per epoch, so onSlotEnd could update it by grabbing an EpochRef for (head, wallslot), similar to how it sets up attestation subnets

Member
more to the point - we might need to update the scoring while nimbus is running - for example during sync, we'll go from 20k "known" validators to 250k

Contributor Author
Ah right, didn't think about sync.
But we only subscribe to gossip once synced, no?

My intuition was that the number grows/shrinks slowly enough to avoid updating it on the fly, but we can do it if needed

Member
we really shouldn't assume that users restart nimbus regularly (they shouldn't have to) - i.e. there are still users that haven't upgraded since genesis

timeInMeshQuantum = chronos.seconds(6) # Flaky topic
)
of OtherTopic:
TopicParams.init()
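
Regarding the committee-size thread above: since each active validator attests once per epoch and attestations are spread across the subnets, the expected per-subnet rate is roughly active_validators / SLOTS_PER_EPOCH / ATTESTATION_SUBNET_COUNT. A minimal sketch, assuming the mainnet constants SLOTS_PER_EPOCH = 32 and ATTESTATION_SUBNET_COUNT = 64 (the helper name is hypothetical):

func expectedSubnetMessagesPerSlot(activeValidators: int): float =
  # Each validator attests once per epoch; attestations are spread
  # over 32 slots and 64 attestation subnets
  activeValidators / (32 * 64)

echo expectedSubnetMessagesPerSlot(20_000)   # ≈ 9.8  (early or still-syncing chain)
echo expectedSubnetMessagesPerSlot(250_000)  # ≈ 122  (the reviewer's current figure)
# TARGET_COMMITTEE_SIZE = 128 corresponds to roughly 262k active validators,
# so the hard-coded value over- or under-estimates the real rate otherwise.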

static:
for topicType in ord(low(TopicType))..ord(high(TopicType)):
getTopicParams(
heartbeatPeriod = chronos.milliseconds(700),
slotPeriod = chronos.seconds(12),
peersPerTopic = 8,
TopicType(topicType)
).validateParameters().tryGet()
Member
validateParameters should probably be replaced by a TopicParameters.init that returns a Result
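
A rough sketch of what that suggestion could look like, assuming the Result type from stew/results; the type and proc names here are hypothetical and not part of this PR:

import stew/results

type ScoringParams = object
  topicWeight: float

func validate(p: ScoringParams): Result[void, cstring] =
  if p.topicWeight < 0.0:
    return err("topicWeight must be non-negative")
  ok()

func init(T: type ScoringParams, topicWeight: float): Result[ScoringParams, cstring] =
  # Construct and validate in one step, so callers can never hold
  # unvalidated parameters
  let p = ScoringParams(topicWeight: topicWeight)
  let check = p.validate()
  if check.isErr:
    return err(check.error)
  ok(p)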



proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams =
let
heartbeatPeriod = node.pubsub.parameters.heartbeatInterval
slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int)
peersPerTopic = node.pubsub.parameters.d

getTopicParams(heartbeatPeriod, slotPeriod, peersPerTopic, topicType)

proc subscribe*(
node: Eth2Node, topic: string, topicParams: TopicParams,
node: Eth2Node, topic: string, topicType: TopicType,
enableTopicMetrics: bool = false) =
if enableTopicMetrics:
node.pubsub.knownTopics.incl(topic)

node.pubsub.topicParams[topic] = topicParams
node.pubsub.topicParams[topic] = node.getTopicParams(topicType)

# Passing in `nil` because we do all message processing in the validator
node.pubsub.subscribe(topic, nil)
@@ -2042,7 +2237,7 @@ proc subscribeAttestationSubnets*(
for subnet_id, enabled in subnets:
if enabled:
node.subscribe(getAttestationTopic(
forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now
forkDigest, SubnetId(subnet_id)), SubnetTopic) # don't score attestation subnets for now

proc unsubscribeAttestationSubnets*(
node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) =
68 changes: 7 additions & 61 deletions beacon_chain/nimbus_beacon_node.nim
@@ -49,11 +49,6 @@ from eth/common/eth_types import BlockHashOrNumber
when defined(posix):
import system/ansi_c

from
libp2p/protocols/pubsub/gossipsub
import
TopicParams, validateParameters, init

type
RpcServer* = RpcHttpServer

@@ -522,61 +517,12 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
subscribeSubnets = subnetLog(subscribeSubnets),
unsubscribeSubnets = subnetLog(unsubscribeSubnets)

# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
const
blocksTopicParams = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 1.1471603557060206,
firstMessageDeliveriesDecay: 0.9928302477768374,
firstMessageDeliveriesCap: 34.86870846001471,
meshMessageDeliveriesWeight: -458.31054878249114,
meshMessageDeliveriesDecay: 0.9716279515771061,
meshMessageDeliveriesThreshold: 0.6849191409056553,
meshMessageDeliveriesCap: 2.054757422716966,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -458.31054878249114 ,
meshFailurePenaltyDecay: 0.9716279515771061,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
aggregateTopicParams = TopicParams(
topicWeight: 0.5,
timeInMeshWeight: 0.03333333333333333,
timeInMeshQuantum: chronos.seconds(12),
timeInMeshCap: 300,
firstMessageDeliveriesWeight: 0.10764904539552399,
firstMessageDeliveriesDecay: 0.8659643233600653,
firstMessageDeliveriesCap: 371.5778421725158,
meshMessageDeliveriesWeight: -0.07538533073670682,
meshMessageDeliveriesDecay: 0.930572040929699,
meshMessageDeliveriesThreshold: 53.404248450179836,
meshMessageDeliveriesCap: 213.61699380071934,
meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesWindow: chronos.seconds(2),
meshFailurePenaltyWeight: -0.07538533073670682 ,
meshFailurePenaltyDecay: 0.930572040929699,
invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesDecay: 0.9971259067705325
)
basicParams = TopicParams.init()

static:
# compile time validation
blocksTopicParams.validateParameters().tryGet()
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()

proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.network.subscribe(getBeaconBlocksTopic(forkDigest), BlockTopic, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), OtherTopic)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), OtherTopic)
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), OtherTopic)
node.network.subscribe(getAggregateAndProofsTopic(forkDigest), AggregateTopic, enableTopicMetrics = true)

# updateAttestationSubnetHandlers subscribes attestation subnets

@@ -609,10 +555,10 @@ proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
closureScope:
let idx = committeeIdx
# TODO This should be done in a dynamic way in trackSyncCommitteeTopics
node.network.subscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx), basicParams)
node.network.subscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx), OtherTopic)
syncnets.setBit(idx.asInt)

node.network.subscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair), basicParams)
node.network.subscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair), OtherTopic)
node.network.updateSyncnetsMetadata(syncnets)

proc removeAltairMessageHandlers(node: BeaconNode) =