diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim index 88c679632d..309afb8c09 100644 --- a/beacon_chain/light_client.nim +++ b/beacon_chain/light_client.nim @@ -14,7 +14,7 @@ import chronicles, eth/keys, ./gossip_processing/light_client_processor, - ./networking/[eth2_network, topic_params], + ./networking/eth2_network, ./spec/datatypes/altair, ./spec/helpers, ./sync/light_client_manager, @@ -356,10 +356,8 @@ proc updateGossipStatus*( if gossipFork >= BeaconStateFork.Altair: let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork) lightClient.network.subscribe( - getLightClientFinalityUpdateTopic(forkDigest), - basicParams) + getLightClientFinalityUpdateTopic(forkDigest)) lightClient.network.subscribe( - getLightClientOptimisticUpdateTopic(forkDigest), - basicParams) + getLightClientOptimisticUpdateTopic(forkDigest)) lightClient.gossipState = targetGossipState diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index fa239c1b2a..ebe4d75eb6 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -28,11 +28,14 @@ import libp2p/stream/connection, eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], + + # Local files ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, - "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] + "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores, + topic_scoring] export tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection, @@ -2242,17 +2245,17 @@ proc createEth2Node*(rng: ref HmacDrbgContext, historyGossip: 3, fanoutTTL: chronos.seconds(60), seenTTL: chronos.seconds(385), - gossipThreshold: -4000, - publishThreshold: -8000, - graylistThreshold: -16000, # also 
disconnect threshold + gossipThreshold: -40, + publishThreshold: -80, + graylistThreshold: -160, # also disconnect threshold opportunisticGraftThreshold: 0, - decayInterval: chronos.seconds(12), + decayInterval: chronos.seconds(4), decayToZero: 0.01, retainScore: chronos.seconds(385), appSpecificWeight: 0.0, - ipColocationFactorWeight: -53.75, + ipColocationFactorWeight: -5, ipColocationFactorThreshold: 3.0, - behaviourPenaltyWeight: -15.9, + behaviourPenaltyWeight: -5, behaviourPenaltyDecay: 0.986, disconnectBadPeers: true, directPeers: @@ -2299,13 +2302,71 @@ func announcedENR*(node: Eth2Node): enr.Record = func shortForm*(id: NetKeyPair): string = $PeerId.init(id.pubkey) +proc getTopicParams( + heartbeatPeriod: Duration, + slotPeriod: Duration, + peersPerTopic: int, + topicType: TopicScoringType): TopicParams = + case topicType: + of BlockTopic: + getTopicParams( + topicWeight = 0.1, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = 10, # Average over 10 slots, to smooth missing proposals + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = 1, # One proposal per slot + timeInMeshQuantum = chronos.seconds(15) # Stable topic + ) + of AggregateTopic: + getTopicParams( + topicWeight = 0.1, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = 2, + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = (TARGET_AGGREGATORS_PER_COMMITTEE * ATTESTATION_SUBNET_COUNT).int, + timeInMeshQuantum = chronos.seconds(15) # Stable topic + ) + of SubnetTopic: + getTopicParams( + topicWeight = 0.8 / ATTESTATION_SUBNET_COUNT, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = ATTESTATION_SUBNET_COUNT, # Smooth out empty committees + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = TARGET_COMMITTEE_SIZE.int, #TODO use current number + timeInMeshQuantum = chronos.seconds(6) # Flaky topic + ) + of OtherTopic: + TopicParams.init() + +static: + for topicType in 
ord(low(TopicScoringType))..ord(high(TopicScoringType)): + getTopicParams( + heartbeatPeriod = chronos.milliseconds(700), + slotPeriod = chronos.seconds(12), + peersPerTopic = 8, + TopicScoringType(topicType) + ).validateParameters().tryGet() + + +proc getTopicParams(node: Eth2Node, topicType: TopicScoringType): TopicParams = + let + heartbeatPeriod = node.pubsub.parameters.decayInterval + slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int) + peersPerTopic = node.pubsub.parameters.d + + getTopicParams(heartbeatPeriod, slotPeriod, peersPerTopic, topicType) + proc subscribe*( - node: Eth2Node, topic: string, topicParams: TopicParams, + node: Eth2Node, topic: string, enableTopicMetrics: bool = false) = if enableTopicMetrics: node.pubsub.knownTopics.incl(topic) - node.pubsub.topicParams[topic] = topicParams + #TODO + #node.pubsub.topicParams[topic] = node.getTopicParams(topicType) # Passing in `nil` because we do all message processing in the validator node.pubsub.subscribe(topic, nil) @@ -2417,7 +2478,7 @@ proc subscribeAttestationSubnets*( for subnet_id, enabled in subnets: if enabled: node.subscribe(getAttestationTopic( - forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now + forkDigest, SubnetId(subnet_id))) proc unsubscribeAttestationSubnets*( node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) = diff --git a/beacon_chain/networking/topic_params.nim b/beacon_chain/networking/topic_params.nim deleted file mode 100644 index 7bdf40ed59..0000000000 --- a/beacon_chain/networking/topic_params.nim +++ /dev/null @@ -1,67 +0,0 @@ -# beacon_chain -# Copyright (c) 2022 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. 
This file may not be copied, modified, or distributed except according to those terms. - -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import chronos - -from - libp2p/protocols/pubsub/gossipsub -import - TopicParams, validateParameters, init - -# inspired by lighthouse research here -# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py -const - blocksTopicParams* = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 1.1471603557060206, - firstMessageDeliveriesDecay: 0.9928302477768374, - firstMessageDeliveriesCap: 34.86870846001471, - meshMessageDeliveriesWeight: -458.31054878249114, - meshMessageDeliveriesDecay: 0.9716279515771061, - meshMessageDeliveriesThreshold: 0.6849191409056553, - meshMessageDeliveriesCap: 2.054757422716966, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -458.31054878249114 , - meshFailurePenaltyDecay: 0.9716279515771061, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - aggregateTopicParams* = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 0.10764904539552399, - firstMessageDeliveriesDecay: 0.8659643233600653, - firstMessageDeliveriesCap: 371.5778421725158, - meshMessageDeliveriesWeight: -0.07538533073670682, - meshMessageDeliveriesDecay: 0.930572040929699, - meshMessageDeliveriesThreshold: 53.404248450179836, - meshMessageDeliveriesCap: 213.61699380071934, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -0.07538533073670682 , - meshFailurePenaltyDecay: 
0.930572040929699, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - basicParams* = TopicParams.init() - -static: - # compile time validation - blocksTopicParams.validateParameters().tryGet() - aggregateTopicParams.validateParameters().tryGet() - basicParams.validateParameters.tryGet() diff --git a/beacon_chain/networking/topic_scoring.nim b/beacon_chain/networking/topic_scoring.nim new file mode 100644 index 0000000000..3d722f748d --- /dev/null +++ b/beacon_chain/networking/topic_scoring.nim @@ -0,0 +1,159 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/math +import pkg/chronos + +from + libp2p/protocols/pubsub/gossipsub +import + TopicParams, validateParameters, init + + +# Gossipsub scoring explained: +# A score of a peer in a topic is the sum of 5 different scores: +# +# `timeInMesh`: for each `Quantum` spent in a mesh, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# Every following score decays: score is multiplied by `Decay` in every heartbeat +# until they reach `decayToZero` (0.1 by default) +# +# `firstMessageDelivery`: for each message delivered first, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# `meshMessageDeliveries`: The most convoluted way possible to punish +# peers not sending enough traffic in a topic. +# +# For each message (duplicate or first) received in a topic, the score is incremented, up to `Cap`. 
+# If the score of the topic gets below `Threshold`, a peer that has been in the mesh +# for at least `Activation` time will have: `score += (Threshold - Score)² * Weight` +# (`Weight` should be negative to punish them) +# +# `meshFailurePenalty`: same as meshMessageDeliveries, but only happens on prune +# to avoid peers constantly unsubscribing and resubscribing +# +# `invalidMessageDeliveries`: for each received message that fails validation, the peer's +# score is incremented. Final score = `Score * Score * Weight` +# +# +# Once we have the 5 scores for each peer/topic, we sum them up per peer +# using the topicWeight of each topic. +# +# Nimbus strategy: +# Try to make 100 points attainable in each topic before weighting, +# and then weight each topic so that the total is 100 points at most +# +# In order of priority: +# - A badly behaving peer (e.g. sending bad messages) will be heavily sanctioned +# - A good peer will gain good scores +# - Inactive/slow peers will be mildly sanctioned. +# +# Since a "slow" topic will punish everyone in it, we don't want to punish +# good peers that are unlucky and part of a slow topic. 
So, we give more points to +# good peers than we remove to slow peers +# +# Global topics are good to check stability of peers, but since we can only +# have ~20 peers in global topics, we need to score on subnets to have data +# about as much peers as possible, even the subnets are less stable +func computeDecay( + startValue: float, + endValue: float, + timeToEndValue: Duration, + heartbeatTime: Duration + ): float = + # startValue will to to endValue in timeToEndValue + # given the returned decay + + let heartbeatsToZero = timeToEndValue.milliseconds.float / heartbeatTime.milliseconds.float + pow(endValue / startValue, 1 / heartbeatsToZero) + +func computeMessageDeliveriesWeight( + messagesThreshold: float, + maxLostPoints: float): float = + + let maxDeficit = messagesThreshold + -maxLostPoints / (maxDeficit * maxDeficit) + +type TopicScoringType* = enum + BlockTopic, + AggregateTopic, + SubnetTopic, + OtherTopic + +proc getTopicParams*( + topicWeight: float, + heartbeatPeriod: Duration, + period: Duration, + averageOverNPeriods: float, + peersPerTopic: int, + expectedMessagesPerPeriod: int, + timeInMeshQuantum: Duration + ): TopicParams = + + let + # Statistically, a peer will be first for every `receivedMessage / d` + shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic + shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods + shouldBeFirstEvery = nanoseconds(period.nanoseconds div expectedMessagesPerPeriod) * peersPerTopic + firstMessageCap = shouldBeFirstOverNPeriod + + # If peer is first every `shouldBeFirstEvery` + # he will be able to stay at cap + firstMessageDecay = + computeDecay( + startValue = firstMessageCap, + endValue = firstMessageCap - 1, + timeToEndValue = shouldBeFirstEvery, + heartbeatPeriod) + + # Start to remove up to 30 points when peer send less + # than half message than expected + shouldSendAtLeastPerPeriod = expectedMessagesPerPeriod / 2 + shouldSendAtLeastOverNPeriod = shouldSendAtLeastPerPeriod * 
averageOverNPeriods + + messageDeliveryThreshold = shouldSendAtLeastOverNPeriod + messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0) + messageDeliveryDecay = + computeDecay( + startValue = expectedMessagesPerPeriod.float * averageOverNPeriods, + endValue = 0, + timeToEndValue = period * averageOverNPeriods.int, + heartbeatPeriod) + + # Invalid message should be remembered a long time + invalidMessageDecay = computeDecay( + startValue = 1, + endValue = 0.1, + timeToEndValue = chronos.minutes(1), + heartbeatPeriod) + + let topicParams = TopicParams( + topicWeight: topicWeight, + timeInMeshQuantum: timeInMeshQuantum, + timeInMeshCap: 200, # 20 points after timeInMeshQuantum * 200 + timeInMeshWeight: 0.1, # timeInMesh should be less powerful than inactive penalties + firstMessageDeliveriesCap: firstMessageCap, + firstMessageDeliveriesDecay: firstMessageDecay, + firstMessageDeliveriesWeight: 80.0 / firstMessageCap, # Max points: 80 + meshMessageDeliveriesWeight: messageDeliveryWeight, + meshMessageDeliveriesDecay: messageDeliveryDecay, + meshMessageDeliveriesThreshold: messageDeliveryThreshold, + meshMessageDeliveriesCap: expectedMessagesPerPeriod.float * averageOverNPeriods, + meshMessageDeliveriesActivation: period * averageOverNPeriods.int, + meshMessageDeliveriesWindow: chronos.milliseconds(10), + meshFailurePenaltyWeight: messageDeliveryWeight, + meshFailurePenaltyDecay: messageDeliveryDecay, + invalidMessageDeliveriesWeight: -1, # 10 invalid messages = -100 points + invalidMessageDeliveriesDecay: invalidMessageDecay + ) + topicParams diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 4a4552990d..82f6554f03 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -18,7 +18,6 @@ import eth/p2p/discoveryv5/[enr, random2], eth/keys, ./consensus_object_pools/vanity_logs/pandas, - ./networking/topic_params, ./rpc/[rest_api, state_ttl_cache], 
./spec/datatypes/[altair, bellatrix, phase0], ./spec/[engine_authentication, weak_subjectivity], @@ -30,11 +29,6 @@ import when defined(posix): import system/ansi_c -from - libp2p/protocols/pubsub/gossipsub -import - TopicParams, validateParameters, init - when defined(windows): import winlean @@ -878,18 +872,18 @@ proc updateBlocksGossipStatus*( for gossipFork in newGossipForks: let forkDigest = node.dag.forkDigests[].atStateFork(gossipFork) node.network.subscribe( - getBeaconBlocksTopic(forkDigest), blocksTopicParams, + getBeaconBlocksTopic(forkDigest), enableTopicMetrics = true) node.blocksGossipState = targetGossipState proc addPhase0MessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = - node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams) + node.network.subscribe(getAttesterSlashingsTopic(forkDigest)) + node.network.subscribe(getProposerSlashingsTopic(forkDigest)) + node.network.subscribe(getVoluntaryExitsTopic(forkDigest)) node.network.subscribe( - getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, + getAggregateAndProofsTopic(forkDigest), enableTopicMetrics = true) # updateAttestationSubnetHandlers subscribes attestation subnets @@ -934,10 +928,10 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl for subcommitteeIdx in SyncSubcommitteeIndex: if currentSyncCommitteeSubnets[subcommitteeIdx]: node.network.subscribe( - getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams) + getSyncCommitteeTopic(forkDigest, subcommitteeIdx)) node.network.subscribe( - getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams) + getSyncCommitteeContributionAndProofTopic(forkDigest)) node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets) @@ -987,7 +981,7 @@ proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: 
Slot) = if oldSyncSubnets[subcommitteeIdx]: node.network.unsubscribe(topic) elif newSyncSubnets[subcommitteeIdx]: - node.network.subscribe(topic, basicParams) + node.network.subscribe(topic) node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets) @@ -1039,7 +1033,7 @@ proc trackNextSyncCommitteeTopics(node: BeaconNode, slot: Slot) = node.syncCommitteeMsgPool[].isEpochLeadTime(epochsToSyncPeriod.get): for gossipFork in node.gossipState: node.network.subscribe(getSyncCommitteeTopic( - forkDigests[gossipFork], subcommitteeIdx), basicParams) + forkDigests[gossipFork], subcommitteeIdx)) newSubcommittees.setBit(distinctBase(subcommitteeIdx)) debug "trackNextSyncCommitteeTopics: subscribing to sync committee subnets",