Skip to content

Commit

Permalink
Adds a 'local' publisher (#3431)
Browse files Browse the repository at this point in the history
Adds a new publisher type that allows for publishing to the local disk
of the compute node. This allows users to test out the publisher
functionality without needing to set up a remote storage service making
it easier to get started with Bacalhau. Currently this implementation
does not include features such as content lifetimes, or secure access,
and so is not recommended for production use yet. This publisher is
useful for testing and development purposes.

The local publisher comprises two parts. The publisher is responsible
for taking the output from a job, compressing it and moving it to a
location where it can be accessed by the second part. This part is the
HTTP server which delivers the content back to the caller.

By default the HTTP server listens on port 6001, but this can be changed
by setting the `--local-publisher-port` flag on the compute node. The
HTTP server will serve the content from the directory specified by the
`local-publisher-directory` flag. If this is not set, the HTTP server
will serve the content from a subdirectory of the configured bacalhau
storage directory. Finally the `--local-publisher-address` flag can be
used to specify the address that the HTTP server listens on. The default
for this varies by environment, in test and development environments it
uses `localhost`, in production environments it uses `public` to obtain
a public address. Of course, you can set these values in config in
preference to using the command line flags if the defaults are not
suitable.
  • Loading branch information
rossjones committed Feb 15, 2024
1 parent 18e92eb commit 6f00c8a
Show file tree
Hide file tree
Showing 33 changed files with 672 additions and 43 deletions.
47 changes: 47 additions & 0 deletions cmd/cli/docker/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,50 @@ func (s *DockerRunSuite) TestRun_Timeout_DefinedValue() {

s.Require().Equal(expectedTimeout, j.Spec.GetTimeout())
}

// TestRun_NoPublisher checks that a docker run submitted without an explicit
// publisher results in an execution whose published result is empty: an
// unknown storage source and no URL, CID, or S3 details.
func (s *DockerRunSuite) TestRun_NoPublisher() {
	ctx := context.Background()

	_, stdout, err := s.ExecuteTestCobraCommand("docker", "run", "ubuntu", "echo", "'hello world'")
	s.Require().NoError(err)

	submitted := testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, stdout)
	s.T().Log(submitted)

	jobInfo, _, err := s.Client.Get(ctx, submitted.Metadata.ID)
	s.Require().NoError(err)
	s.T().Log(jobInfo)

	s.Require().Len(jobInfo.State.Executions, 1)

	published := jobInfo.State.Executions[0].PublishedResult
	s.Require().Equal("unknown", published.StorageSource.String())
	s.Require().Empty(published.URL, "Did not expect a URL")
	s.Require().Empty(published.CID, "Did not expect a CID")
	s.Require().Empty(published.S3, "Did not expect S3 details")
}

// TestRun_LocalPublisher checks that running a docker job with the local
// publisher ("-p local") yields a published result that is a URL download
// pointing at the local HTTP server, naming a tarball for the execution ID.
func (s *DockerRunSuite) TestRun_LocalPublisher() {
	ctx := context.Background()

	_, stdout, err := s.ExecuteTestCobraCommand("docker", "run", "-p", "local", "ubuntu", "echo", "'hello world'")
	s.Require().NoError(err)

	submitted := testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, stdout)
	s.T().Log(submitted)

	jobInfo, _, err := s.Client.Get(ctx, submitted.Metadata.ID)
	s.Require().NoError(err)
	s.T().Log(jobInfo)

	s.Require().Len(jobInfo.State.Executions, 1)

	execution := jobInfo.State.Executions[0]
	published := execution.PublishedResult
	s.Require().Equal(model.StorageSourceURLDownload, published.StorageSource)
	s.Require().Contains(published.URL, "http://127.0.0.1:", "URL does not contain expected prefix")
	s.Require().Contains(published.URL, fmt.Sprintf("%s.tgz", execution.ID().ExecutionID), "URL does not contain expected file")
}
1 change: 1 addition & 0 deletions cmd/cli/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func GetPeers(peerConnect string) ([]multiaddr.Multiaddr, error) {

func NewCmd() *cobra.Command {
serveFlags := map[string][]configflags.Definition{
"local_publisher": configflags.LocalPublisherFlags,
"publishing": configflags.PublishingFlags,
"requester-tls": configflags.RequesterTLSFlags,
"server-api": configflags.ServerAPIFlags,
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/serve/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func GetComputeConfig() (node.ComputeConfig, error) {
},
LogRunningExecutionsInterval: time.Duration(cfg.Logging.LogRunningExecutionsInterval),
LogStreamBufferSize: cfg.LogStreamConfig.ChannelBufferSize,
LocalPublisher: cfg.LocalPublisher,
})
}

Expand Down
51 changes: 51 additions & 0 deletions cmd/cli/wasm/wasm_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package wasm_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/suite"

cmdtesting "github.com/bacalhau-project/bacalhau/cmd/testing"
"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/pkg/model"
testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils"
)

Expand Down Expand Up @@ -42,3 +44,52 @@ func (s *WasmRunSuite) TestSpecifyingEnvVars() {

_ = testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, out)
}

// TestNoPublisher checks that a wasm job submitted without an explicit
// publisher falls back to the noop publisher and produces an empty
// published result (no URL, S3 details, or CID).
func (s *WasmRunSuite) TestNoPublisher() {
	ctx := context.Background()
	_, stdout, err := s.ExecuteTestCobraCommand("wasm", "run",
		"../../../testdata/wasm/env/main.wasm",
		"-e A=B,C=D",
	)
	s.Require().NoError(err)

	submitted := testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, stdout)
	jobInfo, _, err := s.Client.Get(ctx, submitted.Metadata.ID)
	s.Require().NoError(err)
	s.T().Log(jobInfo)

	s.Require().Len(jobInfo.State.Executions, 1)

	published := jobInfo.State.Executions[0].PublishedResult

	s.Require().Equal("noop", submitted.Spec.PublisherSpec.Type.String(), "Expected a noop publisher")
	s.Require().Empty(published.URL, "Did not expect a URL")
	s.Require().Empty(published.S3, "Did not expect S3 details")
	s.Require().Empty(published.CID, "Did not expect a CID")
}

// TestLocalPublisher checks that running a wasm job with the local publisher
// ("-p local") records the local publisher type on the job spec and yields a
// URL-download result served over HTTP, naming a tarball for the execution ID.
func (s *WasmRunSuite) TestLocalPublisher() {
	ctx := context.Background()
	_, stdout, err := s.ExecuteTestCobraCommand("wasm", "run",
		"-p", "local",
		"../../../testdata/wasm/env/main.wasm",
		"-e A=B,C=D",
	)
	s.Require().NoError(err)

	submitted := testutils.GetJobFromTestOutputLegacy(ctx, s.T(), s.Client, stdout)
	jobInfo, _, err := s.Client.Get(ctx, submitted.Metadata.ID)
	s.Require().NoError(err)
	s.T().Log(jobInfo)

	s.Require().Equal(model.PublisherLocal, submitted.Spec.PublisherSpec.Type, "Expected a local publisher")

	s.Require().Len(jobInfo.State.Executions, 1)

	execution := jobInfo.State.Executions[0]
	published := execution.PublishedResult
	s.Require().Equal(model.StorageSourceURLDownload, published.StorageSource)
	s.Require().Contains(published.URL, "http://127.0.0.1:", "URL does not contain expected prefix")
	s.Require().Contains(published.URL, fmt.Sprintf("%s.tgz", execution.ID().ExecutionID), "URL does not contain expected file")
}
4 changes: 4 additions & 0 deletions cmd/testing/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/bacalhau-project/bacalhau/cmd/cli"
"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/pkg/bidstrategy/semantic"
"github.com/bacalhau-project/bacalhau/pkg/config/types"
"github.com/bacalhau-project/bacalhau/pkg/devstack"
noop_executor "github.com/bacalhau-project/bacalhau/pkg/executor/noop"
"github.com/bacalhau-project/bacalhau/pkg/logger"
Expand Down Expand Up @@ -42,6 +43,9 @@ func (s *BaseSuite) SetupTest() {
JobSelectionPolicy: node.JobSelectionPolicy{
Locality: semantic.Anywhere,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "127.0.0.1",
},
})
s.Require().NoError(err)
ctx := context.Background()
Expand Down
24 changes: 24 additions & 0 deletions cmd/util/flags/configflags/local_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package configflags

import "github.com/bacalhau-project/bacalhau/pkg/config/types"

// LocalPublisherFlags defines the CLI flags that configure the local
// publisher on a compute node: the address and port its HTTP server binds
// to, and the directory where published content is stored. Each flag maps
// onto the corresponding Node.Compute.LocalPublisher config path.
var LocalPublisherFlags = []Definition{
	{
		FlagName:     "local-publisher-address",
		DefaultValue: Default.Node.Compute.LocalPublisher.Address,
		ConfigPath:   types.NodeComputeLocalPublisherAddress,
		Description:  `The address for the local publisher's server to bind to`,
	},
	{
		FlagName:     "local-publisher-port",
		DefaultValue: Default.Node.Compute.LocalPublisher.Port,
		ConfigPath:   types.NodeComputeLocalPublisherPort,
		Description:  `The port for the local publisher's server to bind to (default: 6001)`,
	},
	{
		FlagName:     "local-publisher-directory",
		DefaultValue: Default.Node.Compute.LocalPublisher.Directory,
		ConfigPath:   types.NodeComputeLocalPublisherDirectory,
		Description:  `The directory where the local publisher will store content`,
	},
}
43 changes: 43 additions & 0 deletions docs/docs/setting-up/other-specifications/publishers/local.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
sidebar_label: Local
---

# Local Publisher Specification

Bacalhau's Local Publisher provides a useful option for storing task results on the compute node, allowing for ease of access and retrieval for testing or trying out Bacalhau.

:::danger

The Local Publisher should not be used for Production use as it is not a reliable storage option. For production use, we recommend using a more reliable option such as an S3-compatible storage service.
:::

## Local Publisher Parameters
The local publisher requires no specific parameters to be defined in the publisher specification. The user only needs to indicate the publisher type as "local", and Bacalhau handles the rest. Here is an example of how to set up a Local Publisher in a job specification.

```yaml
Publisher:
Type: local
```
## Published Result Specification
Once the job is executed, the results are published to the local compute node, and stored as a compressed tar file, which can be accessed and retrieved over HTTP from the command line using the `get` command. This will download and extract the contents for the user from the remote compute node.

### Result Parameters
- URL `(string)`: This is the HTTP URL to the results of the computation, which is hosted on the compute node where it ran.
Here's a sample of how the published result might appear:

```yaml
PublishedResult:
Type: local
Params:
URL: "http://192.168.0.11:6001/e-c4b80d04-ff2b-49d6-9b99-d3a8e669a6bf.tgz"
```

In this example, the task results will be stored on the compute node, and can be referenced and retrieved using the specified URL.


## Caveats

- By default the compute node will attempt to use a public address for the HTTP server delivering task output, but there is no guarantee that the compute node is accessible on that address. If the compute node is behind a NAT or firewall, the user may need to manually specify the address to use for the HTTP server in the `config.yaml` file.
- There is no lifecycle management for the content stored on the compute node. The user is responsible for managing the content and ensuring that it is removed when no longer needed before the compute node runs out of disk space.
- If the address/port of the compute node changes, then previously stored content will no longer be accessible. The user will need to manually update the address in the `config.yaml` file and re-publish the content to make it accessible again.
4 changes: 4 additions & 0 deletions pkg/config/configenv/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ var DevelopmentComputeConfig = types.ComputeConfig{
LogStreamConfig: types.LogStreamConfig{
ChannelBufferSize: 10,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "127.0.0.1",
Port: 6001,
},
}

var DevelopmentRequesterConfig = types.RequesterConfig{
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/configenv/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ var LocalComputeConfig = types.ComputeConfig{
LogStreamConfig: types.LogStreamConfig{
ChannelBufferSize: 10,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "127.0.0.1",
Port: 6001,
},
}

var LocalRequesterConfig = types.RequesterConfig{
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/configenv/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ var ProductionComputeConfig = types.ComputeConfig{
LogStreamConfig: types.LogStreamConfig{
ChannelBufferSize: 10,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "public",
Port: 6001,
},
}

var ProductionRequesterConfig = types.RequesterConfig{
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/configenv/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ var StagingComputeConfig = types.ComputeConfig{
LogStreamConfig: types.LogStreamConfig{
ChannelBufferSize: 10,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "public",
Port: 6001,
},
}

var StagingRequesterConfig = types.RequesterConfig{
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/configenv/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ var TestingComputeConfig = types.ComputeConfig{
LogStreamConfig: types.LogStreamConfig{
ChannelBufferSize: 10,
},
LocalPublisher: types.LocalPublisherConfig{
Address: "private",
Port: 6001,
},
}

var TestingRequesterConfig = types.RequesterConfig{
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/types/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ComputeConfig struct {
Logging LoggingConfig `yaml:"Logging"`
ManifestCache DockerCacheConfig `yaml:"ManifestCache"`
LogStreamConfig LogStreamConfig `yaml:"LogStream"`
LocalPublisher LocalPublisherConfig `yaml:"LocalPublisher"`
}

type CapacityConfig struct {
Expand Down Expand Up @@ -55,3 +56,9 @@ type LogStreamConfig struct {
// How many messages to buffer in the log stream channel, per stream
ChannelBufferSize int `yaml:"ChannelBufferSize"`
}

// LocalPublisherConfig configures the compute node's local publisher, which
// serves job results from the local disk over HTTP.
type LocalPublisherConfig struct {
	// Address is the address the publisher's HTTP server binds to
	// (e.g. "127.0.0.1", or "public" to obtain a public address).
	Address string `yaml:"Address"`
	// Port is the TCP port the HTTP server listens on.
	Port int `yaml:"Port"`
	// Directory is where published content is stored. When empty, a
	// subdirectory of the configured storage directory is used.
	Directory string `yaml:"Directory"`
}
4 changes: 4 additions & 0 deletions pkg/config/types/generated_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ const NodeComputeManifestCacheDuration = "Node.Compute.ManifestCache.Duration"
const NodeComputeManifestCacheFrequency = "Node.Compute.ManifestCache.Frequency"
const NodeComputeLogStreamConfig = "Node.Compute.LogStreamConfig"
const NodeComputeLogStreamConfigChannelBufferSize = "Node.Compute.LogStreamConfig.ChannelBufferSize"
const NodeComputeLocalPublisher = "Node.Compute.LocalPublisher"
const NodeComputeLocalPublisherAddress = "Node.Compute.LocalPublisher.Address"
const NodeComputeLocalPublisherPort = "Node.Compute.LocalPublisher.Port"
const NodeComputeLocalPublisherDirectory = "Node.Compute.LocalPublisher.Directory"
const NodeRequester = "Node.Requester"
const NodeRequesterJobDefaults = "Node.Requester.JobDefaults"
const NodeRequesterJobDefaultsExecutionTimeout = "Node.Requester.JobDefaults.ExecutionTimeout"
Expand Down
9 changes: 8 additions & 1 deletion pkg/config/types/generated_viper_defaults.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// CODE GENERATED BY pkg/config/types/gen_viper DO NOT EDIT

package types
Expand Down Expand Up @@ -110,6 +109,10 @@ func SetDefaults(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.SetDefault(NodeComputeManifestCacheFrequency, cfg.Node.Compute.ManifestCache.Frequency.AsTimeDuration())
p.Viper.SetDefault(NodeComputeLogStreamConfig, cfg.Node.Compute.LogStreamConfig)
p.Viper.SetDefault(NodeComputeLogStreamConfigChannelBufferSize, cfg.Node.Compute.LogStreamConfig.ChannelBufferSize)
p.Viper.SetDefault(NodeComputeLocalPublisher, cfg.Node.Compute.LocalPublisher)
p.Viper.SetDefault(NodeComputeLocalPublisherAddress, cfg.Node.Compute.LocalPublisher.Address)
p.Viper.SetDefault(NodeComputeLocalPublisherPort, cfg.Node.Compute.LocalPublisher.Port)
p.Viper.SetDefault(NodeComputeLocalPublisherDirectory, cfg.Node.Compute.LocalPublisher.Directory)
p.Viper.SetDefault(NodeRequester, cfg.Node.Requester)
p.Viper.SetDefault(NodeRequesterJobDefaults, cfg.Node.Requester.JobDefaults)
p.Viper.SetDefault(NodeRequesterJobDefaultsExecutionTimeout, cfg.Node.Requester.JobDefaults.ExecutionTimeout.AsTimeDuration())
Expand Down Expand Up @@ -289,6 +292,10 @@ func Set(cfg BacalhauConfig, opts ...SetOption) {
p.Viper.Set(NodeComputeManifestCacheFrequency, cfg.Node.Compute.ManifestCache.Frequency.AsTimeDuration())
p.Viper.Set(NodeComputeLogStreamConfig, cfg.Node.Compute.LogStreamConfig)
p.Viper.Set(NodeComputeLogStreamConfigChannelBufferSize, cfg.Node.Compute.LogStreamConfig.ChannelBufferSize)
p.Viper.Set(NodeComputeLocalPublisher, cfg.Node.Compute.LocalPublisher)
p.Viper.Set(NodeComputeLocalPublisherAddress, cfg.Node.Compute.LocalPublisher.Address)
p.Viper.Set(NodeComputeLocalPublisherPort, cfg.Node.Compute.LocalPublisher.Port)
p.Viper.Set(NodeComputeLocalPublisherDirectory, cfg.Node.Compute.LocalPublisher.Directory)
p.Viper.Set(NodeRequester, cfg.Node.Requester)
p.Viper.Set(NodeRequesterJobDefaults, cfg.Node.Requester.JobDefaults)
p.Viper.Set(NodeRequesterJobDefaultsExecutionTimeout, cfg.Node.Requester.JobDefaults.ExecutionTimeout.AsTimeDuration())
Expand Down
27 changes: 19 additions & 8 deletions pkg/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

const (
DefaultLibp2pKeySize = 2048
portsPerNode = 3 // 1 for libp2p, 1 for IPFS, 1 for local publisher
)

type DevStackOptions struct {
Expand Down Expand Up @@ -117,6 +118,15 @@ func Setup(
stackConfig.NetworkType = networkType
}

// We will pre-allocate the potential maximum number of free ports we will need during setup
// to ensure that we can allocate them well before use and avoid any potential port clashes.
// Before this change it was possible to get the same port back from freeport.GetFreePort()
// if a previously allocated one was not used immediately
freePorts, err := freeport.GetFreePorts(totalNodeCount * portsPerNode)
if err != nil {
return nil, fmt.Errorf("failed to get free ports: %w", err)
}

for i := 0; i < totalNodeCount; i++ {
nodeID := fmt.Sprintf("node-%d", i)
ctx = logger.ContextWithNodeIDLogger(ctx, nodeID)
Expand Down Expand Up @@ -154,10 +164,7 @@ func Setup(
const startSwarmPort = 4222 // 4222 is the default NATS port
swarmPort = startSwarmPort + i
} else {
swarmPort, err = freeport.GetFreePort()
if err != nil {
return nil, err
}
swarmPort, freePorts = freePorts[0], freePorts[1:]
}
clusterConfig := node.NetworkConfig{
Type: stackConfig.NetworkType,
Expand All @@ -172,10 +179,7 @@ func Setup(
const startClusterPort = 6222
clusterPort = startClusterPort + i
} else {
clusterPort, err = freeport.GetFreePort()
if err != nil {
return nil, err
}
clusterPort, freePorts = freePorts[0], freePorts[1:]
}

if isRequesterNode {
Expand Down Expand Up @@ -239,6 +243,13 @@ func Setup(
nodeInfoPublisherInterval = node.TestNodeInfoPublishConfig
}

if isComputeNode {
// We have multiple process on the same machine, all wanting to listen on a HTTP port
// and so we will give each compute node a random open port to listen on.
stackConfig.ComputeConfig.LocalPublisher.Port, freePorts = freePorts[0], freePorts[1:]
stackConfig.ComputeConfig.LocalPublisher.Address = "127.0.0.1" //nolint:gomnd
}

nodeConfig := node.NodeConfig{
NodeID: nodeID,
IPFSClient: ipfsNode.Client(),
Expand Down
Loading

0 comments on commit 6f00c8a

Please sign in to comment.