Skip to content

Commit

Permalink
nightmare conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Apr 6, 2023
1 parent f083cb3 commit 10fbf09
Show file tree
Hide file tree
Showing 31 changed files with 1,409 additions and 246 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [Maintainers](#maintainers)
- [License](#license)

## Install
Expand All @@ -30,7 +31,14 @@ Contributions welcome. Please check out [the issues](https://github.com/libp2p/g

Check out our [contributing document](https://github.com/libp2p/community/blob/master/CONTRIBUTE.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).

Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
<!-- Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. -->

## Maintainers

- [@ipfs/kubo-maintainers](https://github.com/orgs/ipfs/teams/kubo-maintainers)
- [@libp2p/go-libp2p-maintainers](https://github.com/orgs/libp2p/teams/go-libp2p-maintainers)
- [@guillaumemichel](https://github.com/guillaumemichel)


## License

Expand Down
55 changes: 33 additions & 22 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to hosts in the DHT to track routing tables of peers.
type Crawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
}
var (
logger = logging.Logger("dht-crawler")

_ Crawler = (*DefaultCrawler)(nil)
)

type (
// Crawler connects to hosts in the DHT to track routing tables of peers.
Crawler interface {
// Run crawls the DHT starting from the startingPeers, and calls either handleSuccess or handleFail depending on whether a peer was successfully contacted or not.
Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail)
}
// DefaultCrawler provides a default implementation of Crawler.
DefaultCrawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
dialAddressExtendDur time.Duration
}
)

// New creates a new Crawler
func New(host host.Host, opts ...Option) (*Crawler, error) {
// NewDefaultCrawler creates a new DefaultCrawler
func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error) {
o := new(options)
if err := defaults(o); err != nil {
return nil, err
Expand All @@ -45,11 +57,12 @@ func New(host host.Host, opts ...Option) (*Crawler, error) {
return nil, err
}

return &Crawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
return &DefaultCrawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
dialAddressExtendDur: o.dialAddressExtendDur,
}, nil
}

Expand Down Expand Up @@ -120,10 +133,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

const dialAddressExtendDur time.Duration = time.Minute * 30

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
results := make(chan *queryResult, 1)

Expand Down Expand Up @@ -151,7 +162,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, c.dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
Expand Down Expand Up @@ -183,7 +194,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
c.host.Peerstore().AddAddrs(p, ai.Addrs, c.dialAddressExtendDur)
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
Expand Down Expand Up @@ -212,7 +223,7 @@ type queryResult struct {
err error
}

func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
func (c *DefaultCrawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
if err != nil {
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
Expand Down
20 changes: 16 additions & 4 deletions crawler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
type Option func(*options) error

type options struct {
protocols []protocol.ID
parallelism int
connectTimeout time.Duration
perMsgTimeout time.Duration
protocols []protocol.ID
parallelism int
connectTimeout time.Duration
perMsgTimeout time.Duration
dialAddressExtendDur time.Duration
}

// defaults are the default crawler options. This option will be automatically
Expand All @@ -23,6 +24,7 @@ var defaults = func(o *options) error {
o.parallelism = 1000
o.connectTimeout = time.Second * 5
o.perMsgTimeout = time.Second * 5
o.dialAddressExtendDur = time.Minute * 30

return nil
}
Expand Down Expand Up @@ -58,3 +60,13 @@ func WithConnectTimeout(timeout time.Duration) Option {
return nil
}
}

// WithDialAddrExtendDuration sets the TTL duration the crawler uses when it
// adds the addresses of dialed peers to the peerstore.
// When this option is not supplied, the default of 30 minutes applies.
func WithDialAddrExtendDuration(ext time.Duration) Option {
	apply := func(cfg *options) error {
		cfg.dialAddressExtendDur = ext
		return nil
	}
	return apply
}
44 changes: 38 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
Expand Down Expand Up @@ -148,6 +151,13 @@ type IpfsDHT struct {

rtFreezeTimeout time.Duration

// network size estimator
nsEstimator *netsize.Estimator
enableOptProv bool

// a bounded channel to limit asynchronicity of in-flight ADD_PROVIDER RPCs
optProvJobsPool chan struct{}

// configuration variables for tests
testAddressUpdateProcessing bool
}
Expand Down Expand Up @@ -237,7 +247,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p)
dht.peerFound(ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -300,6 +310,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
optProvJobsPool: nil,
}

var maxLastSuccessfulOutboundThreshold time.Duration
Expand Down Expand Up @@ -328,6 +341,13 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout
dht.recentlyCheckedPeers = make(map[peer.ID]time.Time)

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

if dht.enableOptProv {
dht.optProvJobsPool = make(chan struct{}, cfg.OptimisticProvideJobsPoolSize)
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
Expand Down Expand Up @@ -672,10 +692,10 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
}

// verify whether the remote peer advertises the right dht protocol
b, err := dht.validRTPeer(p)
valid, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
} else if valid {
if dht.peerRecentlyQueried(p) {
// peer was already queried recently and didn't make it to the bucket
return
Expand Down Expand Up @@ -714,7 +734,7 @@ func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
func (dht *IpfsDHT) peerStoppedDHT(p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
Expand All @@ -729,7 +749,10 @@ func (dht *IpfsDHT) fixRTIfNeeded() {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns its peer.AddrInfo.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
_, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand Down Expand Up @@ -878,16 +901,25 @@ func (dht *IpfsDHT) Host() host.Host {

// Ping sends a ping message to the passed peer and waits for a response.
// The request runs inside an "IpfsDHT.Ping" span that carries the target peer
// ID as an attribute; the span ends when Ping returns.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
	peerAttr := attribute.Stringer("PeerID", p)
	spanCtx, span := internal.StartSpan(ctx, "IpfsDHT.Ping", trace.WithAttributes(peerAttr))
	defer span.End()

	return dht.protoMessenger.Ping(spanCtx, p)
}

// NetworkSize reports the most recent DHT network size estimate, as returned
// by the network size estimator, along with any error the estimator reports.
// EXPERIMENTAL: We do not provide any guarantees that this method will
// continue to exist in the codebase. Use it at your own risk.
func (dht *IpfsDHT) NetworkSize() (int32, error) {
	size, err := dht.nsEstimator.NetworkSize()
	return size, err
}

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
tag.Upsert(metrics.KeyPeerID, dht.self.String()),
tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
)
ctx, _ = tag.New(
Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
}

// a peer has queried us, let's add it to RT
go dht.peerFound(dht.ctx, mPeer)
dht.peerFound(ctx, mPeer)

if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
28 changes: 28 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,31 @@ func forceAddressUpdateProcessing(t *testing.T) Option {
return nil
}
}

// EnableOptimisticProvide turns on an optimization that skips the last hops of
// the provide process. It relies on the network size estimator (which uses the
// keyspace density of queries) to optimistically send ADD_PROVIDER requests as
// soon as the last hop has most likely been found. Some ADD_PROVIDER requests
// keep running asynchronously in the background after returning, which makes
// it possible to return early once a threshold number of RPCs have succeeded.
// The number of background/in-flight queries can be configured with the
// OptimisticProvideJobsPoolSize option.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func EnableOptimisticProvide() Option {
	enable := func(cfg *dhtcfg.Config) error {
		cfg.EnableOptimisticProvide = true
		return nil
	}
	return enable
}

// OptimisticProvideJobsPoolSize configures the asynchronicity limit for
// in-flight ADD_PROVIDER RPCs. It makes sense to set it to a multiple of
// optProvReturnRatio * BucketSize. Check the description of
// EnableOptimisticProvide for more details.
//
// When optimistic provide is enabled, the value is used as the capacity of a
// buffered channel, so it must not be negative.
//
// EXPERIMENTAL: This is an experimental option and might be removed in the future. Use at your own risk.
func OptimisticProvideJobsPoolSize(size int) Option {
	setPoolSize := func(cfg *dhtcfg.Config) error {
		cfg.OptimisticProvideJobsPoolSize = size
		return nil
	}
	return setPoolSize
}
2 changes: 1 addition & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
detectrace "github.com/ipfs/go-detect-race"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
Expand Down
2 changes: 1 addition & 1 deletion dual/dual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"testing"
"time"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
dht "github.com/libp2p/go-libp2p-kad-dht"
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
record "github.com/libp2p/go-libp2p-record"
Expand Down
Loading

0 comments on commit 10fbf09

Please sign in to comment.