Skip to content

Commit

Permalink
wip: prioritization
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Mar 26, 2024
1 parent ced4610 commit 6f478c3
Showing 1 changed file with 55 additions and 5 deletions.
60 changes: 55 additions & 5 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/boxo/fetcher"
pin "github.com/ipfs/boxo/pinning/pinner"
provider "github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
"go.uber.org/fx"
Expand Down Expand Up @@ -129,11 +130,11 @@ func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, repro
var keyProvider fx.Option
switch reprovideStrategy {
case "all", "":
keyProvider = fx.Provide(provider.NewBlockstoreProvider)
keyProvider = fx.Provide(newProvidingStrategy(false, false))
case "roots":
keyProvider = fx.Provide(pinnedProviderStrategy(true))
keyProvider = fx.Provide(newProvidingStrategy(true, true))
case "pinned":
keyProvider = fx.Provide(pinnedProviderStrategy(false))
keyProvider = fx.Provide(newProvidingStrategy(true, false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy %q", reprovideStrategy))
}
Expand All @@ -149,13 +150,62 @@ func OfflineProviders() fx.Option {
return fx.Provide(provider.NewNoopProvider)
}

func pinnedProviderStrategy(onlyRoots bool) interface{} {
func newProvidingStrategy(onlyPinned, onlyRoots bool) interface{} {
type input struct {
fx.In
Pinner pin.Pinner
Blockstore blockstore.Blockstore
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
}
return func(in input) provider.KeyChanFunc {
return provider.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher)
if onlyRoots {
return provider.NewPinnedProvider(true, in.Pinner, in.IPLDFetcher)
}

var later provider.KeyChanFunc
if onlyPinned {
later = provider.NewPinnedProvider(false, in.Pinner, in.IPLDFetcher)
} else {
later = provider.NewBlockstoreProvider(in.Blockstore)
}

fmt.Println(onlyPinned, onlyRoots)

roots := provider.NewPinnedProvider(true, in.Pinner, in.IPLDFetcher)
return newPrioritizedStrategy(roots, later)
}
}

func newPrioritizedStrategy(streams ...provider.KeyChanFunc) provider.KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
outCh := make(chan cid.Cid)

go func() {
defer close(outCh)

// TODO: initialize map/bloom filter

for _, stream := range streams {
ch, err := stream(ctx)
if err != nil {
// TODO: what to do
} else {
select {
case <-ctx.Done():
return
case c := <-ch:
// TODO: check in map/bloom filter: already reprovided?

select {
case <-ctx.Done():
return
case outCh <- c:
}
}
}
}
}()

return outCh, nil
}
}

0 comments on commit 6f478c3

Please sign in to comment.