Skip to content

Commit

Permalink
remove libp2p (#4073)
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Jun 14, 2024
1 parent 4bea988 commit ac15434
Show file tree
Hide file tree
Showing 80 changed files with 479 additions and 3,764 deletions.
2 changes: 1 addition & 1 deletion DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ ifps job submit -f process.py -r requirements.txt -c QmbWqxBEKC3P8tqsKc98xmWNzrz

## Components to Build

- Build an application that listens for jobs over libp2p, receives payment somehow, runs the job in {kubernetes, docker, idk}, and returns the result to the use (ideally the 'result' is in the form of an ipfs object and we can just return the hash).
- Build an application that listens for jobs over NATS, receives payment somehow, runs the job in {kubernetes, docker, idk}, and returns the result to the use (ideally the 'result' is in the form of an ipfs object and we can just return the hash).
- The inputs to the job should be a 'program' and a CID. The node should pull the CID requested into a car file (it should already be in this format for sectors that they have sealed) and pass that to the docker image (probably mounted somewhere to the image).
- This should run as a sidecar to lotus nodes, and should be fairly isolate so as not to mess with the node's primary operation.
- Need a payment system, payment estimator
Expand Down
1 change: 0 additions & 1 deletion cmd/cli/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func newDevStackOptions() *devstack.DevStackOptions {
Peer: "",
CPUProfilingFile: "",
MemoryProfilingFile: "",
NodeInfoPublisherInterval: node.TestNodeInfoPublishConfig,
ConfigurationRepo: "",
}
}
Expand Down
183 changes: 9 additions & 174 deletions cmd/cli/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ import (
"net"
"net/url"
"os"
"sort"
"strings"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand All @@ -23,14 +19,10 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/config"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/lib/crypto"
bac_libp2p "github.com/bacalhau-project/bacalhau/pkg/libp2p"
"github.com/bacalhau-project/bacalhau/pkg/libp2p/rcmgr"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/node"
"github.com/bacalhau-project/bacalhau/pkg/repo"
"github.com/bacalhau-project/bacalhau/pkg/setup"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/util/closer"
"github.com/bacalhau-project/bacalhau/pkg/util/templates"
"github.com/bacalhau-project/bacalhau/webui"
Expand Down Expand Up @@ -62,45 +54,13 @@ var (
`))
)

func GetPeers(cfg types.BacalhauConfig) ([]multiaddr.Multiaddr, error) {
var (
peersStrings []string
)
// TODO(forrest): [ux] this is a really confusing way to configure bootstrap peers.
// The convenience is nice by passing a single 'env' value, and can be improved with sane defaults commented
// out in the config. If a user wants to connect then can pass the --peer flag or uncomment the config values.
if cfg.Node.Libp2p.PeerConnect == DefaultPeerConnect || cfg.Node.Libp2p.PeerConnect == "" {
return nil, nil
} else if cfg.Node.Libp2p.PeerConnect == "env" {
// TODO(forrest): [ux/sanity] in the future default to the value in the config file and remove system environment
peersStrings = system.Envs[system.GetEnvironment()].BootstrapAddresses
} else if cfg.Node.Libp2p.PeerConnect == "config" {
// TODO(forrest): [ux] if the user explicitly passes the peer flag with value `config` read the
// bootstrap peer list from their config file.
return parseBootstrapPeers(cfg.Node.BootstrapAddresses)
} else {
peersStrings = strings.Split(cfg.Node.Libp2p.PeerConnect, ",")
}

peers := make([]multiaddr.Multiaddr, 0, len(peersStrings))
for _, peer := range peersStrings {
parsed, err := multiaddr.NewMultiaddr(peer)
if err != nil {
return nil, err
}
peers = append(peers, parsed)
}
return peers, nil
}

func NewCmd() *cobra.Command {
serveFlags := map[string][]configflags.Definition{
"local_publisher": configflags.LocalPublisherFlags,
"publishing": configflags.PublishingFlags,
"requester-tls": configflags.RequesterTLSFlags,
"server-api": configflags.ServerAPIFlags,
"network": configflags.NetworkFlags,
"libp2p": configflags.Libp2pFlags,
"ipfs": configflags.IPFSFlags,
"capacity": configflags.CapacityFlags,
"job-timeouts": configflags.ComputeTimeoutFlags,
Expand Down Expand Up @@ -159,27 +119,14 @@ func serve(cmd *cobra.Command, cfg types.BacalhauConfig, fsRepo *repo.FsRepo) er
cm := util.GetCleanupManager(ctx)

var nodeName string
var libp2pHost host.Host
var libp2pPeers []string
var err error

// if the transport type is libp2p, we use the peerID as the node name
// even if the user provided one to avoid issues with peer lookups
if cfg.Node.Network.Type == models.NetworkTypeLibp2p {
libp2pHost, libp2pPeers, err = setupLibp2p(cfg)
if cfg.Node.Name == "" {
nodeName, err = getNodeID(ctx, cfg.Node.NameProvider)
if err != nil {
return err
}
nodeName = libp2pHost.ID().String()
cfg.Node.Name = nodeName
} else {
if cfg.Node.Name == "" {
nodeName, err = getNodeID(ctx, cfg.Node.NameProvider)
if err != nil {
return err
}
cfg.Node.Name = nodeName
}
}
ctx = logger.ContextWithNodeIDLogger(ctx, nodeName)

Expand All @@ -194,11 +141,6 @@ func serve(cmd *cobra.Command, cfg types.BacalhauConfig, fsRepo *repo.FsRepo) er
return err
}

if networkConfig.Type == models.NetworkTypeLibp2p {
networkConfig.Libp2pHost = libp2pHost
networkConfig.ClusterPeers = libp2pPeers
}

computeConfig, err := GetComputeConfig(ctx, cfg.Node, isComputeNode)
if err != nil {
return errors.Wrapf(err, "failed to configure compute node")
Expand Down Expand Up @@ -228,7 +170,6 @@ func serve(cmd *cobra.Command, cfg types.BacalhauConfig, fsRepo *repo.FsRepo) er
RequesterSelfSign: cfg.Node.ServerAPI.TLS.SelfSigned,
Labels: cfg.Node.Labels,
AllowListedLocalPaths: cfg.Node.AllowListedLocalPaths,
NodeInfoStoreTTL: time.Duration(cfg.Node.NodeInfoStoreTTL),
NetworkConfig: networkConfig,
}
if isRequesterNode {
Expand Down Expand Up @@ -311,33 +252,6 @@ func serve(cmd *cobra.Command, cfg types.BacalhauConfig, fsRepo *repo.FsRepo) er
return nil
}

func setupLibp2p(cfg types.BacalhauConfig) (libp2pHost host.Host, peers []string, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to setup libp2p node. %w", err)
}
}()
privKey, err := loadLibp2pPrivKey(cfg.User.Libp2pKeyPath)
if err != nil {
return
}

libp2pHost, err = bac_libp2p.NewHost(cfg.Node.Libp2p.SwarmPort, privKey, rcmgr.DefaultResourceManager)
if err != nil {
return
}

peersAddrs, err := GetPeers(cfg)
if err != nil {
return
}
peers = make([]string, len(peersAddrs))
for i, p := range peersAddrs {
peers[i] = p.String()
}
return
}

func buildConnectCommand(ctx context.Context, nodeConfig *node.NodeConfig) (string, error) {
headerB := strings.Builder{}
cmdB := strings.Builder{}
Expand All @@ -348,37 +262,12 @@ func buildConnectCommand(ctx context.Context, nodeConfig *node.NodeConfig) (stri
cmdB.WriteString(fmt.Sprintf("%s=compute ",
configflags.FlagNameForKey(types.NodeType, configflags.NodeTypeFlags...)))

advertisedAddr := getPublicNATSOrchestratorURL(nodeConfig)
headerB.WriteString("To connect a compute node to this orchestrator, run the following command in your shell:\n")
cmdB.WriteString(fmt.Sprintf("%s=%s ",
configflags.FlagNameForKey(types.NodeNetworkType, configflags.NetworkFlags...),
nodeConfig.NetworkConfig.Type))

switch nodeConfig.NetworkConfig.Type {
case models.NetworkTypeNATS:
advertisedAddr := getPublicNATSOrchestratorURL(nodeConfig)

headerB.WriteString("To connect a compute node to this orchestrator, run the following command in your shell:\n")
cmdB.WriteString(fmt.Sprintf("%s=%s ",
configflags.FlagNameForKey(types.NodeNetworkOrchestrators, configflags.NetworkFlags...),
advertisedAddr.String(),
))

case models.NetworkTypeLibp2p:
headerB.WriteString("To connect another node to this one, run the following command in your shell:\n")

p2pAddr, err := multiaddr.NewMultiaddr("/p2p/" + nodeConfig.NetworkConfig.Libp2pHost.ID().String())
if err != nil {
return "", err
}
peerAddress := pickP2pAddress(nodeConfig.NetworkConfig.Libp2pHost.Addrs()).Encapsulate(p2pAddr).String()
cmdB.WriteString(fmt.Sprintf("%s=%s ",
configflags.FlagNameForKey(types.NodeLibp2pPeerConnect, configflags.Libp2pFlags...),
peerAddress,
))
}
} else {
if nodeConfig.NetworkConfig.Type == models.NetworkTypeLibp2p {
headerB.WriteString("Make sure there's at least one requester node in your network.")
}
configflags.FlagNameForKey(types.NodeNetworkOrchestrators, configflags.NetworkFlags...),
advertisedAddr.String(),
))
}

return headerB.String() + cmdB.String(), nil
Expand All @@ -405,29 +294,9 @@ func buildEnvVariables(
if nodeConfig.IsRequesterNode {
envVarBuilder.WriteString(fmt.Sprintf(
"export %s=%s\n",
config.KeyAsEnvVar(types.NodeNetworkType), nodeConfig.NetworkConfig.Type,
config.KeyAsEnvVar(types.NodeNetworkOrchestrators),
getPublicNATSOrchestratorURL(nodeConfig).String(),
))

switch nodeConfig.NetworkConfig.Type {
case models.NetworkTypeNATS:
envVarBuilder.WriteString(fmt.Sprintf(
"export %s=%s\n",
config.KeyAsEnvVar(types.NodeNetworkOrchestrators),
getPublicNATSOrchestratorURL(nodeConfig).String(),
))
case models.NetworkTypeLibp2p:
p2pAddr, err := multiaddr.NewMultiaddr("/p2p/" + nodeConfig.NetworkConfig.Libp2pHost.ID().String())
if err != nil {
return "", err
}
peerAddress := pickP2pAddress(nodeConfig.NetworkConfig.Libp2pHost.Addrs()).Encapsulate(p2pAddr).String()

envVarBuilder.WriteString(fmt.Sprintf(
"export %s=%s\n",
config.KeyAsEnvVar(types.NodeLibp2pPeerConnect),
peerAddress,
))
}
}

return envVarBuilder.String(), nil
Expand All @@ -446,40 +315,6 @@ func getPublicNATSOrchestratorURL(nodeConfig *node.NodeConfig) *url.URL {
return orchestrator
}

// pickP2pAddress will aim to select a non-localhost IPv4 TCP address, or at least a non-localhost IPv6 one, from a list
// of addresses.
func pickP2pAddress(addresses []multiaddr.Multiaddr) multiaddr.Multiaddr {
value := func(m multiaddr.Multiaddr) int {
count := 0
if _, err := m.ValueForProtocol(multiaddr.P_TCP); err == nil {
count++
}
if ip, err := m.ValueForProtocol(multiaddr.P_IP4); err == nil {
count++
if ip != "127.0.0.1" {
count++
}
} else if ip, err := m.ValueForProtocol(multiaddr.P_IP6); err == nil && ip != "::1" {
count++
}
return count
}

preferredAddress := config.PreferredAddress()
if preferredAddress != "" {
for _, addr := range addresses {
if strings.Contains(addr.String(), preferredAddress) {
return addr
}
}
}

sort.Slice(addresses, func(i, j int) bool {
return value(addresses[i]) > value(addresses[j])
})

return addresses[0]
}
func GetTLSCertificate(ctx context.Context, cfg types.BacalhauConfig, nodeConfig *node.NodeConfig) (string, string, error) {
cert := cfg.Node.ServerAPI.TLS.ServerCertificate
key := cfg.Node.ServerAPI.TLS.ServerKey
Expand Down
55 changes: 0 additions & 55 deletions cmd/cli/serve/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stretchr/testify/suite"
"golang.org/x/sync/errgroup"

"github.com/bacalhau-project/bacalhau/pkg/config/configenv"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/client"
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
apitest "github.com/bacalhau-project/bacalhau/pkg/publicapi/test"
Expand All @@ -25,7 +24,6 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/lib/network"

cmd2 "github.com/bacalhau-project/bacalhau/cmd/cli"
"github.com/bacalhau-project/bacalhau/cmd/cli/serve"
cfgtypes "github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/logger"
"github.com/bacalhau-project/bacalhau/pkg/model"
Expand Down Expand Up @@ -215,59 +213,6 @@ func (s *ServeSuite) TestCanSubmitJob() {
s.NoError(err)
}

func (s *ServeSuite) TestGetPeers() {
cfg := configenv.Testing
// by default it should return no peers
peers, err := serve.GetPeers(cfg)
s.NoError(err)
s.Require().Equal(0, len(peers))

// if we set the peer connect to "env" it should return the peers from the env
for envName, envData := range system.Envs {
// skip checking environments other than test, because
// system.GetEnvironment() in getPeers() always returns "test" while testing
if envName.String() != "test" {
continue
}

// this is required for the below line to succeed as environment is being deprecated.
cfg.Node.Libp2p.PeerConnect = "env"
peers, err = serve.GetPeers(cfg)
s.NoError(err)
s.Require().NotEmpty(peers, "getPeers() returned an empty slice")
// search each peer in env BootstrapAddresses
for _, peer := range peers {
found := false
for _, envPeer := range envData.BootstrapAddresses {
if peer.String() == envPeer {
found = true
break
}
}
s.Require().True(found, "Peer %s not found in env %s", peer, envName)
}
}

// if we pass multiaddresses it should just return them
inputPeers := []string{
"/ip4/0.0.0.0/tcp/1235/p2p/QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVz",
"/ip4/0.0.0.0/tcp/1235/p2p/QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcz",
}
peerConnect := strings.Join(inputPeers, ",")
cfg.Node.Libp2p.PeerConnect = peerConnect
peers, err = serve.GetPeers(cfg)
s.NoError(err)
s.Require().Equal(inputPeers[0], peers[0].String())
s.Require().Equal(inputPeers[1], peers[1].String())

// if we pass invalid multiaddress it should error out
inputPeers = []string{"foo"}
peerConnect = strings.Join(inputPeers, ",")
cfg.Node.Libp2p.PeerConnect = peerConnect
_, err = serve.GetPeers(cfg)
s.Require().Error(err)
}

func (s *ServeSuite) TestSelfSignedRequester() {
s.protocol = "https"
_, err := s.serve("--node-type", "requester", "--self-signed")
Expand Down
Loading

0 comments on commit ac15434

Please sign in to comment.