persist store paths, and migrate repo to v3 (#3429)
### Summary
This PR bumps the repo version from v2 to v3 and introduces the
following changes:
1. Change the execution store and job store default paths to no longer
include the node name in the path. To clarify, v2 paths look like
`~/.bacalhau/QmUBgU7xHKK44RuTHgrvnJfoSdZJS4fddT197iyTF5qjEV-compute/executions.db`
and `QmUBgU7xHKK44RuTHgrvnJfoSdZJS4fddT197iyTF5qjEV-requester/jobs.db`,
whereas in v3 they are `~/.bacalhau/compute_store/executions.db` and
`~/.bacalhau/orchestrator_store/jobs.db`. This change is needed so that
node names no longer have to depend on libp2p keys.
2. Persist the execution store and job store paths in `config.yaml` so
that users won't lose their state if we change our default path names in
the future.
3. Add a new `Node.Name` config, generate a node name (still based on
libp2p), and persist it. This will simplify an upcoming PR that allows
users to define their own node name through the CLI.
4. Introduce migration scripts that migrate v2 repos by creating
`config.yaml` if it doesn't exist, persisting the store paths, and
renaming the old paths. No such migration happens if the user provided
their own `config.yaml` with store paths configured.
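
As a rough illustration of the path change in item 1, the renames a v2 to v3 migration has to perform could be planned as below. This is only a sketch: `planStoreMoves`, `storeMove`, and the example node ID are hypothetical names, not the migration code added by this PR. Only the directory names come from the paths quoted above.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// storeMove describes one directory rename a v2 -> v3 migration would perform.
type storeMove struct {
	From string
	To   string
}

// planStoreMoves is a hypothetical helper: given the repo directory and the
// libp2p-derived node ID that v2 embedded in its paths, it lists the renames
// needed to reach the fixed v3 directory names.
func planStoreMoves(repoDir, nodeID string) []storeMove {
	return []storeMove{
		{
			From: filepath.Join(repoDir, nodeID+"-compute"),
			To:   filepath.Join(repoDir, "compute_store"),
		},
		{
			From: filepath.Join(repoDir, nodeID+"-requester"),
			To:   filepath.Join(repoDir, "orchestrator_store"),
		},
	}
}

func main() {
	// The repo path and node ID here are placeholders for illustration.
	for _, m := range planStoreMoves("/home/user/.bacalhau", "QmExampleNodeID") {
		fmt.Printf("%s -> %s\n", m.From, m.To)
	}
}
```

Because the target names no longer contain the node ID, the store location stays stable even if the node name later stops being derived from the libp2p key.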

### `bacalhau config show`
I've also introduced a `bacalhau config show` command that prints the
current configuration, which combines the values in `config.yaml`, env
variables, and default values.
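
To make the layering concrete, here is a minimal sketch of resolving one key across several sources. It assumes the usual viper ordering in which env variables override the config file, which in turn overrides defaults. The `resolve` function and the key names are illustrative only and are not bacalhau's actual config keys or implementation.

```go
package main

import "fmt"

// resolve returns the value for key from the first layer that defines it.
// Layers are passed from highest to lowest priority.
func resolve(key string, layers ...map[string]string) (string, bool) {
	for _, layer := range layers {
		if v, ok := layer[key]; ok {
			return v, true
		}
	}
	return "", false
}

func main() {
	// Hypothetical keys and values, for illustration only.
	env := map[string]string{"node.name": "from-env"}
	file := map[string]string{
		"node.name":       "from-file",
		"node.store.path": "/data/executions.db",
	}
	defaults := map[string]string{"node.name": "", "node.type": "requester"}

	for _, k := range []string{"node.name", "node.store.path", "node.type"} {
		v, _ := resolve(k, env, file, defaults)
		fmt.Printf("%s = %q\n", k, v)
	}
}
```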

### A few issues encountered
1. The repo is initialized in the `RootCmd` before any flags are
registered. This means only default configurations are used when
initializing the repo, and any env variables or flags are ignored.
For example, if I run `bacalhau serve --compute-execution-store-path
my-awesome-path` for the very first time, bacalhau still
initializes the repo with the default value, creates the
`~/.bacalhau/compute_store` directory, and creates a `config.yaml` with
`~/.bacalhau/compute_store` instead of the path I provided. It is difficult
to do things differently with our current setup, as we initialize the
repo in every command, but these flags are only available for the `serve`
command.
2. We use the global `viper` instance to set our config. I had to reset the
config in my tests before each run. It might be safer not to use the
global instance.
3. I tried creating a repo per node in devstack, but couldn't get
that to work because of our global configs. Configurations of the first
repo initialized are also passed to the second repo's config, and we
have test cases that fetch info from the very first repo initialized, such
as
https://github.com/bacalhau-project/bacalhau/blob/badd1ce5fd9508bd615f9a8e08bd8ce44d2eceb7/pkg/test/devstack/timeout_test.go#L112
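
The leak described in the last two items can be reproduced in miniature with plain maps. The sketch below is not bacalhau or viper code. `initShared` and `initScoped` are hypothetical stand-ins for initializing a repo against a process-wide config versus an instance-owned one.

```go
package main

import "fmt"

// shared mimics a process-wide configuration singleton.
var shared = map[string]string{}

// initShared merges a repo's settings into the singleton and returns a
// snapshot of what that repo now observes.
func initShared(settings map[string]string) map[string]string {
	for k, v := range settings {
		shared[k] = v
	}
	snapshot := make(map[string]string, len(shared))
	for k, v := range shared {
		snapshot[k] = v
	}
	return snapshot
}

// initScoped builds a fresh configuration owned by a single repo.
func initScoped(settings map[string]string) map[string]string {
	own := make(map[string]string, len(settings))
	for k, v := range settings {
		own[k] = v
	}
	return own
}

func main() {
	first := map[string]string{"node.name": "node-0", "store.path": "/tmp/repo0"}
	second := map[string]string{"store.path": "/tmp/repo1"}

	// With shared state, the second repo sees the first repo's node name.
	initShared(first)
	leaked := initShared(second)
	fmt.Println("shared node.name:", leaked["node.name"])

	// With scoped state, the second repo only sees what it was given.
	initScoped(first)
	isolated := initScoped(second)
	_, ok := isolated["node.name"]
	fmt.Println("scoped has node.name:", ok)
}
```

Resetting the shared map before each test, as done in this PR's tests, avoids the leak but only while tests run sequentially. An instance-scoped config removes the need for the reset.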


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Introduced new files to enhance configuration management, including displaying default and current settings.
	- Added functionality for handling private keys within the libp2p network.
	- Implemented a new approach for repository migration, including updating repository versions and handling specific configurations.
	- Enhanced CLI commands for better configuration setup and display.
- **Refactor**
	- Updated the logic flow in configuration setup, removing unnecessary imports and functions.
	- Simplified the initialization of repository and node configuration, including passing context parameters where needed.
	- Adjusted error messaging for clearer debugging.
	- Streamlined setup and teardown processes in tests.
- **Bug Fixes**
	- Fixed exclusion patterns in pre-commit hooks to correctly ignore specified directories.
- **Chores**
	- Updated imports and removed unused code across multiple files for cleaner codebase management.
- **Documentation**
	- Added summaries and comments for clarity in understanding the purpose and impact of changes.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
wdbaruni committed Feb 20, 2024
1 parent 0565aea commit 2045425
Showing 67 changed files with 1,079 additions and 514 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -8,7 +8,7 @@ repos:
       - id: detect-aws-credentials
         args: [--allow-missing-credentials]
       - id: detect-private-key
-        exclude: ^testdata/certs
+        exclude: testdata/.*
       - id: check-yaml
       - id: check-json
   - repo: local
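
The effect of the `exclude` change can be checked with a small regular-expression sketch. pre-commit applies `exclude` as an unanchored search, which Go's `MatchString` also performs, so the comparison carries over. The `excluded` helper and the sample paths are hypothetical and only illustrate the difference between the two patterns.

```go
package main

import (
	"fmt"
	"regexp"
)

// excluded reports whether pattern matches anywhere in path, mirroring an
// unanchored regex search.
func excluded(pattern, path string) bool {
	return regexp.MustCompile(pattern).MatchString(path)
}

func main() {
	const oldPattern = `^testdata/certs`
	const newPattern = `testdata/.*`

	// Sample paths chosen for illustration; they are not taken from the repo.
	paths := []string{
		"testdata/certs/server.key",
		"pkg/repo/testdata/key.pem",
		"cmd/main.go",
	}
	for _, p := range paths {
		fmt.Printf("%-28s old=%-5v new=%v\n", p, excluded(oldPattern, p), excluded(newPattern, p))
	}
}
```

The old pattern is anchored to a top-level `testdata/certs` directory, while the new one matches any path containing a `testdata/` segment, including nested ones.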
34 changes: 2 additions & 32 deletions cmd/cli/config/config.go
@@ -1,12 +1,8 @@
 package config
 
 import (
-	"github.com/spf13/cobra"
-	"github.com/spf13/viper"
-	"gopkg.in/yaml.v3"
-
 	"github.com/bacalhau-project/bacalhau/cmd/util/hook"
-	"github.com/bacalhau-project/bacalhau/pkg/config"
+	"github.com/spf13/cobra"
 )
 
 func NewCmd() *cobra.Command {
@@ -16,36 +12,10 @@ func NewCmd() *cobra.Command {
 		PreRunE: hook.ClientPreRunHooks,
 		PostRunE: hook.ClientPostRunHooks,
 	}
+	configCmd.AddCommand(newShowCmd())
 	configCmd.AddCommand(newDefaultCmd())
 	configCmd.AddCommand(newListCmd())
 	configCmd.AddCommand(newSetCmd())
 	configCmd.AddCommand(newAutoResourceCmd())
 	return configCmd
 }
-
-func newDefaultCmd() *cobra.Command {
-	showCmd := &cobra.Command{
-		Use: "default",
-		Short: "Show the default bacalhau config.",
-		RunE: func(cmd *cobra.Command, args []string) error {
-			return defaultConfig(cmd)
-		},
-	}
-	showCmd.PersistentFlags().String("path", viper.GetString("repo"), "sets path dependent config fields")
-	return showCmd
-}
-
-func defaultConfig(cmd *cobra.Command) error {
-	// clear any existing configuration before generating the default.
-	config.Reset()
-	defaultConfig, err := config.Init(cmd.Flag("path").Value.String())
-	if err != nil {
-		return err
-	}
-	cfgbytes, err := yaml.Marshal(defaultConfig)
-	if err != nil {
-		return err
-	}
-	cmd.Println(string(cfgbytes))
-	return nil
-}
35 changes: 35 additions & 0 deletions cmd/cli/config/default.go
@@ -0,0 +1,35 @@
+package config
+
+import (
+	"github.com/bacalhau-project/bacalhau/pkg/config"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	"gopkg.in/yaml.v3"
+)
+
+func newDefaultCmd() *cobra.Command {
+	showCmd := &cobra.Command{
+		Use: "default",
+		Short: "Show the default bacalhau config.",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return defaultConfig(cmd)
+		},
+	}
+	showCmd.PersistentFlags().String("path", viper.GetString("repo"), "sets path dependent config fields")
+	return showCmd
+}
+
+func defaultConfig(cmd *cobra.Command) error {
+	// clear any existing configuration before generating the default.
+	config.Reset()
+	defaultConfig, err := config.Init(cmd.Flag("path").Value.String())
+	if err != nil {
+		return err
+	}
+	cfgbytes, err := yaml.Marshal(defaultConfig)
+	if err != nil {
+		return err
+	}
+	cmd.Println(string(cfgbytes))
+	return nil
+}
34 changes: 34 additions & 0 deletions cmd/cli/config/show.go
@@ -0,0 +1,34 @@
+package config
+
+import (
+	"github.com/bacalhau-project/bacalhau/pkg/config"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	"gopkg.in/yaml.v3"
+)
+
+func newShowCmd() *cobra.Command {
+	showCmd := &cobra.Command{
+		Use: "show",
+		Short: "Show the current bacalhau config.",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return showConfig(cmd)
+		},
+	}
+	showCmd.PersistentFlags().String("path", viper.GetString("repo"), "sets path dependent config fields")
+	return showCmd
+}
+
+func showConfig(cmd *cobra.Command) error {
+	// clear any existing configuration before generating the current.
+	currentConfig, err := config.Init(cmd.Flag("path").Value.String())
+	if err != nil {
+		return err
+	}
+	cfgbytes, err := yaml.Marshal(currentConfig)
+	if err != nil {
+		return err
+	}
+	cmd.Println(string(cfgbytes))
+	return nil
+}
4 changes: 2 additions & 2 deletions cmd/cli/devstack/devstack.go
@@ -208,11 +208,11 @@ func runDevstack(cmd *cobra.Command, ODs *devstack.DevStackOptions, IsNoop bool)
 		}
 	}
 
-	computeConfig, err := serve.GetComputeConfig()
+	computeConfig, err := serve.GetComputeConfig(ctx)
 	if err != nil {
 		return err
 	}
-	requesterConfig, err := serve.GetRequesterConfig()
+	requesterConfig, err := serve.GetRequesterConfig(ctx)
 	if err != nil {
 		return err
 	}
41 changes: 13 additions & 28 deletions cmd/cli/serve/serve.go
@@ -18,14 +18,12 @@ import (
 	"github.com/bacalhau-project/bacalhau/pkg/logger"
 	"github.com/bacalhau-project/bacalhau/pkg/models"
 	"github.com/bacalhau-project/bacalhau/pkg/node"
-	"github.com/bacalhau-project/bacalhau/pkg/repo"
+	"github.com/bacalhau-project/bacalhau/pkg/setup"
 	"github.com/bacalhau-project/bacalhau/pkg/system"
 	"github.com/bacalhau-project/bacalhau/pkg/util/templates"
 	"github.com/bacalhau-project/bacalhau/webui"
 	"github.com/libp2p/go-libp2p/core/host"
-	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/multiformats/go-multiaddr"
-
 	"github.com/spf13/cobra"
 	"k8s.io/kubectl/pkg/util/i18n"
 )
@@ -174,19 +172,16 @@ func serve(cmd *cobra.Command) error {
 	if err != nil {
 		return err
 	}
-	fsRepo, err := repo.NewFS(repoDir)
+	fsRepo, err := setup.SetupBacalhauRepo(repoDir)
 	if err != nil {
 		return err
 	}
-	if err := fsRepo.Open(); err != nil {
-		return err
-	}
 
-	nodeID, err := getNodeID()
+	nodeName, err := getNodeID(ctx)
 	if err != nil {
 		return err
 	}
-	ctx = logger.ContextWithNodeIDLogger(ctx, nodeID)
+	ctx = logger.ContextWithNodeIDLogger(ctx, nodeName)
 
 	// configure node type
 	isRequesterNode, isComputeNode, err := getNodeType()
@@ -205,7 +200,7 @@ func serve(cmd *cobra.Command) error {
 		return err
 	}
 
-	networkConfig, err := getNetworkConfig(nodeID)
+	networkConfig, err := getNetworkConfig(nodeName)
 	if err != nil {
 		return err
 	}
@@ -219,12 +214,12 @@ func serve(cmd *cobra.Command) error {
 		networkConfig.ClusterPeers = peers
 	}
 
-	computeConfig, err := GetComputeConfig()
+	computeConfig, err := GetComputeConfig(ctx)
 	if err != nil {
 		return err
 	}
 
-	requesterConfig, err := GetRequesterConfig()
+	requesterConfig, err := GetRequesterConfig(ctx)
 	if err != nil {
 		return err
 	}
@@ -248,7 +243,7 @@ func serve(cmd *cobra.Command) error {
 
 	// Create node config from cmd arguments
 	nodeConfig := node.NodeConfig{
-		NodeID: nodeID,
+		NodeID: nodeName,
 		CleanupManager: cm,
 		IPFSClient: ipfsClient,
 		DisabledFeatures: featureConfig,
@@ -261,7 +256,6 @@ func serve(cmd *cobra.Command) error {
 		IsRequesterNode: isRequesterNode,
 		Labels: config.GetStringMapString(types.NodeLabels),
 		AllowListedLocalPaths: allowedListLocalPaths,
-		FsRepo: fsRepo,
 		NodeInfoStoreTTL: nodeInfoStoreTTL,
 		NetworkConfig: networkConfig,
 	}
@@ -283,6 +277,11 @@ func serve(cmd *cobra.Command) error {
 		return fmt.Errorf("error creating node: %w", err)
 	}
 
+	// Persist the node config after the node is created and its config is valid.
+	if err = persistConfigs(repoDir); err != nil {
+		return fmt.Errorf("error persisting configs: %w", err)
+	}
+
 	// Start node
 	if err := standardNode.Start(ctx); err != nil {
 		return fmt.Errorf("error starting node: %w", err)
@@ -384,20 +383,6 @@ func setupLibp2p() (libp2pHost host.Host, peers []string, err error) {
 	return
 }
 
-func getNodeID() (string, error) {
-	// for now, use libp2p host ID as node ID, regardless of using NATS or Libp2p
-	// TODO: allow users to specify node ID
-	privKey, err := config.GetLibp2pPrivKey()
-	if err != nil {
-		return "", err
-	}
-	peerID, err := peer.IDFromPrivateKey(privKey)
-	if err != nil {
-		return "", err
-	}
-	return peerID.String(), nil
-}
-
 func buildConnectCommand(ctx context.Context, nodeConfig *node.NodeConfig, ipfsConfig types.IpfsConfig) (string, error) {
 	headerB := strings.Builder{}
 	cmdB := strings.Builder{}
85 changes: 81 additions & 4 deletions cmd/cli/serve/util.go
@@ -6,8 +6,14 @@ import (
 	"encoding/base64"
 	"fmt"
 	"net/url"
+	"path/filepath"
 	"time"
 
+	"github.com/bacalhau-project/bacalhau/pkg/compute/store"
+	"github.com/bacalhau-project/bacalhau/pkg/compute/store/boltdb"
+	"github.com/bacalhau-project/bacalhau/pkg/jobstore"
+	boltjobstore "github.com/bacalhau-project/bacalhau/pkg/jobstore/boltdb"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/rs/zerolog/log"
 	"github.com/spf13/viper"
 	"go.uber.org/multierr"
@@ -23,7 +29,7 @@ import (
 	"github.com/bacalhau-project/bacalhau/pkg/system"
 )
 
-func GetComputeConfig() (node.ComputeConfig, error) {
+func GetComputeConfig(ctx context.Context) (node.ComputeConfig, error) {
 	var cfg types.ComputeConfig
 	if err := config.ForKey(types.NodeCompute, &cfg); err != nil {
 		return node.ComputeConfig{}, err
@@ -37,6 +43,10 @@ func GetComputeConfig() (node.ComputeConfig, error) {
 		return node.ComputeConfig{}, err
 	}
 
+	executionStore, err := getExecutionStore(ctx, cfg.ExecutionStore)
+	if err != nil {
+		return node.ComputeConfig{}, err
+	}
 	return node.NewComputeConfigWith(node.ComputeConfigParams{
 		TotalResourceLimits: *totalResources,
 		QueueResourceLimits: *queueResources,
@@ -57,16 +67,21 @@ func GetComputeConfig() (node.ComputeConfig, error) {
 		},
 		LogRunningExecutionsInterval: time.Duration(cfg.Logging.LogRunningExecutionsInterval),
 		LogStreamBufferSize: cfg.LogStreamConfig.ChannelBufferSize,
+		ExecutionStore: executionStore,
 		LocalPublisher: cfg.LocalPublisher,
 	})
 }
 
-func GetRequesterConfig() (node.RequesterConfig, error) {
+func GetRequesterConfig(ctx context.Context) (node.RequesterConfig, error) {
 	var cfg types.RequesterConfig
 	if err := config.ForKey(types.NodeRequester, &cfg); err != nil {
 		return node.RequesterConfig{}, err
 	}
 
+	jobStore, err := getJobStore(ctx, cfg.JobStore)
+	if err != nil {
+		return node.RequesterConfig{}, err
+	}
 	return node.NewRequesterConfigWith(node.RequesterConfigParams{
 		JobDefaults: transformer.JobDefaults{
 			ExecutionTimeout: time.Duration(cfg.JobDefaults.ExecutionTimeout),
@@ -93,8 +108,8 @@ func GetRequesterConfig() (node.RequesterConfig, error) {
 		S3PreSignedURLExpiration: time.Duration(cfg.StorageProvider.S3.PreSignedURLExpiration),
 		S3PreSignedURLDisabled: cfg.StorageProvider.S3.PreSignedURLDisabled,
 		TranslationEnabled: cfg.TranslationEnabled,
-
-		DefaultPublisher: cfg.DefaultPublisher,
+		JobStore: jobStore,
+		DefaultPublisher: cfg.DefaultPublisher,
 	})
 }
 
@@ -219,3 +234,65 @@ func getNetworkConfig(nodeID string) (node.NetworkConfig, error) {
 		ClusterPeers: networkCfg.Cluster.Peers,
 	}, nil
 }
+
+func getExecutionStore(ctx context.Context, storeCfg types.JobStoreConfig) (store.ExecutionStore, error) {
+	switch storeCfg.Type {
+	case types.BoltDB:
+		return boltdb.NewStore(ctx, storeCfg.Path)
+	default:
+		return nil, fmt.Errorf("unknown JobStore type: %s", storeCfg.Type)
+	}
+}
+
+func getJobStore(ctx context.Context, storeCfg types.JobStoreConfig) (jobstore.Store, error) {
+	switch storeCfg.Type {
+	case types.BoltDB:
+		log.Ctx(ctx).Debug().Str("Path", storeCfg.Path).Msg("creating boltdb backed jobstore")
+		return boltjobstore.NewBoltJobStore(storeCfg.Path)
+	default:
+		return nil, fmt.Errorf("unknown JobStore type: %s", storeCfg.Type)
+	}
+}
+
+func getNodeID(ctx context.Context) (string, error) {
+	nodeName, err := config.Get[string](types.NodeName)
+	if err != nil {
+		return "", err
+	}
+
+	if nodeName != "" {
+		return nodeName, nil
+	}
+
+	// If no nodeName is defined, then use libp2p peer ID
+	privKey, err := config.GetLibp2pPrivKey()
+	if err != nil {
+		return "", fmt.Errorf("getNodeID: error getting libp2p private key: %w", err)
+	}
+
+	peerID, err := peer.IDFromPrivateKey(privKey)
+	if err != nil {
+		return "", err
+	}
+	nodeName = peerID.String()
+
+	// set the new name in the config, so it can be used and persisted later.
+	config.SetValue(types.NodeName, nodeName)
+	return nodeName, nil
+}
+
+// persistConfigs writes the resolved config to the persisted config file.
+// this will only write values that must not change between invocations,
+// such as the job store path and node name,
+// and only if they are not already set in the config file.
+func persistConfigs(repoPath string) error {
+	resolvedConfig, err := config.GetConfig()
+	if err != nil {
+		return fmt.Errorf("error getting config: %w", err)
+	}
+	err = config.WritePersistedConfigs(filepath.Join(repoPath, config.ConfigFileName), *resolvedConfig)
+	if err != nil {
+		return fmt.Errorf("error writing persisted config: %w", err)
+	}
+	return nil
+}
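
The comment on `persistConfigs` says values are written only when they are not already set in the config file. The body of `config.WritePersistedConfigs` is not shown in this diff, so the following is only a sketch of that rule using plain maps. `persistIfUnset` and the key names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// persistIfUnset copies the listed keys from resolved into persisted, but only
// where persisted has no value yet. It returns the keys it wrote, sorted.
func persistIfUnset(persisted, resolved map[string]string, keys []string) []string {
	var written []string
	for _, k := range keys {
		if _, exists := persisted[k]; exists {
			continue // never overwrite what the user already configured
		}
		v, ok := resolved[k]
		if !ok {
			continue // nothing resolved for this key
		}
		persisted[k] = v
		written = append(written, k)
	}
	sort.Strings(written)
	return written
}

func main() {
	// Hypothetical keys; the user has already chosen a node name.
	persisted := map[string]string{"node.name": "my-node"}
	resolved := map[string]string{
		"node.name":           "QmExampleNodeID",
		"node.jobstore.path":  "/repo/orchestrator_store/jobs.db",
		"node.loglevel.debug": "true", // not in the persist list, so ignored
	}
	keys := []string{"node.name", "node.jobstore.path"}

	fmt.Println("written:", persistIfUnset(persisted, resolved, keys))
	fmt.Println("node.name:", persisted["node.name"])
}
```

Restricting the write to an explicit key list keeps transient settings out of the file, and skipping keys that already exist preserves user-provided values.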