From 4538e00c12052fb38cf846da8631b3ccf2a4a077 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 26 Oct 2021 13:46:12 +0200 Subject: [PATCH 1/6] scoring v1 --- beacon_chain/networking/eth2_network.nim | 182 ++++++++++++++++++++++- beacon_chain/nimbus_beacon_node.nim | 68 +-------- 2 files changed, 185 insertions(+), 65 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 3faf746a49..791c564134 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1920,16 +1920,190 @@ proc announcedENR*(node: Eth2Node): enr.Record = proc shortForm*(id: NetKeyPair): string = $PeerID.init(id.pubkey) + +# Gossipsub scoring explained: +# A score of a peer in a topic is the sum of 5 different scores: +# +# `timeInMesh`: for each `Quantum` spent in a mesh, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# Every following score decays: score is multiplied by `Decay` in every heartbeat +# until they reach `decayToZero` (0.1 by default) +# +# `firstMessageDelivery`: for each message delivered first, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# `meshMessageDeliveries`: The most convoluted way possible to punish +# peers in topics with low traffic. +# +# For each unique message received in a topic, the topic score is incremented, up to `Cap`. +# If the score of the topic gets below `Threshold`, each peer present in the topic +# since at least `Activation` time will have: `score += (Threshold - Score)² * Weight` +# (`Weight` should be negative to punish them) +# +# `meshFailurePenalty`: same as meshMessageDeliveries, but only happens on prune +# to avoid peers constantly unsubbing-resubbing +# +# `invalidMessageDeliveries`: for each message not passing validation received, a peer +# score is incremented. Final score = `Score * Score * Weight` +# +# +# Once we have the 5 scores for each peer/topic, we sum them up per peer +# using the topicWeight of each topic. +# +# Nimbus strategy: +# Trying to get a 100 possible points in each topic before weighting +# And then, weight each topic to have 100 points max total +# +# In order of priority: +# - A badly behaving peer (eg, sending bad messages) will be heavily sanctionned +# - A good peer will gain good scores +# - Inactive/slow peers will be mildly sanctionned. +# +# Since a "slow" topic will punish everyone in it, we don't want to punish +# good peers which are unlucky and part of a slow topic. So, we give more points to +# good peers than we remove to slow peers +# +# Global topics are good to check stability of peers, but since we can only +# have ~20 peers in global topics, we need to score on subnets to have data +# about as much peers as possible, even the subnets are less stable +func computeDecay( + startValue: float, + endValue: float, + timeToEndValue: Duration, + heartbeatTime: Duration + ): float = + # startValue will to to endValue in timeToEndValue + # given the returned decay + + let heartbeatsToZero = timeToEndValue.seconds.float / heartbeatTime.seconds.float + pow(endValue / startValue, 1 / heartbeatsToZero) + +func computeMessageDeliveriesWeight( + messagesThreshold: float, + maxLostPoints: float): float = + + let maxDeficit = messagesThreshold + -maxLostPoints / (maxDeficit * maxDeficit) + +type TopicType* = enum + BlockTopic, + AggregateTopic, + SubnetTopic, + OtherTopic + +proc getTopicParams( + topicWeight: float, + heartbeatPeriod: Duration, + period: Duration, + averageOverNPeriods: float, + peersPerTopic: int, + expectedMessagesPerPeriod: int, + timeInMeshQuantum: Duration + ): TopicParams = + + let + # Statistically, a peer will be first for every `receivedMessage / d` + shouldSendPerPeriod = expectedMessagesPerPeriod / peersPerTopic + shouldSendOverNPeriod = shouldSendPerPeriod * averageOverNPeriods + + # A peer being first in 1/d% messages will reach a score of + # `shouldSendOverNPeriod` + firstMessageDecay = + computeDecay( + startValue = shouldSendOverNPeriod, + endValue = 0.1, + timeToEndValue = period * averageOverNPeriods.int, + heartbeatPeriod) + + # Start to remove up to 30 points when <80% message received in N periods + messageDeliveryThreshold = 1.float + messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0) + messageDeliveryDecay = + computeDecay( + startValue = expectedMessagesPerPeriod.float * averageOverNPeriods, + endValue = 0, + timeToEndValue = period * averageOverNPeriods.int, + heartbeatPeriod) + + # Invalid message should be remembered a long time + invalidMessageDecay = computeDecay( + startValue = 1, + endValue = 0.1, + timeToEndValue = chronos.minutes(10), + heartbeatPeriod) + + TopicParams( + topicWeight: topicWeight, + timeInMeshWeight: 0.1, + timeInMeshQuantum: timeInMeshQuantum, + timeInMeshCap: 300, # 30 points after timeInMeshQuantum * 300 + firstMessageDeliveriesWeight: 35.0 / shouldSendOverNPeriod, + firstMessageDeliveriesDecay: firstMessageDecay, + firstMessageDeliveriesCap: shouldSendOverNPeriod, # Max points: 70 + meshMessageDeliveriesWeight: messageDeliveryWeight, + meshMessageDeliveriesDecay: messageDeliveryDecay, + meshMessageDeliveriesThreshold: messageDeliveryThreshold, + meshMessageDeliveriesCap: expectedMessagesPerPeriod.float * averageOverNPeriods, + meshMessageDeliveriesActivation: period * averageOverNPeriods.int, + meshMessageDeliveriesWindow: chronos.milliseconds(10), + meshFailurePenaltyWeight: messageDeliveryWeight, + meshFailurePenaltyDecay: messageDeliveryDecay, + invalidMessageDeliveriesWeight: -5, # Invalid messages are badly penalized + invalidMessageDeliveriesDecay: invalidMessageDecay + ) + + +proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams = + let + heartbeatPeriod = node.pubsub.parameters.heartbeatInterval + slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int) + peersPerTopic = node.pubsub.parameters.d + + case topicType: + of BlockTopic: + getTopicParams( + topicWeight = 0.1, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = 10, # Average over 10 slots, to smooth missing proposals + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = 1, # One proposal per slot + timeInMeshQuantum = chronos.seconds(15) # Stable topic + ) + of AggregateTopic: + getTopicParams( + topicWeight = 0.1, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = 2, + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = (TARGET_AGGREGATORS_PER_COMMITTEE * ATTESTATION_SUBNET_COUNT).int, + timeInMeshQuantum = chronos.seconds(15) # Stable topic + ) + of SubnetTopic: + getTopicParams( + topicWeight = 0.8 / ATTESTATION_SUBNET_COUNT, + heartbeatPeriod = heartbeatPeriod, + period = slotPeriod, + averageOverNPeriods = ATTESTATION_SUBNET_COUNT, # Smooth out empty committees + peersPerTopic = peersPerTopic, + expectedMessagesPerPeriod = TARGET_COMMITTEE_SIZE.int, #TODO use current number + timeInMeshQuantum = chronos.seconds(6) # Flaky topic + ) + of OtherTopic: + TopicParams.init() + proc subscribe*( - node: Eth2Node, topic: string, topicParams: TopicParams, + node: Eth2Node, topic: string, topicType: TopicType, enableTopicMetrics: bool = false) = if enableTopicMetrics: node.pubsub.knownTopics.incl(topic) - node.pubsub.topicParams[topic] = topicParams + node.pubsub.topicParams[topic] = node.getTopicParams(topicType) # Passing in `nil` because we do all message processing in the validator - node.pubsub.subscribe(topic, nil) + node.pubsub.subscribe(topic, nim) proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] = let res = newFuture[ValidationResult]("eth2_network.execValidator") @@ -2042,7 +2216,7 @@ proc subscribeAttestationSubnets*( for subnet_id, enabled in subnets: if enabled: node.subscribe(getAttestationTopic( - forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now + forkDigest, SubnetId(subnet_id)), SubnetTopic) # don't score attestation subnets for now proc unsubscribeAttestationSubnets*( node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) = diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 9960b96a19..c2b666651d 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -49,11 +49,6 @@ from eth/common/eth_types import BlockHashOrNumber when defined(posix): import system/ansi_c -from - libp2p/protocols/pubsub/gossipsub -import - TopicParams, validateParameters, init - type RpcServer* = RpcHttpServer @@ -522,61 +517,12 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) = subscribeSubnets = subnetLog(subscribeSubnets), unsubscribeSubnets = subnetLog(unsubscribeSubnets) -# inspired by lighthouse research here -# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py -const - blocksTopicParams = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 1.1471603557060206, - firstMessageDeliveriesDecay: 0.9928302477768374, - firstMessageDeliveriesCap: 34.86870846001471, - meshMessageDeliveriesWeight: -458.31054878249114, - meshMessageDeliveriesDecay: 0.9716279515771061, - meshMessageDeliveriesThreshold: 0.6849191409056553, - meshMessageDeliveriesCap: 2.054757422716966, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -458.31054878249114 , - meshFailurePenaltyDecay: 0.9716279515771061, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - aggregateTopicParams = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 0.10764904539552399, - firstMessageDeliveriesDecay: 0.8659643233600653, - firstMessageDeliveriesCap: 371.5778421725158, - meshMessageDeliveriesWeight: -0.07538533073670682, - meshMessageDeliveriesDecay: 0.930572040929699, - meshMessageDeliveriesThreshold: 53.404248450179836, - meshMessageDeliveriesCap: 213.61699380071934, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -0.07538533073670682 , - meshFailurePenaltyDecay: 0.930572040929699, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - basicParams = TopicParams.init() - -static: - # compile time validation - blocksTopicParams.validateParameters().tryGet() - aggregateTopicParams.validateParameters().tryGet() - basicParams.validateParameters.tryGet() - proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = - node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true) - node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams) - node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true) + node.network.subscribe(getBeaconBlocksTopic(forkDigest), BlockTopic, enableTopicMetrics = true) + node.network.subscribe(getAttesterSlashingsTopic(forkDigest), OtherTopic) + node.network.subscribe(getProposerSlashingsTopic(forkDigest), OtherTopic) + node.network.subscribe(getVoluntaryExitsTopic(forkDigest), OtherTopic) + node.network.subscribe(getAggregateAndProofsTopic(forkDigest), AggregateTopic, enableTopicMetrics = true) # updateAttestationSubnetHandlers subscribes attestation subnets @@ -609,10 +555,10 @@ proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) = closureScope: let idx = committeeIdx # TODO This should be done in dynamic way in trackSyncCommitteeTopics - node.network.subscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx), basicParams) + node.network.subscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx), OtherTopic) syncnets.setBit(idx.asInt) - node.network.subscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair), basicParams) + node.network.subscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair), OtherTopic) node.network.updateSyncnetsMetadata(syncnets) proc removeAltairMessageHandlers(node: BeaconNode) = From a2d74c0f17786de6e45ddb74decd60a588688859 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 26 Oct 2021 15:14:09 +0200 Subject: [PATCH 2/6] fix scoring --- beacon_chain/networking/eth2_network.nim | 73 +++++++++++++++--------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 791c564134..3e2dd09e2e 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1865,17 +1865,17 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, historyGossip: 3, fanoutTTL: 60.seconds, seenTTL: 385.seconds, - gossipThreshold: -4000, - publishThreshold: -8000, - graylistThreshold: -16000, # also disconnect threshold - opportunisticGraftThreshold: 0, - decayInterval: 12.seconds, + gossipThreshold: -40, + publishThreshold: -80, + graylistThreshold: -160, # also disconnect threshold + opportunisticGraftThreshold: 1, + decayInterval: 12.seconds, #TODO this is not used in libp2p decayToZero: 0.01, retainScore: 385.seconds, appSpecificWeight: 0.0, - ipColocationFactorWeight: -53.75, + ipColocationFactorWeight: -5, ipColocationFactorThreshold: 3.0, - behaviourPenaltyWeight: -15.9, + behaviourPenaltyWeight: -5, behaviourPenaltyDecay: 0.986, disconnectBadPeers: true, directPeers: @@ -1934,10 +1934,10 @@ proc shortForm*(id: NetKeyPair): string = # the peer will win `Weight` points, up to `(Cap * Weight)` # # `meshMessageDeliveries`: The most convoluted way possible to punish -# peers in topics with low traffic. +# peers not sending enough traffic in a topic. # -# For each unique message received in a topic, the topic score is incremented, up to `Cap`. -# If the score of the topic gets below `Threshold`, each peer present in the topic +# For each message (duplicate or first) received in a topic, the score is incremented, up to `Cap`. +# If the score of the topic gets below `Threshold`, the peer # since at least `Activation` time will have: `score += (Threshold - Score)² * Weight` # (`Weight` should be negative to punish them) # @@ -1976,7 +1976,7 @@ func computeDecay( # startValue will to to endValue in timeToEndValue # given the returned decay - let heartbeatsToZero = timeToEndValue.seconds.float / heartbeatTime.seconds.float + let heartbeatsToZero = timeToEndValue.milliseconds.float / heartbeatTime.milliseconds.float pow(endValue / startValue, 1 / heartbeatsToZero) func computeMessageDeliveriesWeight( @@ -2004,20 +2004,24 @@ proc getTopicParams( let # Statistically, a peer will be first for every `receivedMessage / d` - shouldSendPerPeriod = expectedMessagesPerPeriod / peersPerTopic - shouldSendOverNPeriod = shouldSendPerPeriod * averageOverNPeriods + shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic + shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods # A peer being first in 1/d% messages will reach a score of # `shouldSendOverNPeriod` firstMessageDecay = computeDecay( - startValue = shouldSendOverNPeriod, + startValue = shouldBeFirstOverNPeriod, endValue = 0.1, timeToEndValue = period * averageOverNPeriods.int, heartbeatPeriod) - # Start to remove up to 30 points when <80% message received in N periods - messageDeliveryThreshold = 1.float + # Start to remove up to 30 points when peer send less + # than half message than expected + shouldSendAtLeastPerPeriod = expectedMessagesPerPeriod / 2 + shouldSendAtLeastOverNPeriod = shouldSendAtLeastPerPeriod * averageOverNPeriods + + messageDeliveryThreshold = shouldSendAtLeastOverNPeriod messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0) messageDeliveryDecay = computeDecay( @@ -2033,14 +2037,14 @@ proc getTopicParams( timeToEndValue = chronos.minutes(10), heartbeatPeriod) - TopicParams( + let topicParams = TopicParams( topicWeight: topicWeight, timeInMeshWeight: 0.1, timeInMeshQuantum: timeInMeshQuantum, timeInMeshCap: 300, # 30 points after timeInMeshQuantum * 300 - firstMessageDeliveriesWeight: 35.0 / shouldSendOverNPeriod, + firstMessageDeliveriesWeight: 35.0 / shouldBeFirstOverNPeriod, firstMessageDeliveriesDecay: firstMessageDecay, - firstMessageDeliveriesCap: shouldSendOverNPeriod, # Max points: 70 + firstMessageDeliveriesCap: shouldBeFirstOverNPeriod, # Max points: 70 meshMessageDeliveriesWeight: messageDeliveryWeight, meshMessageDeliveriesDecay: messageDeliveryDecay, meshMessageDeliveriesThreshold: messageDeliveryThreshold, @@ -2052,14 +2056,13 @@ proc getTopicParams( invalidMessageDeliveriesWeight: -5, # Invalid messages are badly penalized invalidMessageDeliveriesDecay: invalidMessageDecay ) + topicParams - -proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams = - let - heartbeatPeriod = node.pubsub.parameters.heartbeatInterval - slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int) - peersPerTopic = node.pubsub.parameters.d - +proc getTopicParams( + heartbeatPeriod: Duration, + slotPeriod: Duration, + peersPerTopic: int, + topicType: TopicType): TopicParams = case topicType: of BlockTopic: getTopicParams( @@ -2094,6 +2097,24 @@ proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams = of OtherTopic: TopicParams.init() +static: + for topicType in ord(low(TopicType))..ord(high(TopicType)): + getTopicParams( + heartbeatPeriod = chronos.milliseconds(700), + slotPeriod = chronos.seconds(12), + peersPerTopic = 8, + TopicType(topicType) + ).validateParameters().tryGet() + + +proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams = + let + heartbeatPeriod = node.pubsub.parameters.heartbeatInterval + slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int) + peersPerTopic = node.pubsub.parameters.d + + getTopicParams(heartbeatPeriod, slotPeriod, peersPerTopic, topicType) + proc subscribe*( node: Eth2Node, topic: string, topicType: TopicType, enableTopicMetrics: bool = false) = From 9490809861de77a7c2c918c717340f21e2f336e9 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 26 Oct 2021 15:30:25 +0200 Subject: [PATCH 3/6] fixup values --- beacon_chain/networking/eth2_network.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 3e2dd09e2e..e1275487dc 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -2013,7 +2013,7 @@ proc getTopicParams( computeDecay( startValue = shouldBeFirstOverNPeriod, endValue = 0.1, - timeToEndValue = period * averageOverNPeriods.int, + timeToEndValue = period * averageOverNPeriods.int * 2, heartbeatPeriod) # Start to remove up to 30 points when peer send less @@ -2034,7 +2034,7 @@ proc getTopicParams( invalidMessageDecay = computeDecay( startValue = 1, endValue = 0.1, - timeToEndValue = chronos.minutes(10), + timeToEndValue = chronos.minutes(1), heartbeatPeriod) let topicParams = TopicParams( @@ -2053,7 +2053,7 @@ proc getTopicParams( meshMessageDeliveriesWindow: chronos.milliseconds(10), meshFailurePenaltyWeight: messageDeliveryWeight, meshFailurePenaltyDecay: messageDeliveryDecay, - invalidMessageDeliveriesWeight: -5, # Invalid messages are badly penalized + invalidMessageDeliveriesWeight: -1, # 10 invalid messages = -100 points invalidMessageDeliveriesDecay: invalidMessageDecay ) topicParams From f0c3e2a6b2b9b24ac2014cb1ddb75044ac11d89c Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 26 Oct 2021 15:41:01 +0200 Subject: [PATCH 4/6] fix typo --- beacon_chain/networking/eth2_network.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index e1275487dc..b4650bd42c 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -2124,7 +2124,7 @@ proc subscribe*( node.pubsub.topicParams[topic] = node.getTopicParams(topicType) # Passing in `nil` because we do all message processing in the validator - node.pubsub.subscribe(topic, nim) + node.pubsub.subscribe(topic, nil) proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] = let res = newFuture[ValidationResult]("eth2_network.execValidator") From ad74b5d215efc10dc3b2cb68bc352f3380ee7ee8 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Thu, 28 Oct 2021 15:46:33 +0200 Subject: [PATCH 5/6] tweak scoring params --- beacon_chain/networking/eth2_network.nim | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index b4650bd42c..94e9563e06 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -2006,14 +2006,16 @@ proc getTopicParams( # Statistically, a peer will be first for every `receivedMessage / d` shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods + shouldBeFirstEvery = nanoseconds(period.nanoseconds div expectedMessagesPerPeriod) * peersPerTopic + firstMessageCap = shouldBeFirstOverNPeriod - # A peer being first in 1/d% messages will reach a score of - # `shouldSendOverNPeriod` + # If peer is first every `shouldBeFirstEvery` + # he will be able to stay at cap firstMessageDecay = computeDecay( - startValue = shouldBeFirstOverNPeriod, - endValue = 0.1, - timeToEndValue = period * averageOverNPeriods.int * 2, + startValue = firstMessageCap, + endValue = firstMessageCap - 1, + timeToEndValue = shouldBeFirstEvery, heartbeatPeriod) # Start to remove up to 30 points when peer send less @@ -2039,12 +2041,12 @@ proc getTopicParams( let topicParams = TopicParams( topicWeight: topicWeight, - timeInMeshWeight: 0.1, timeInMeshQuantum: timeInMeshQuantum, - timeInMeshCap: 300, # 30 points after timeInMeshQuantum * 300 - firstMessageDeliveriesWeight: 35.0 / shouldBeFirstOverNPeriod, + timeInMeshCap: 200, # 20 points after timeInMeshQuantum * 200 + timeInMeshWeight: 0.1, # timeInMesh should be less powerful than inactive penalties + firstMessageDeliveriesCap: firstMessageCap, firstMessageDeliveriesDecay: firstMessageDecay, - firstMessageDeliveriesCap: shouldBeFirstOverNPeriod, # Max points: 70 + firstMessageDeliveriesWeight: 80.0 / firstMessageCap, # Max points: 80 meshMessageDeliveriesWeight: messageDeliveryWeight, meshMessageDeliveriesDecay: messageDeliveryDecay, meshMessageDeliveriesThreshold: messageDeliveryThreshold, From 48011610e8402f4a573d4d43eb9b7c5eb3238f10 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Thu, 8 Sep 2022 14:38:50 +0200 Subject: [PATCH 6/6] First pass of cleaning --- beacon_chain/light_client.nim | 8 +- beacon_chain/networking/eth2_network.nim | 164 ++-------------------- beacon_chain/networking/topic_params.nim | 67 --------- beacon_chain/networking/topic_scoring.nim | 159 +++++++++++++++++++++ beacon_chain/nimbus_beacon_node.nim | 19 ++- 5 files changed, 185 insertions(+), 232 deletions(-) delete mode 100644 beacon_chain/networking/topic_params.nim create mode 100644 beacon_chain/networking/topic_scoring.nim diff --git a/beacon_chain/light_client.nim b/beacon_chain/light_client.nim index 88c679632d..309afb8c09 100644 --- a/beacon_chain/light_client.nim +++ b/beacon_chain/light_client.nim @@ -14,7 +14,7 @@ import chronicles, eth/keys, ./gossip_processing/light_client_processor, - ./networking/[eth2_network, topic_params], + ./networking/eth2_network, ./spec/datatypes/altair, ./spec/helpers, ./sync/light_client_manager, @@ -356,10 +356,8 @@ proc updateGossipStatus*( if gossipFork >= BeaconStateFork.Altair: let forkDigest = lightClient.forkDigests[].atStateFork(gossipFork) lightClient.network.subscribe( - getLightClientFinalityUpdateTopic(forkDigest), - basicParams) + getLightClientFinalityUpdateTopic(forkDigest)) lightClient.network.subscribe( - getLightClientOptimisticUpdateTopic(forkDigest), - basicParams) + getLightClientOptimisticUpdateTopic(forkDigest)) lightClient.gossipState = targetGossipState diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index ca3bcc7035..ebe4d75eb6 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -28,11 +28,14 @@ import libp2p/stream/connection, eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], + + # Local files ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, - "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] + "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores, + topic_scoring] export tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection, @@ -2246,7 +2249,7 @@ proc createEth2Node*(rng: ref HmacDrbgContext, publishThreshold: -80, graylistThreshold: -160, # also disconnect threshold opportunisticGraftThreshold: 0, - decayInterval: chronos.seconds(12), + decayInterval: chronos.seconds(4), decayToZero: 0.01, retainScore: chronos.seconds(385), appSpecificWeight: 0.0, @@ -2299,151 +2302,11 @@ func announcedENR*(node: Eth2Node): enr.Record = func shortForm*(id: NetKeyPair): string = $PeerId.init(id.pubkey) - -# Gossipsub scoring explained: -# A score of a peer in a topic is the sum of 5 different scores: -# -# `timeInMesh`: for each `Quantum` spent in a mesh, -# the peer will win `Weight` points, up to `(Cap * Weight)` -# -# Every following score decays: score is multiplied by `Decay` in every heartbeat -# until they reach `decayToZero` (0.1 by default) -# -# `firstMessageDelivery`: for each message delivered first, -# the peer will win `Weight` points, up to `(Cap * Weight)` -# -# `meshMessageDeliveries`: The most convoluted way possible to punish -# peers not sending enough traffic in a topic. -# -# For each message (duplicate or first) received in a topic, the score is incremented, up to `Cap`. -# If the score of the topic gets below `Threshold`, the peer -# since at least `Activation` time will have: `score += (Threshold - Score)² * Weight` -# (`Weight` should be negative to punish them) -# -# `meshFailurePenalty`: same as meshMessageDeliveries, but only happens on prune -# to avoid peers constantly unsubbing-resubbing -# -# `invalidMessageDeliveries`: for each message not passing validation received, a peer -# score is incremented. Final score = `Score * Score * Weight` -# -# -# Once we have the 5 scores for each peer/topic, we sum them up per peer -# using the topicWeight of each topic. -# -# Nimbus strategy: -# Trying to get a 100 possible points in each topic before weighting -# And then, weight each topic to have 100 points max total -# -# In order of priority: -# - A badly behaving peer (eg, sending bad messages) will be heavily sanctionned -# - A good peer will gain good scores -# - Inactive/slow peers will be mildly sanctionned. -# -# Since a "slow" topic will punish everyone in it, we don't want to punish -# good peers which are unlucky and part of a slow topic. So, we give more points to -# good peers than we remove to slow peers -# -# Global topics are good to check stability of peers, but since we can only -# have ~20 peers in global topics, we need to score on subnets to have data -# about as much peers as possible, even the subnets are less stable -func computeDecay( - startValue: float, - endValue: float, - timeToEndValue: Duration, - heartbeatTime: Duration - ): float = - # startValue will to to endValue in timeToEndValue - # given the returned decay - - let heartbeatsToZero = timeToEndValue.milliseconds.float / heartbeatTime.milliseconds.float - pow(endValue / startValue, 1 / heartbeatsToZero) - -func computeMessageDeliveriesWeight( - messagesThreshold: float, - maxLostPoints: float): float = - - let maxDeficit = messagesThreshold - -maxLostPoints / (maxDeficit * maxDeficit) - -type TopicType* = enum - BlockTopic, - AggregateTopic, - SubnetTopic, - OtherTopic - -proc getTopicParams( - topicWeight: float, - heartbeatPeriod: Duration, - period: Duration, - averageOverNPeriods: float, - peersPerTopic: int, - expectedMessagesPerPeriod: int, - timeInMeshQuantum: Duration - ): TopicParams = - - let - # Statistically, a peer will be first for every `receivedMessage / d` - shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic - shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods - shouldBeFirstEvery = nanoseconds(period.nanoseconds div expectedMessagesPerPeriod) * peersPerTopic - firstMessageCap = shouldBeFirstOverNPeriod - - # If peer is first every `shouldBeFirstEvery` - # he will be able to stay at cap - firstMessageDecay = - computeDecay( - startValue = firstMessageCap, - endValue = firstMessageCap - 1, - timeToEndValue = shouldBeFirstEvery, - heartbeatPeriod) - - # Start to remove up to 30 points when peer send less - # than half message than expected - shouldSendAtLeastPerPeriod = expectedMessagesPerPeriod / 2 - shouldSendAtLeastOverNPeriod = shouldSendAtLeastPerPeriod * averageOverNPeriods - - messageDeliveryThreshold = shouldSendAtLeastOverNPeriod - messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0) - messageDeliveryDecay = - computeDecay( - startValue = expectedMessagesPerPeriod.float * averageOverNPeriods, - endValue = 0, - timeToEndValue = period * averageOverNPeriods.int, - heartbeatPeriod) - - # Invalid message should be remembered a long time - invalidMessageDecay = computeDecay( - startValue = 1, - endValue = 0.1, - timeToEndValue = chronos.minutes(1), - heartbeatPeriod) - - let topicParams = TopicParams( - topicWeight: topicWeight, - timeInMeshQuantum: timeInMeshQuantum, - timeInMeshCap: 200, # 20 points after timeInMeshQuantum * 200 - timeInMeshWeight: 0.1, # timeInMesh should be less powerful than inactive penalties - firstMessageDeliveriesCap: firstMessageCap, - firstMessageDeliveriesDecay: firstMessageDecay, - firstMessageDeliveriesWeight: 80.0 / firstMessageCap, # Max points: 80 - meshMessageDeliveriesWeight: messageDeliveryWeight, - meshMessageDeliveriesDecay: messageDeliveryDecay, - meshMessageDeliveriesThreshold: messageDeliveryThreshold, - meshMessageDeliveriesCap: expectedMessagesPerPeriod.float * averageOverNPeriods, - meshMessageDeliveriesActivation: period * averageOverNPeriods.int, - meshMessageDeliveriesWindow: chronos.milliseconds(10), - meshFailurePenaltyWeight: messageDeliveryWeight, - meshFailurePenaltyDecay: messageDeliveryDecay, - invalidMessageDeliveriesWeight: -1, # 10 invalid messages = -100 points - invalidMessageDeliveriesDecay: invalidMessageDecay - ) - topicParams - proc getTopicParams( heartbeatPeriod: Duration, slotPeriod: Duration, peersPerTopic: int, - topicType: TopicType): TopicParams = + topicType: TopicScoringType): TopicParams = case topicType: of BlockTopic: getTopicParams( @@ -2479,30 +2342,31 @@ proc getTopicParams( TopicParams.init() static: - for topicType in ord(low(TopicType))..ord(high(TopicType)): + for topicType in ord(low(TopicScoringType))..ord(high(TopicScoringType)): getTopicParams( heartbeatPeriod = chronos.milliseconds(700), slotPeriod = chronos.seconds(12), peersPerTopic = 8, - TopicType(topicType) + TopicScoringType(topicType) ).validateParameters().tryGet() -proc getTopicParams(node: Eth2Node, topicType: TopicType): TopicParams = +proc getTopicParams(node: Eth2Node, topicType: TopicScoringType): TopicParams = let - heartbeatPeriod = node.pubsub.parameters.heartbeatInterval + heartbeatPeriod = node.pubsub.parameters.decayInterval slotPeriod = chronos.seconds(SECONDS_PER_SLOT.int) peersPerTopic = node.pubsub.parameters.d getTopicParams(heartbeatPeriod, slotPeriod, peersPerTopic, topicType) proc subscribe*( - node: Eth2Node, topic: string, topicType: TopicType, + node: Eth2Node, topic: string, enableTopicMetrics: bool = false) = if enableTopicMetrics: node.pubsub.knownTopics.incl(topic) - node.pubsub.topicParams[topic] = node.getTopicParams(topicType) + #TODO + #node.pubsub.topicParams[topic] = node.getTopicParams(topicType) # Passing in `nil` because we do all message processing in the validator node.pubsub.subscribe(topic, nil) @@ -2614,7 +2478,7 @@ proc subscribeAttestationSubnets*( for subnet_id, enabled in subnets: if enabled: node.subscribe(getAttestationTopic( - forkDigest, SubnetId(subnet_id)), SubnetTopic) # don't score attestation subnets for now + forkDigest, SubnetId(subnet_id))) proc unsubscribeAttestationSubnets*( node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) = diff --git a/beacon_chain/networking/topic_params.nim b/beacon_chain/networking/topic_params.nim deleted file mode 100644 index 7bdf40ed59..0000000000 --- a/beacon_chain/networking/topic_params.nim +++ /dev/null @@ -1,67 +0,0 @@ -# beacon_chain -# Copyright (c) 2022 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. - -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import chronos - -from - libp2p/protocols/pubsub/gossipsub -import - TopicParams, validateParameters, init - -# inspired by lighthouse research here -# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py -const - blocksTopicParams* = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 1.1471603557060206, - firstMessageDeliveriesDecay: 0.9928302477768374, - firstMessageDeliveriesCap: 34.86870846001471, - meshMessageDeliveriesWeight: -458.31054878249114, - meshMessageDeliveriesDecay: 0.9716279515771061, - meshMessageDeliveriesThreshold: 0.6849191409056553, - meshMessageDeliveriesCap: 2.054757422716966, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -458.31054878249114 , - meshFailurePenaltyDecay: 0.9716279515771061, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - aggregateTopicParams* = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 0.10764904539552399, - firstMessageDeliveriesDecay: 0.8659643233600653, - firstMessageDeliveriesCap: 371.5778421725158, - meshMessageDeliveriesWeight: -0.07538533073670682, - meshMessageDeliveriesDecay: 0.930572040929699, - meshMessageDeliveriesThreshold: 53.404248450179836, - meshMessageDeliveriesCap: 213.61699380071934, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -0.07538533073670682 , - meshFailurePenaltyDecay: 0.930572040929699, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - basicParams* = TopicParams.init() - -static: - # compile time validation - blocksTopicParams.validateParameters().tryGet() - aggregateTopicParams.validateParameters().tryGet() - basicParams.validateParameters.tryGet() diff --git a/beacon_chain/networking/topic_scoring.nim b/beacon_chain/networking/topic_scoring.nim new file mode 100644 index 0000000000..3d722f748d --- /dev/null +++ b/beacon_chain/networking/topic_scoring.nim @@ -0,0 +1,159 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/math +import pkg/chronos + +from + libp2p/protocols/pubsub/gossipsub +import + TopicParams, validateParameters, init + + +# Gossipsub scoring explained: +# A score of a peer in a topic is the sum of 5 different scores: +# +# `timeInMesh`: for each `Quantum` spent in a mesh, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# Every following score decays: score is multiplied by `Decay` in every heartbeat +# until they reach `decayToZero` (0.1 by default) +# +# `firstMessageDelivery`: for each message delivered first, +# the peer will win `Weight` points, up to `(Cap * Weight)` +# +# `meshMessageDeliveries`: The most convoluted way possible to punish +# peers not sending enough traffic in a topic. +# +# For each message (duplicate or first) received in a topic, the score is incremented, up to `Cap`. +# If the score of the topic gets below `Threshold`, the peer +# since at least `Activation` time will have: `score += (Threshold - Score)² * Weight` +# (`Weight` should be negative to punish them) +# +# `meshFailurePenalty`: same as meshMessageDeliveries, but only happens on prune +# to avoid peers constantly unsubbing-resubbing +# +# `invalidMessageDeliveries`: for each message not passing validation received, a peer +# score is incremented. Final score = `Score * Score * Weight` +# +# +# Once we have the 5 scores for each peer/topic, we sum them up per peer +# using the topicWeight of each topic. +# +# Nimbus strategy: +# Trying to get a 100 possible points in each topic before weighting +# And then, weight each topic to have 100 points max total +# +# In order of priority: +# - A badly behaving peer (eg, sending bad messages) will be heavily sanctionned +# - A good peer will gain good scores +# - Inactive/slow peers will be mildly sanctionned. +# +# Since a "slow" topic will punish everyone in it, we don't want to punish +# good peers which are unlucky and part of a slow topic. So, we give more points to +# good peers than we remove to slow peers +# +# Global topics are good to check stability of peers, but since we can only +# have ~20 peers in global topics, we need to score on subnets to have data +# about as much peers as possible, even the subnets are less stable +func computeDecay( + startValue: float, + endValue: float, + timeToEndValue: Duration, + heartbeatTime: Duration + ): float = + # startValue will to to endValue in timeToEndValue + # given the returned decay + + let heartbeatsToZero = timeToEndValue.milliseconds.float / heartbeatTime.milliseconds.float + pow(endValue / startValue, 1 / heartbeatsToZero) + +func computeMessageDeliveriesWeight( + messagesThreshold: float, + maxLostPoints: float): float = + + let maxDeficit = messagesThreshold + -maxLostPoints / (maxDeficit * maxDeficit) + +type TopicScoringType* = enum + BlockTopic, + AggregateTopic, + SubnetTopic, + OtherTopic + +proc getTopicParams*( + topicWeight: float, + heartbeatPeriod: Duration, + period: Duration, + averageOverNPeriods: float, + peersPerTopic: int, + expectedMessagesPerPeriod: int, + timeInMeshQuantum: Duration + ): TopicParams = + + let + # Statistically, a peer will be first for every `receivedMessage / d` + shouldBeFirstPerPeriod = expectedMessagesPerPeriod / peersPerTopic + shouldBeFirstOverNPeriod = shouldBeFirstPerPeriod * averageOverNPeriods + shouldBeFirstEvery = nanoseconds(period.nanoseconds div expectedMessagesPerPeriod) * peersPerTopic + firstMessageCap = shouldBeFirstOverNPeriod + + # If peer is first every `shouldBeFirstEvery` + # he will be able to stay at cap + firstMessageDecay = + computeDecay( + startValue = firstMessageCap, + endValue = firstMessageCap - 1, + timeToEndValue = shouldBeFirstEvery, + heartbeatPeriod) + + # Start to remove up to 30 points when peer send less + # than half message than expected + shouldSendAtLeastPerPeriod = expectedMessagesPerPeriod / 2 + shouldSendAtLeastOverNPeriod = shouldSendAtLeastPerPeriod * averageOverNPeriods + + messageDeliveryThreshold = shouldSendAtLeastOverNPeriod + messageDeliveryWeight = computeMessageDeliveriesWeight(messageDeliveryThreshold, 30.0) + messageDeliveryDecay = + computeDecay( + startValue = expectedMessagesPerPeriod.float * averageOverNPeriods, + endValue = 0, + timeToEndValue = period * averageOverNPeriods.int, + heartbeatPeriod) + + # Invalid message should be remembered a long time + invalidMessageDecay = computeDecay( + startValue = 1, + endValue = 0.1, + timeToEndValue = chronos.minutes(1), + heartbeatPeriod) + + let topicParams = TopicParams( + topicWeight: topicWeight, + timeInMeshQuantum: timeInMeshQuantum, + timeInMeshCap: 200, # 20 points after timeInMeshQuantum * 200 + timeInMeshWeight: 0.1, # timeInMesh should be less powerful than inactive penalties + firstMessageDeliveriesCap: firstMessageCap, + firstMessageDeliveriesDecay: firstMessageDecay, + firstMessageDeliveriesWeight: 80.0 / firstMessageCap, # Max points: 80 + meshMessageDeliveriesWeight: messageDeliveryWeight, + meshMessageDeliveriesDecay: messageDeliveryDecay, + meshMessageDeliveriesThreshold: messageDeliveryThreshold, + meshMessageDeliveriesCap: expectedMessagesPerPeriod.float * averageOverNPeriods, + meshMessageDeliveriesActivation: period * averageOverNPeriods.int, + meshMessageDeliveriesWindow: chronos.milliseconds(10), + meshFailurePenaltyWeight: messageDeliveryWeight, + meshFailurePenaltyDecay: messageDeliveryDecay, + invalidMessageDeliveriesWeight: -1, # 10 invalid messages = -100 points + invalidMessageDeliveriesDecay: invalidMessageDecay + ) + topicParams diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 88d23fc9d4..82f6554f03 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -18,7 +18,6 @@ import eth/p2p/discoveryv5/[enr, random2], eth/keys, ./consensus_object_pools/vanity_logs/pandas, - ./networking/topic_params, ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[engine_authentication, weak_subjectivity], @@ -873,18 +872,18 @@ proc updateBlocksGossipStatus*( for gossipFork in newGossipForks: let forkDigest = node.dag.forkDigests[].atStateFork(gossipFork) node.network.subscribe( - getBeaconBlocksTopic(forkDigest), blocksTopicParams, + getBeaconBlocksTopic(forkDigest), enableTopicMetrics = true) node.blocksGossipState = targetGossipState proc addPhase0MessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = - node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams) - node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams) + node.network.subscribe(getAttesterSlashingsTopic(forkDigest)) + node.network.subscribe(getProposerSlashingsTopic(forkDigest)) + node.network.subscribe(getVoluntaryExitsTopic(forkDigest)) node.network.subscribe( - getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, + getAggregateAndProofsTopic(forkDigest), enableTopicMetrics = true) # updateAttestationSubnetHandlers subscribes attestation subnets @@ -929,10 +928,10 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl for subcommitteeIdx in SyncSubcommitteeIndex: if currentSyncCommitteeSubnets[subcommitteeIdx]: node.network.subscribe( - getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams) + getSyncCommitteeTopic(forkDigest, subcommitteeIdx)) node.network.subscribe( - getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams) + getSyncCommitteeContributionAndProofTopic(forkDigest)) node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets) @@ -982,7 +981,7 @@ proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) = if oldSyncSubnets[subcommitteeIdx]: node.network.unsubscribe(topic) elif newSyncSubnets[subcommitteeIdx]: - node.network.subscribe(topic, basicParams) + node.network.subscribe(topic) node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets) @@ -1034,7 +1033,7 @@ proc trackNextSyncCommitteeTopics(node: BeaconNode, slot: Slot) = node.syncCommitteeMsgPool[].isEpochLeadTime(epochsToSyncPeriod.get): for gossipFork in node.gossipState: node.network.subscribe(getSyncCommitteeTopic( - forkDigests[gossipFork], subcommitteeIdx), basicParams) + forkDigests[gossipFork], subcommitteeIdx)) newSubcommittees.setBit(distinctBase(subcommitteeIdx)) debug "trackNextSyncCommitteeTopics: subscribing to sync committee subnets",