From a6a31037a973ba0febffd9bc424741edd487d818 Mon Sep 17 00:00:00 2001 From: Forrest <6546409+frrist@users.noreply.github.com> Date: Thu, 27 Jun 2024 02:46:22 -0700 Subject: [PATCH] refactor: migrate scenario tests to models package (#4139) closes https://github.com/bacalhau-project/bacalhau/issues/4140 part of https://github.com/bacalhau-project/bacalhau/issues/3832 --------- Co-authored-by: frrist --- pkg/docker/test.go | 6 +- pkg/executor/docker/models/types.go | 8 + pkg/executor/wasm/models/types.go | 8 + pkg/job/parser.go | 16 +- pkg/node/node.go | 2 + pkg/node/requester.go | 5 +- pkg/storage/inline/types.go | 14 +- pkg/test/compute/resourcelimits_test.go | 33 ++- pkg/test/compute/utils_test.go | 9 +- pkg/test/devstack/concurrency_test.go | 15 +- pkg/test/devstack/default_publisher_test.go | 70 +++-- pkg/test/devstack/disabled_feature_test.go | 17 +- pkg/test/devstack/errorlogs_test.go | 67 +++-- pkg/test/devstack/jobselection_test.go | 47 ++- .../devstack/multiple_input_files_test.go | 42 +-- pkg/test/devstack/publish_on_error_test.go | 42 +-- pkg/test/devstack/target_all_test.go | 92 ++++-- pkg/test/devstack/timeout_test.go | 41 ++- pkg/test/devstack/url_test.go | 71 +++-- pkg/test/executor/docker_entrypoint_test.go | 31 +- pkg/test/executor/scenario_test.go | 2 +- pkg/test/executor/test_runner.go | 83 +++--- pkg/test/requester/node_selection_test.go | 56 ++-- pkg/test/scenario/example_basic_test.go | 55 ---- pkg/test/scenario/example_noop_test.go | 19 +- pkg/test/scenario/job_checkers.go | 20 -- pkg/test/scenario/resolver.go | 215 ++++++++++++++ pkg/test/scenario/responses.go | 19 +- pkg/test/scenario/scenario.go | 12 +- pkg/test/scenario/storage.go | 70 +++-- pkg/test/scenario/suite.go | 67 ++--- pkg/test/scenario/test_scenarios.go | 274 +++++++++++------- 32 files changed, 950 insertions(+), 578 deletions(-) delete mode 100644 pkg/test/scenario/example_basic_test.go create mode 100644 pkg/test/scenario/resolver.go diff --git a/pkg/docker/test.go b/pkg/docker/test.go index cab74704f6..d516ff3d2f 100644 --- a/pkg/docker/test.go +++ b/pkg/docker/test.go @@ -9,7 +9,7 @@ import ( "github.com/docker/docker/client" "github.com/stretchr/testify/require" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" ) // MustHaveDocker will skip the test if the test is running in an environment that cannot support cross-platform @@ -20,8 +20,8 @@ func MustHaveDocker(t testing.TB) { // EngineSpecRequiresDocker will skip the test if the test is running in an environment that cannot support cross-platform // Docker images, and the passed model.EngineSpec type is model.EngineDocker -func EngineSpecRequiresDocker(t testing.TB, engineSpec model.EngineSpec) { - MaybeNeedDocker(t, engineSpec.Engine() == model.EngineDocker) +func EngineSpecRequiresDocker(t testing.TB, engineSpec *models.SpecConfig) { + MaybeNeedDocker(t, engineSpec.Type == models.EngineDocker) } // MaybeNeedDocker will skip the test if the test is running in an environment that cannot support cross-platform diff --git a/pkg/executor/docker/models/types.go b/pkg/executor/docker/models/types.go index 087f5e5f99..ac75ac31a4 100644 --- a/pkg/executor/docker/models/types.go +++ b/pkg/executor/docker/models/types.go @@ -128,3 +128,11 @@ func (b *DockerEngineBuilder) Build() (*models.SpecConfig, error) { Params: b.spec.ToMap(), }, nil } + +func (b *DockerEngineBuilder) MustBuild() *models.SpecConfig { + spec, err := b.Build() + if err != nil { + panic(err) + } + return spec +} diff --git 
a/pkg/executor/wasm/models/types.go b/pkg/executor/wasm/models/types.go index a74da261f5..19ac6b2bc2 100644 --- a/pkg/executor/wasm/models/types.go +++ b/pkg/executor/wasm/models/types.go @@ -229,3 +229,11 @@ func (b *WasmEngineBuilder) Build() (*models.SpecConfig, error) { Params: b.spec.ToMap(), }, nil } + +func (b *WasmEngineBuilder) MustBuild() *models.SpecConfig { + spec, err := b.Build() + if err != nil { + panic(err) + } + return spec +} diff --git a/pkg/job/parser.go b/pkg/job/parser.go index 35141a2983..bbcca7f7a4 100644 --- a/pkg/job/parser.go +++ b/pkg/job/parser.go @@ -7,11 +7,13 @@ import ( "strings" "github.com/bacalhau-project/bacalhau/pkg/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" ) const ( - s3Prefix = "s3" - ipfsPrefix = "ipfs" + s3Prefix = "s3" + ipfsPrefix = "ipfs" + localPrefix = "local" ) // ParsePublisherString parses a publisher string into a SpecConfig without having to @@ -64,10 +66,10 @@ func ParsePublisherString(publisher string) (*models.SpecConfig, error) { parsedURI.Scheme = parsedURI.Path } - var res models.SpecConfig + var res *models.SpecConfig switch parsedURI.Scheme { case ipfsPrefix: - res = models.SpecConfig{ + res = &models.SpecConfig{ Type: models.PublisherIPFS, } case s3Prefix: @@ -77,13 +79,15 @@ func ParsePublisherString(publisher string) (*models.SpecConfig, error) { if _, ok := options["key"]; !ok { options["key"] = strings.TrimLeft(parsedURI.Path, "/") } - res = models.SpecConfig{ + res = &models.SpecConfig{ Type: models.PublisherS3, Params: options, } + case localPrefix: + res = publisher_local.NewSpecConfig() default: return nil, fmt.Errorf("unknown publisher type: %s", parsedURI.Scheme) } - return &res, nil + return res, nil } diff --git a/pkg/node/node.go b/pkg/node/node.go index 46a96a19b7..c50eee079b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -185,6 +185,8 @@ func NewNode( return nil, err } + // TODO calling `Keys` on the publishers takes ~10 seconds per call + // https://github.com/bacalhau-project/bacalhau/issues/4153 metrics.NodeInfo.Add(ctx, 1, attribute.StringSlice("node_publishers", publishers.Keys(ctx)), attribute.StringSlice("node_storages", storages.Keys(ctx)), diff --git a/pkg/node/requester.go b/pkg/node/requester.go index 79b046d832..38fd14c0dc 100644 --- a/pkg/node/requester.go +++ b/pkg/node/requester.go @@ -233,9 +233,10 @@ func NewRequesterNode( // parse the publisher to generate a models.SpecConfig and add it to each job // which is without a publisher config, err := job.ParsePublisherString(requesterConfig.DefaultPublisher) - if err == nil { - jobTransformers = append(jobTransformers, transformer.DefaultPublisher(config)) + if err != nil { + return nil, fmt.Errorf("parsing default publisher spec (%s): %w", requesterConfig.DefaultPublisher, err) } + jobTransformers = append(jobTransformers, transformer.DefaultPublisher(config)) } endpointV2 := orchestrator.NewBaseEndpoint(&orchestrator.BaseEndpointParams{ diff --git a/pkg/storage/inline/types.go b/pkg/storage/inline/types.go index 6f4ebe602a..9eff2bf6b3 100644 --- a/pkg/storage/inline/types.go +++ b/pkg/storage/inline/types.go @@ -4,10 +4,11 @@ import ( "errors" "fmt" - "github.com/bacalhau-project/bacalhau/pkg/lib/validate" - "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/fatih/structs" "github.com/mitchellh/mapstructure" + + "github.com/bacalhau-project/bacalhau/pkg/lib/validate" + "github.com/bacalhau-project/bacalhau/pkg/models" ) type Source struct { @@ -42,3 +43,12 @@ func DecodeSpec(spec 
*models.SpecConfig) (Source, error) { return c, c.Validate() } + +func NewSpecConfig(url string) *models.SpecConfig { + s := Source{URL: url} + + return &models.SpecConfig{ + Type: models.StorageSourceInline, + Params: s.ToMap(), + } +} diff --git a/pkg/test/compute/resourcelimits_test.go b/pkg/test/compute/resourcelimits_test.go index 490b5d3c6b..7faffbba35 100644 --- a/pkg/test/compute/resourcelimits_test.go +++ b/pkg/test/compute/resourcelimits_test.go @@ -16,6 +16,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/bacalhau-project/bacalhau/pkg/config/types" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" + clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2" legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" "github.com/bacalhau-project/bacalhau/pkg/model" @@ -60,14 +62,13 @@ type TotalResourceTestCaseCheck struct { type TotalResourceTestCase struct { // the total list of jobs to throw at the cluster all at the same time - jobs []model.ResourceUsageConfig - totalLimits model.ResourceUsageConfig + jobs []*models.ResourcesConfig + totalLimits *models.ResourcesConfig wait TotalResourceTestCaseCheck checkers []TotalResourceTestCaseCheck } func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() { - // for this test we use the transport so the compute_node is calling // the executor in a go-routine and we can test what jobs // look like over time - this test leave each job running for X seconds @@ -126,7 +127,7 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() { return size.Bytes(), err } - resourcesConfig := legacy.FromLegacyResourceUsageConfig(testCase.totalLimits) + resourcesConfig := testCase.totalLimits parsedResources, err := resourcesConfig.ToResources() require.NoError(suite.T(), err) @@ -148,12 +149,24 @@ func (suite *ComputeNodeResourceLimitsSuite) TestTotalResourceLimits() { for _, jobResources := range testCase.jobs { // what the job is doesn't matter - it will only end up - j := testutils.MakeNoopJob(suite.T()) - j.Spec.Resources = jobResources - _, err := stack.Nodes[0].RequesterNode.Endpoint.SubmitJob(ctx, model.JobCreatePayload{ - ClientID: "123", - APIVersion: j.APIVersion, - Spec: &j.Spec, + j := &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + }, + ResourcesConfig: jobResources, + }, + }, + } + j.Normalize() + client := clientv2.New(fmt.Sprintf("http://%s:%d", stack.Nodes[0].APIServer.Address, stack.Nodes[0].APIServer.Port)) + _, err := client.Jobs().Put(ctx, &apimodels.PutJobRequest{ + Job: j, }) require.NoError(suite.T(), err) diff --git a/pkg/test/compute/utils_test.go b/pkg/test/compute/utils_test.go index 7d32052d61..a5000eb494 100644 --- a/pkg/test/compute/utils_test.go +++ b/pkg/test/compute/utils_test.go @@ -6,7 +6,6 @@ import ( "fmt" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/models" localdirectory "github.com/bacalhau-project/bacalhau/pkg/storage/local_directory" ) @@ -34,16 +33,16 @@ func addResourceUsage(execution *models.Execution, usage models.Resources) *mode return execution } -func getResources(c, m, d string) model.ResourceUsageConfig { - return model.ResourceUsageConfig{ +func getResources(c, m, d string) *models.ResourcesConfig { + return &models.ResourcesConfig{ CPU: c, Memory: m, Disk: d, } } -func 
getResourcesArray(data [][]string) []model.ResourceUsageConfig { - var res []model.ResourceUsageConfig +func getResourcesArray(data [][]string) []*models.ResourcesConfig { + var res []*models.ResourcesConfig for _, d := range data { res = append(res, getResources(d[0], d[1], d[2])) } diff --git a/pkg/test/devstack/concurrency_test.go b/pkg/test/devstack/concurrency_test.go index 7e5dbb2439..fdbbe51d5d 100644 --- a/pkg/test/devstack/concurrency_test.go +++ b/pkg/test/devstack/concurrency_test.go @@ -7,12 +7,10 @@ import ( "github.com/stretchr/testify/suite" - "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/devstack" + "github.com/bacalhau-project/bacalhau/pkg/downloader" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" ) @@ -27,7 +25,6 @@ func TestDevstackConcurrencySuite(t *testing.T) { } func (suite *DevstackConcurrencySuite) TestConcurrencyLimit() { - testCase := scenario.WasmHelloWorld(suite.T()) testCase.Stack = &scenario.StackConfig{ DevStackOptions: &devstack.DevStackOptions{ @@ -35,14 +32,14 @@ func (suite *DevstackConcurrencySuite) TestConcurrencyLimit() { NumberOfComputeOnlyNodes: 2, }, } - testCase.Deal = model.Deal{Concurrency: 2} + testCase.Job.Count = 2 testCase.ResultsChecker = scenario.FileEquals( downloader.DownloadFilenameStdout, "Hello, world!\nHello, world!\n", ) - testCase.JobCheckers = []legacy_job.CheckStatesFunction{ - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: testCase.Deal.Concurrency, + testCase.JobCheckers = []scenario.StateChecks{ + scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + models.ExecutionStateCompleted: testCase.Job.Count, }), } diff --git a/pkg/test/devstack/default_publisher_test.go b/pkg/test/devstack/default_publisher_test.go index d3473839c6..1d42f3c828 100644 --- a/pkg/test/devstack/default_publisher_test.go +++ b/pkg/test/devstack/default_publisher_test.go @@ -7,11 +7,10 @@ import ( "os" "testing" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" + wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" "github.com/bacalhau-project/bacalhau/testdata/wasm/cat" "github.com/stretchr/testify/suite" @@ -27,20 +26,26 @@ func TestDefaultPublisherSuite(t *testing.T) { func (s *DefaultPublisherSuite) TestNoDefaultPublisher() { testcase := scenario.Scenario{ - Spec: testutils.MakeSpecWithOpts(s.T(), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - "data/hello.txt", - "does/not/exist.txt", - ). - Build(), - ), - ), + Job: &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters( + "data/hello.txt", + "does/not/exist.txt", + ). 
+ MustBuild(), + }, + }, + }, ResultsChecker: expectResultsNone, - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, } @@ -48,25 +53,32 @@ func (s *DefaultPublisherSuite) TestNoDefaultPublisher() { } func (s *DefaultPublisherSuite) TestDefaultPublisher() { + // we are skipping this test because the orchestrator endpoint doesn't require a publisher stack := scenario.StackConfig{} stack.DefaultPublisher = "local" testcase := scenario.Scenario{ - Spec: testutils.MakeSpecWithOpts(s.T(), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - "data/hello.txt", - "does/not/exist.txt", - ). - Build(), - ), - ), + Job: &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters( + "data/hello.txt", + "does/not/exist.txt", + ). + MustBuild(), + }, + }, + }, Stack: &stack, ResultsChecker: expectResultsSome, - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, } diff --git a/pkg/test/devstack/disabled_feature_test.go b/pkg/test/devstack/disabled_feature_test.go index bc84d13c66..9812e8455c 100644 --- a/pkg/test/devstack/disabled_feature_test.go +++ b/pkg/test/devstack/disabled_feature_test.go @@ -8,10 +8,9 @@ import ( "github.com/stretchr/testify/suite" "github.com/bacalhau-project/bacalhau/pkg/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/pkg/devstack" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" ) @@ -31,9 +30,9 @@ func disabledTestSpec(t testing.TB) scenario.Scenario { NumberOfComputeOnlyNodes: 1, }, }, - Spec: scenario.WasmHelloWorld(t).Spec, - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForUnsuccessfulCompletion(), + Job: scenario.WasmHelloWorld(t).Job, + JobCheckers: []scenario.StateChecks{ + scenario.WaitForUnsuccessfulCompletion(), }, } } @@ -42,9 +41,7 @@ func (s *DisabledFeatureTestSuite) TestNothingDisabled() { testCase := disabledTestSpec(s.T()) testCase.SubmitChecker = scenario.SubmitJobSuccess() testCase.JobCheckers = scenario.WaitUntilSuccessful(1) - testCase.Spec.PublisherSpec = model.PublisherSpec{ - Type: model.PublisherLocal, - } + testCase.Job.Task().Publisher = publisher_local.NewSpecConfig() s.RunScenario(testCase) } @@ -64,9 +61,7 @@ func (s *DisabledFeatureTestSuite) TestDisabledStorage() { func (s *DisabledFeatureTestSuite) TestDisabledPublisher() { testCase := disabledTestSpec(s.T()) - testCase.Spec.PublisherSpec = model.PublisherSpec{ - Type: model.PublisherLocal, - } + testCase.Job.Task().Publisher = publisher_local.NewSpecConfig() testCase.Stack.DevStackOptions.DisabledFeatures.Publishers = []string{models.PublisherLocal} s.RunScenario(testCase) diff --git a/pkg/test/devstack/errorlogs_test.go b/pkg/test/devstack/errorlogs_test.go index bfdbb85862..4faae2c573 100644 --- a/pkg/test/devstack/errorlogs_test.go +++ b/pkg/test/devstack/errorlogs_test.go @@ -10,18 +10,17 @@ import ( "github.com/stretchr/testify/suite" 
"github.com/bacalhau-project/bacalhau/pkg/downloader" + dockmodels "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/docker" "github.com/bacalhau-project/bacalhau/pkg/executor" "github.com/bacalhau-project/bacalhau/pkg/executor/noop" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" ) type DevstackErrorLogsSuite struct { @@ -32,23 +31,37 @@ func TestDevstackErrorLogsSuite(t *testing.T) { suite.Run(t, new(DevstackErrorLogsSuite)) } -func executorTestCases(t testing.TB) []model.Spec { - return []model.Spec{ - testutils.MakeSpecWithOpts(t, - legacy_job.WithPublisher( - model.PublisherSpec{Type: model.PublisherLocal}, - ), - ), - testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder("ubuntu"). - WithEntrypoint("bash", "-c", "echo -n 'apples' >&1; echo -n 'oranges' >&2; exit 19;"). - Build(), - ), - legacy_job.WithPublisher( - model.PublisherSpec{Type: model.PublisherLocal}, - ), - ), +func executorTestCases(t testing.TB) []*models.Job { + return []*models.Job{ + { + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + Publisher: publisher_local.NewSpecConfig(), + }, + }, + }, + { + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder("ubuntu:latest"). + WithEntrypoint("bash", "-c", "echo -n 'apples' >&1; echo -n 'oranges' >&2; exit 19;"). 
+ MustBuild(), + Publisher: publisher_local.NewSpecConfig(), + }, + }, + }, } } @@ -71,19 +84,19 @@ var errorLogsTestCase = scenario.Scenario{ scenario.FileEquals(downloader.DownloadFilenameStdout, "apples"), scenario.FileEquals(downloader.DownloadFilenameStderr, "oranges"), ), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, } func (suite *DevstackErrorLogsSuite) TestCanGetResultsFromErroredJob() { for _, testCase := range executorTestCases(suite.T()) { - suite.Run(testCase.EngineSpec.String(), func() { - docker.EngineSpecRequiresDocker(suite.T(), testCase.EngineSpec) + suite.Run(testCase.Task().Engine.Type, func() { + docker.EngineSpecRequiresDocker(suite.T(), testCase.Task().Engine) - scenario := errorLogsTestCase - scenario.Spec = testCase - suite.RunScenario(scenario) + s := errorLogsTestCase + s.Job = testCase + suite.RunScenario(s) }) } } diff --git a/pkg/test/devstack/jobselection_test.go b/pkg/test/devstack/jobselection_test.go index 0965c4e773..5c70e3897b 100644 --- a/pkg/test/devstack/jobselection_test.go +++ b/pkg/test/devstack/jobselection_test.go @@ -8,18 +8,17 @@ import ( "context" "testing" + "github.com/stretchr/testify/suite" + "github.com/bacalhau-project/bacalhau/pkg/bidstrategy/semantic" "github.com/bacalhau-project/bacalhau/pkg/config/configenv" "github.com/bacalhau-project/bacalhau/pkg/devstack" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/node" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/retry" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" - - "github.com/stretchr/testify/suite" + storage_local "github.com/bacalhau-project/bacalhau/pkg/storage/local_directory" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" ) @@ -61,16 +60,19 @@ func (suite *DevstackJobSelectionSuite) TestSelectAllJobs() { if testCase.addFiles { inputs = scenario.StoredText(rootSourceDir, "job selection", "/inputs") } else { - inputs = func(ctx context.Context) ([]model.StorageSpec, error) { + inputs = func(ctx context.Context) ([]*models.InputSource, error) { sourceFile, err := scenario.CreateSourcePath(rootSourceDir) if err != nil { return nil, err } - return []model.StorageSpec{ + localSource, err := storage_local.NewSpecConfig(sourceFile, false) + if err != nil { + return nil, err + } + return []*models.InputSource{ { - StorageSource: model.StorageSourceLocalDirectory, - SourcePath: sourceFile, - Path: "/inputs", + Target: "/inputs", + Source: localSource, }, }, nil } @@ -85,12 +87,25 @@ func (suite *DevstackJobSelectionSuite) TestSelectAllJobs() { RequesterConfig: requesterConfig, }, Inputs: inputs, - Spec: testutils.MakeNoopJob(suite.T()).Spec, - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: testCase.completed, - model.ExecutionStateAskForBidRejected: testCase.rejected, - model.ExecutionStateFailed: testCase.failed, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, + JobCheckers: []scenario.StateChecks{ + 
scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + models.ExecutionStateCompleted: testCase.completed, + models.ExecutionStateAskForBidRejected: testCase.rejected, + models.ExecutionStateFailed: testCase.failed, }), }, } diff --git a/pkg/test/devstack/multiple_input_files_test.go b/pkg/test/devstack/multiple_input_files_test.go index e87945f11c..28a30b0d04 100644 --- a/pkg/test/devstack/multiple_input_files_test.go +++ b/pkg/test/devstack/multiple_input_files_test.go @@ -10,12 +10,12 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" + wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models" + "github.com/bacalhau-project/bacalhau/pkg/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" "github.com/bacalhau-project/bacalhau/testdata/wasm/cat" ) @@ -46,27 +46,29 @@ func (s *MultipleInputFilesSuite) TestMultipleFiles() { scenario.StoredText(rootSourceDir, "file1\n", filepath.Join(dirCID1, fileName1)), scenario.StoredText(rootSourceDir, "file2\n", filepath.Join(dirCID2, fileName2)), ), - Spec: testutils.MakeSpecWithOpts(s.T(), - legacy_job.WithPublisher( - model.PublisherSpec{ - Type: model.PublisherLocal, + Job: &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Publisher: publisher_local.NewSpecConfig(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters( + filepath.Join(dirCID1, fileName1), + filepath.Join(dirCID2, fileName2), + ). + MustBuild(), }, - ), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - filepath.Join(dirCID1, fileName1), - filepath.Join(dirCID2, fileName2), - ). 
- Build(), - ), - ), + }, + }, ResultsChecker: scenario.ManyChecks( scenario.FileEquals(downloader.DownloadFilenameStdout, "file1\nfile2\n"), ), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, } diff --git a/pkg/test/devstack/publish_on_error_test.go b/pkg/test/devstack/publish_on_error_test.go index 698baa8444..ff87a683ac 100644 --- a/pkg/test/devstack/publish_on_error_test.go +++ b/pkg/test/devstack/publish_on_error_test.go @@ -7,11 +7,11 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" + wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" "github.com/bacalhau-project/bacalhau/testdata/wasm/cat" "github.com/stretchr/testify/suite" @@ -39,25 +39,27 @@ func (s *PublishOnErrorSuite) TestPublishOnError() { }, }, Inputs: scenario.StoredText(rootSourceDir, stdoutText, "data/hello.txt"), - Spec: testutils.MakeSpecWithOpts(s.T(), - legacy_job.WithPublisher( - model.PublisherSpec{ - Type: model.PublisherLocal, + Job: &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Publisher: publisher_local.NewSpecConfig(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters( + "data/hello.txt", + "does/not/exist.txt", + ). + MustBuild(), }, - ), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - "data/hello.txt", - "does/not/exist.txt", - ). 
- Build(), - ), - ), + }, + }, ResultsChecker: scenario.FileEquals(downloader.DownloadFilenameStdout, stdoutText), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, } diff --git a/pkg/test/devstack/target_all_test.go b/pkg/test/devstack/target_all_test.go index 749e4cc5b9..0bb379dd4c 100644 --- a/pkg/test/devstack/target_all_test.go +++ b/pkg/test/devstack/target_all_test.go @@ -10,15 +10,11 @@ import ( "github.com/stretchr/testify/suite" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/executor" "github.com/bacalhau-project/bacalhau/pkg/executor/noop" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" - - "github.com/bacalhau-project/bacalhau/pkg/devstack" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" ) @@ -38,8 +34,19 @@ func (suite *TargetAllSuite) TestCanTargetZeroNodes() { NumberOfRequesterOnlyNodes: 1, NumberOfComputeOnlyNodes: 0, }}, - Spec: testutils.MakeSpecWithOpts(suite.T()), - Deal: model.Deal{TargetingMode: model.TargetAll}, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeOps, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, SubmitChecker: scenario.SubmitJobSuccess(), JobCheckers: scenario.WaitUntilSuccessful(0), } @@ -52,13 +59,24 @@ func (suite *TargetAllSuite) TestCanTargetSingleNode() { Stack: &scenario.StackConfig{DevStackOptions: &devstack.DevStackOptions{ NumberOfHybridNodes: 1, }}, - Spec: testutils.MakeSpecWithOpts(suite.T()), - Deal: model.Deal{TargetingMode: model.TargetAll}, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeOps, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, SubmitChecker: scenario.SubmitJobSuccess(), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: 1, + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), + scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + models.ExecutionStateCompleted: 1, }), }, } @@ -72,13 +90,24 @@ func (suite *TargetAllSuite) TestCanTargetMultipleNodes() { NumberOfHybridNodes: 1, NumberOfComputeOnlyNodes: 4, }}, - Spec: testutils.MakeSpecWithOpts(suite.T()), - Deal: model.Deal{TargetingMode: model.TargetAll}, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeOps, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, SubmitChecker: scenario.SubmitJobSuccess(), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: 5, + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), + scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + 
models.ExecutionStateCompleted: 5, }), }, } @@ -112,14 +141,25 @@ func (suite *TargetAllSuite) TestPartialFailure() { }, }, }, - Spec: testutils.MakeSpecWithOpts(suite.T()), - Deal: model.Deal{TargetingMode: model.TargetAll}, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeOps, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, SubmitChecker: scenario.SubmitJobSuccess(), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForUnsuccessfulCompletion(), - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: 1, - model.ExecutionStateFailed: 1, + JobCheckers: []scenario.StateChecks{ + scenario.WaitForUnsuccessfulCompletion(), + scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + models.ExecutionStateCompleted: 1, + models.ExecutionStateFailed: 1, }), }, } diff --git a/pkg/test/devstack/timeout_test.go b/pkg/test/devstack/timeout_test.go index f31045734e..4ecebad66c 100644 --- a/pkg/test/devstack/timeout_test.go +++ b/pkg/test/devstack/timeout_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/bacalhau-project/bacalhau/pkg/config" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/orchestrator/transformer" @@ -19,11 +18,9 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/executor" "github.com/bacalhau-project/bacalhau/pkg/executor/noop" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/node" "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" ) type DevstackTimeoutSuite struct { @@ -69,6 +66,12 @@ func (suite *DevstackTimeoutSuite) TestRunningTimeout() { }) suite.Require().NoError(err) + // required by job_timeout_greater_than_max_but_on_allowed_list + namespace := "" + if len(testCase.computeJobExecutionBypassList) > 0 { + namespace = testCase.computeJobExecutionBypassList[0] + } + testScenario := scenario.Scenario{ Stack: &scenario.StackConfig{ DevStackOptions: &devstack.DevStackOptions{NumberOfHybridNodes: testCase.nodeCount}, @@ -88,17 +91,29 @@ func (suite *DevstackTimeoutSuite) TestRunningTimeout() { }, }, }, - Spec: testutils.MakeSpecWithOpts(suite.T(), - legacy_job.WithTimeout(int64(testCase.jobTimeout.Seconds())), - ), - Deal: model.Deal{ - Concurrency: testCase.concurrency, + Job: &models.Job{ + Name: suite.T().Name(), + Namespace: namespace, + Type: models.JobTypeBatch, + Count: testCase.concurrency, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + Timeouts: &models.TimeoutConfig{ + TotalTimeout: int64(testCase.jobTimeout.Seconds()), + }, + }, + }, }, - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: testCase.completedCount, - model.ExecutionStateCancelled: testCase.errorCount, - model.ExecutionStateAskForBidRejected: testCase.rejectedCount, + JobCheckers: []scenario.StateChecks{ + scenario.WaitForExecutionStates(map[models.ExecutionStateType]int{ + models.ExecutionStateCompleted: testCase.completedCount, + 
models.ExecutionStateCancelled: testCase.errorCount, + models.ExecutionStateAskForBidRejected: testCase.rejectedCount, }), }, } diff --git a/pkg/test/devstack/url_test.go b/pkg/test/devstack/url_test.go index 1276c6fe25..018203823f 100644 --- a/pkg/test/devstack/url_test.go +++ b/pkg/test/devstack/url_test.go @@ -15,11 +15,11 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/model" + wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/node" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" "github.com/bacalhau-project/bacalhau/testdata/wasm/cat" "github.com/stretchr/testify/require" @@ -71,26 +71,27 @@ func runURLTest( scenario.FileEquals(downloader.DownloadFilenameStderr, ""), scenario.FileEquals(downloader.DownloadFilenameStdout, allContent), ), - JobCheckers: []legacy_job.CheckStatesFunction{ - legacy_job.WaitForSuccessfulCompletion(), + JobCheckers: []scenario.StateChecks{ + scenario.WaitForSuccessfulCompletion(), }, - Spec: testutils.MakeSpecWithOpts(suite.T(), - legacy_job.WithPublisher( - model.PublisherSpec{ - Type: model.PublisherLocal, + Job: &models.Job{ + Name: suite.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: suite.T().Name(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters( + testCase.mount1, + testCase.mount2, + ).MustBuild(), + Publisher: publisher_local.NewSpecConfig(), }, - ), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - testCase.mount1, - testCase.mount2, - ). - Build(), - ), - ), + }, + }, } suite.RunScenario(testScenario) @@ -243,23 +244,21 @@ func (s *URLTestSuite) TestLocalURLCombo() { scenario.StoredText(rootSourceDir, localContent, path.Join(localMount, localFile)), scenario.URLDownload(svr, urlfile, urlmount), ), - - Spec: testutils.MakeSpecWithOpts(s.T(), - legacy_job.WithPublisher( - model.PublisherSpec{ - Type: model.PublisherLocal, + Job: &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Engine: wasmmodels.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters(urlmount, path.Join(localMount, localFile)). + MustBuild(), + Publisher: publisher_local.NewSpecConfig(), }, - ), - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(scenario.InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters( - urlmount, - path.Join(localMount, localFile), - ). 
- Build(), - ), - ), + }, + }, ResultsChecker: scenario.FileEquals(downloader.DownloadFilenameStdout, URLContent+localContent), JobCheckers: scenario.WaitUntilSuccessful(1), } diff --git a/pkg/test/executor/docker_entrypoint_test.go b/pkg/test/executor/docker_entrypoint_test.go index 42347b2fd8..a10fbfd0af 100644 --- a/pkg/test/executor/docker_entrypoint_test.go +++ b/pkg/test/executor/docker_entrypoint_test.go @@ -13,19 +13,19 @@ import ( "strings" "testing" - "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/bacalhau-project/bacalhau/pkg/downloader" + dockmodels "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models" + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/docker" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/storage/util" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" "github.com/bacalhau-project/bacalhau/pkg/util/targzip" ) @@ -78,6 +78,7 @@ func (suite *DockerEntrypointTestSuite) SetupSuite() { require.NoError(suite.T(), err, "reading file") suite.T().Log(string(data)) + // TODO our tests will leak files if this statement isn't reached f.Close() //build image cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) @@ -268,14 +269,20 @@ func createTestScenario(t testing.TB, expectedStderr, expectedStdout, image stri } testScenario := scenario.Scenario{ ResultsChecker: checkResults, - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder(image). - WithEntrypoint(entrypoint...). - WithParameters(parameters...). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder(image). + WithEntrypoint(entrypoint...). + WithParameters(parameters...). 
+ MustBuild(), + }, + }, + }, SubmitChecker: scenario.SubmitJobSuccess(), } if expectError == true { diff --git a/pkg/test/executor/scenario_test.go b/pkg/test/executor/scenario_test.go index c9ccf450a5..003e5b2d8b 100644 --- a/pkg/test/executor/scenario_test.go +++ b/pkg/test/executor/scenario_test.go @@ -15,7 +15,7 @@ func TestScenarios(t *testing.T) { name, func(t *testing.T) { t.Log(name) - docker.EngineSpecRequiresDocker(t, testCase.Spec.EngineSpec) + docker.EngineSpecRequiresDocker(t, testCase.Job.Task().Engine) RunTestCase(t, testCase) }, ) diff --git a/pkg/test/executor/test_runner.go b/pkg/test/executor/test_runner.go index 7e98ff9e7e..cb2a785ef5 100644 --- a/pkg/test/executor/test_runner.go +++ b/pkg/test/executor/test_runner.go @@ -2,21 +2,21 @@ package executor import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/require" "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy" - "github.com/bacalhau-project/bacalhau/pkg/requester/jobtransform" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/pkg/setup" "github.com/bacalhau-project/bacalhau/pkg/test/mock" "github.com/bacalhau-project/bacalhau/pkg/compute" "github.com/bacalhau-project/bacalhau/pkg/devstack" _ "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/test/scenario" testutils "github.com/bacalhau-project/bacalhau/pkg/test/teststack" ) @@ -28,7 +28,7 @@ func RunTestCase( testCase scenario.Scenario, ) { ctx := context.Background() - spec := testCase.Spec + job := testCase.Job devstackOptions := &devstack.DevStackOptions{} if testCase.Stack != nil && testCase.Stack.DevStackOptions != nil { @@ -38,69 +38,55 @@ func RunTestCase( fsr, c := setup.SetupBacalhauRepoForTesting(t) stack := testutils.Setup(ctx, t, fsr, c, devstackOptions.Options()...) 
- executor, err := stack.Nodes[0].ComputeNode.Executors.Get(ctx, spec.EngineSpec.Engine().String()) + executor, err := stack.Nodes[0].ComputeNode.Executors.Get(ctx, job.Task().Engine.Type) require.NoError(t, err) isInstalled, err := executor.IsInstalled(ctx) require.NoError(t, err) require.True(t, isInstalled) - prepareStorage := func(getStorage scenario.SetupStorage) []model.StorageSpec { + prepareStorage := func(getStorage scenario.SetupStorage) []*models.InputSource { if getStorage == nil { - return []model.StorageSpec{} + return []*models.InputSource{} } storageList, stErr := getStorage(ctx) require.NoError(t, stErr) - for _, storageSpec := range storageList { - inputSource, err := legacy.FromLegacyStorageSpecToInputSource(storageSpec) + for _, input := range storageList { + strger, err := stack.Nodes[0].ComputeNode.Storages.Get(ctx, input.Source.Type) require.NoError(t, err) - strger, err := stack.Nodes[0].ComputeNode.Storages.Get(ctx, storageSpec.StorageSource.String()) - require.NoError(t, err) - hasStorage, stErr := strger.HasStorageLocally(ctx, *inputSource) + hasStorage, stErr := strger.HasStorageLocally(ctx, *input) require.NoError(t, stErr) + require.True(t, hasStorage) } return storageList } - spec.Inputs = prepareStorage(testCase.Inputs) - spec.Outputs = testCase.Outputs - spec.PublisherSpec = model.PublisherSpec{ - Type: model.PublisherLocal, - } - spec.Deal = model.Deal{Concurrency: testNodeCount} - - job := model.Job{ - Metadata: model.Metadata{ - ID: "test-job", - ClientID: "test-client", - CreatedAt: time.Now(), - Requester: model.JobRequester{ - RequesterNodeID: "test-owner", - }, - }, - Spec: spec, - } + job.Task().InputSources = prepareStorage(testCase.Inputs) + job.Task().ResultPaths = testCase.Outputs + job.Task().Publisher = publisher_local.NewSpecConfig() + job.Count = testNodeCount - newJob, err := legacy.FromLegacyJob(&job) - require.NoError(t, err) - execution := mock.ExecutionForJob(newJob) - execution.AllocateResources(newJob.Task().Name, models.Resources{}) - - // Since we are submitting jobs directly to the executor, we need to - // transform wasm storage specs, which is currently handled by the - // requester's endpoint - jobTransformers := []jobtransform.PostTransformer{ - jobtransform.NewWasmStorageSpecConverter(), - } - for _, transformer := range jobTransformers { - _, err = transformer(ctx, newJob) - require.NoError(t, err) - } + now := time.Now().UnixNano() + job.ID = fmt.Sprintf("test-job-%d", now) + job.Meta = make(map[string]string, 2) + job.Meta[models.MetaRequesterID] = "test-owner" + job.Meta[models.MetaClientID] = "test-client" + job.State = models.NewJobState(models.JobStateTypeUndefined) + job.Version = 1 + job.Revision = 1 + job.CreateTime = now + job.ModifyTime = now + + job.Normalize() + require.NoError(t, job.Validate()) + + execution := mock.ExecutionForJob(job) + execution.AllocateResources(job.Task().Name, models.Resources{}) resultsDirectory := t.TempDir() strgProvider := stack.Nodes[0].ComputeNode.Storages @@ -115,7 +101,12 @@ func RunTestCase( _, err = executor.Run(ctx, runCommandArguments) if testCase.SubmitChecker != nil { - err = testCase.SubmitChecker(&job, err) + // TODO not sure how this behavior should be replicated. 
+ err = testCase.SubmitChecker(&apimodels.PutJobResponse{ + JobID: job.ID, + EvaluationID: "TODO", + Warnings: nil, + }, err) require.NoError(t, err) } diff --git a/pkg/test/requester/node_selection_test.go b/pkg/test/requester/node_selection_test.go index 59a50a9f03..48f655a6c4 100644 --- a/pkg/test/requester/node_selection_test.go +++ b/pkg/test/requester/node_selection_test.go @@ -4,24 +4,24 @@ package requester import ( "context" + "fmt" "testing" "github.com/stretchr/testify/suite" "k8s.io/apimachinery/pkg/labels" - "github.com/bacalhau-project/bacalhau/pkg/models/migration/legacy" - "github.com/bacalhau-project/bacalhau/pkg/publicapi/client" + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" + clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2" + "github.com/bacalhau-project/bacalhau/pkg/test/scenario" "github.com/bacalhau-project/bacalhau/pkg/devstack" noop_executor "github.com/bacalhau-project/bacalhau/pkg/executor/noop" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" "github.com/bacalhau-project/bacalhau/pkg/lib/math" "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/node" "github.com/bacalhau-project/bacalhau/pkg/setup" "github.com/bacalhau-project/bacalhau/pkg/test/teststack" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" nodeutils "github.com/bacalhau-project/bacalhau/pkg/test/utils/node" ) @@ -32,8 +32,8 @@ type NodeSelectionSuite struct { compute2 *node.Node compute3 *node.Node computeNodes []*node.Node - client *client.APIClient - stateResolver *legacy_job.StateResolver + api clientv2.API + stateResolver *scenario.StateResolver } func (s *NodeSelectionSuite) SetupSuite() { @@ -78,9 +78,9 @@ func (s *NodeSelectionSuite) SetupSuite() { s.compute1 = stack.Nodes[1] s.compute2 = stack.Nodes[2] s.compute3 = stack.Nodes[3] - s.client, err = client.NewAPIClient(client.NoTLS, cfg.User, s.requester.APIServer.Address, s.requester.APIServer.Port) + s.api = clientv2.New(fmt.Sprintf("http://%s:%d", s.requester.APIServer.Address, s.requester.APIServer.Port)) s.Require().NoError(err) - s.stateResolver = legacy.NewStateResolver(s.requester.RequesterNode.JobStore) + s.stateResolver = scenario.NewStateResolver(s.api) s.computeNodes = []*node.Node{s.compute1, s.compute2, s.compute3} nodeutils.WaitForNodeDiscovery(s.T(), s.requester.RequesterNode, 4) @@ -156,20 +156,33 @@ func (s *NodeSelectionSuite) TestNodeSelectionByLabels() { for _, tc := range testCase { s.Run(tc.name, func() { ctx := context.Background() - j := testutils.MakeNoopJob(s.T()) - j.Spec.NodeSelectors = s.parseLabels(tc.selector) - j.Spec.Deal.Concurrency = math.Max(1, len(tc.expectedNodes)) + j := &models.Job{ + Name: s.T().Name(), + Type: models.JobTypeBatch, + Count: math.Max(1, len(tc.expectedNodes)), + Tasks: []*models.Task{ + { + Name: s.T().Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + } + j.Constraints = s.parseLabels(tc.selector) + j.Normalize() - submittedJob, err := s.client.Submit(ctx, j) + putResp, err := s.api.Jobs().Put(ctx, &apimodels.PutJobRequest{Job: j}) s.NoError(err) - err = s.stateResolver.WaitUntilComplete(ctx, submittedJob.Metadata.ID) + err = s.stateResolver.Wait(ctx, putResp.JobID, scenario.WaitForSuccessfulCompletion()) if len(tc.expectedNodes) == 0 { s.Error(err) } else { s.NoError(err) } - selectedNodes := 
s.getSelectedNodes(submittedJob.Metadata.ID) + selectedNodes := s.getSelectedNodes(putResp.JobID) s.assertNodesMatch(tc.expectedNodes, selectedNodes) }) } @@ -177,9 +190,9 @@ func (s *NodeSelectionSuite) TestNodeSelectionByLabels() { func (s *NodeSelectionSuite) getSelectedNodes(jobID string) []*node.Node { ctx := context.Background() - jobState, err := s.stateResolver.GetJobState(ctx, jobID) + jobState, err := s.stateResolver.JobState(ctx, jobID) s.NoError(err) - completedExecutionStates := legacy_job.GetCompletedExecutionStates(jobState) + completedExecutionStates := scenario.GetCompletedExecutionStates(jobState) nodes := make([]*node.Node, 0, len(completedExecutionStates)) for _, executionState := range completedExecutionStates { @@ -210,8 +223,13 @@ func (s *NodeSelectionSuite) assertNodesMatch(expected, selected []*node.Node) { s.ElementsMatch(expectedNodeNames, selectedNodeNames) } -func (s *NodeSelectionSuite) parseLabels(selector string) []model.LabelSelectorRequirement { +func (s *NodeSelectionSuite) parseLabels(selector string) []*models.LabelSelectorRequirement { requirements, err := labels.ParseToRequirements(selector) s.NoError(err) - return model.ToLabelSelectorRequirements(requirements...) + tmp := models.ToLabelSelectorRequirements(requirements...) + out := make([]*models.LabelSelectorRequirement, 0, len(tmp)) + for _, r := range tmp { + out = append(out, r.Copy()) + } + return out } diff --git a/pkg/test/scenario/example_basic_test.go b/pkg/test/scenario/example_basic_test.go deleted file mode 100644 index 5bff772687..0000000000 --- a/pkg/test/scenario/example_basic_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package scenario - -import ( - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/bacalhau-project/bacalhau/pkg/devstack" - "github.com/bacalhau-project/bacalhau/pkg/downloader" - - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/model" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" -) - -func basicScenario(t testing.TB) Scenario { - rootSourceDir := t.TempDir() - - return Scenario{ - Stack: &StackConfig{ - DevStackOptions: &devstack.DevStackOptions{ - AllowListedLocalPaths: []string{rootSourceDir + AllowedListedLocalPathsSuffix}, - }, - }, - Inputs: ManyStores( - StoredText(rootSourceDir, "hello, world!", "/inputs"), - StoredFile(rootSourceDir, "../../../testdata/wasm/cat/main.wasm", "/job"), - ), - Outputs: []model.StorageSpec{}, - ResultsChecker: FileEquals(downloader.DownloadFilenameStdout, "hello, world!\n"), - JobCheckers: WaitUntilSuccessful(1), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - // TODO(forrest): [correctness] this isn't a valid wasm engine spec - it needs an entry module - // but leaving as is to preserve whatever behaviour this test is after. - model.NewWasmEngineBuilder(model.StorageSpec{}). - WithEntrypoint("_start"). - Build(), - ), - ), - } -} - -type ExampleTest struct { - ScenarioRunner -} - -func Example_basic() { - // In a real example, use the testing.T passed to the TestXxx method. 
- suite.Run(&testing.T{}, new(ExampleTest)) -} - -func (suite *ExampleTest) TestRun() { - suite.RunScenario(basicScenario(suite.T())) -} diff --git a/pkg/test/scenario/example_noop_test.go b/pkg/test/scenario/example_noop_test.go index 11861332a3..811dfd5df3 100644 --- a/pkg/test/scenario/example_noop_test.go +++ b/pkg/test/scenario/example_noop_test.go @@ -5,15 +5,15 @@ import ( "strings" "testing" - "github.com/bacalhau-project/bacalhau/pkg/downloader" "github.com/stretchr/testify/suite" + "github.com/bacalhau-project/bacalhau/pkg/downloader" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/executor" "github.com/bacalhau-project/bacalhau/pkg/executor/noop" "github.com/bacalhau-project/bacalhau/pkg/system" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" ) func noopScenario(t testing.TB) Scenario { @@ -32,7 +32,20 @@ func noopScenario(t testing.TB) Scenario { }, }, }, - Spec: testutils.MakeSpecWithOpts(t), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: &models.SpecConfig{ + Type: models.EngineNoop, + Params: make(map[string]interface{}), + }, + }, + }, + }, ResultsChecker: FileEquals(downloader.DownloadFilenameStdout, "hello, world!\n"), JobCheckers: WaitUntilSuccessful(1), } diff --git a/pkg/test/scenario/job_checkers.go b/pkg/test/scenario/job_checkers.go index eaab347275..09c8148e82 100644 --- a/pkg/test/scenario/job_checkers.go +++ b/pkg/test/scenario/job_checkers.go @@ -1,21 +1 @@ package scenario - -import ( - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - - "github.com/bacalhau-project/bacalhau/pkg/model" -) - -// WaitUntilSuccessful returns a set of job.CheckStatesFunctions that will wait -// until the job they are checking reaches the Completed state on the passed -// number of nodes. The checks will fail if any job errors. 
-func WaitUntilSuccessful(nodes int) []legacy_job.CheckStatesFunction { - return []legacy_job.CheckStatesFunction{ - legacy_job.WaitExecutionsThrowErrors([]model.ExecutionStateType{ - model.ExecutionStateFailed, - }), - legacy_job.WaitForExecutionStates(map[model.ExecutionStateType]int{ - model.ExecutionStateCompleted: nodes, - }), - } -} diff --git a/pkg/test/scenario/resolver.go b/pkg/test/scenario/resolver.go new file mode 100644 index 0000000000..55ee5e8050 --- /dev/null +++ b/pkg/test/scenario/resolver.go @@ -0,0 +1,215 @@ +package scenario + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + + "github.com/bacalhau-project/bacalhau/pkg/models" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels" + "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2" + "github.com/bacalhau-project/bacalhau/pkg/system" +) + +type StateResolver struct { + api client.API + maxWaitAttempts int + waitDelay time.Duration +} + +func NewStateResolver(api client.API) *StateResolver { + return &StateResolver{ + api: api, + maxWaitAttempts: 1000, + waitDelay: time.Millisecond * 100, + } +} + +type JobState struct { + ID string + Executions []*models.Execution + State models.State[models.JobStateType] +} + +type StateChecks func(s *JobState) (bool, error) + +func (s *StateResolver) JobState(ctx context.Context, id string) (*JobState, error) { + resp, err := s.api.Jobs().Get(ctx, &apimodels.GetJobRequest{ + JobID: id, + Include: "executions", + }) + if err != nil { + return nil, fmt.Errorf("failed to get job (%s): %w", id, err) + } + + return &JobState{ + ID: resp.Job.ID, + Executions: resp.Executions.Items, + State: resp.Job.State, + }, nil +} + +func (s *StateResolver) Wait(ctx context.Context, id string, until ...StateChecks) error { + waiter := &system.FunctionWaiter{ + Name: "wait for job", + MaxAttempts: s.maxWaitAttempts, + Delay: s.waitDelay, + Handler: func() (bool, error) { + state, err := s.JobState(ctx, id) + if err != nil { + return false, err + } + + allOk := true + for _, checkFunction := range until { + stepOk, checkErr := checkFunction(state) + if checkErr != nil { + return false, checkErr + } + if !stepOk { + allOk = false + } + } + + if allOk { + return allOk, nil + } + + // some of the check functions returned false + // let's see if we can quit early because all expected states are + // in terminal state + allTerminal, err := WaitForTerminalStates()(state) + if err != nil { + return false, err + } + + // If all the jobs are in terminal states, then nothing is going + // to change if we keep polling, so we should exit early. 
+ if allTerminal { + log.Ctx(ctx).Error().Msgf("all executions are in terminal state, but not all expected states are met: %+v", state) + return false, fmt.Errorf("all jobs are in terminal states and conditions aren't met") + } + return false, nil + }, + } + + return waiter.Wait(ctx) +} + +func GetCompletedExecutionStates(jobState *JobState) []*models.Execution { + return GetFilteredExecutionStates(jobState, models.ExecutionStateCompleted) +} + +func GetFilteredExecutionStates(jobState *JobState, filterState models.ExecutionStateType) []*models.Execution { + var ret []*models.Execution + for _, executionState := range jobState.Executions { + if executionState.ComputeState.StateType == filterState { + ret = append(ret, executionState) + } + } + return ret +} + +// WaitForTerminalStates it is possible that a job is in a terminal state, but some executions are still running, +// such as when one node publishes the result before others. +// for that reason, we consider a job to be in a terminal state when: +// - all executions are in a terminal state +// - the job is in a terminal state to account for possible retries +// TODO validate this comment is still valid. +func WaitForTerminalStates() StateChecks { + return func(state *JobState) (bool, error) { + for _, executionState := range state.Executions { + if !executionState.ComputeState.StateType.IsTermainl() { + return false, nil + } + } + return state.State.StateType.IsTerminal(), nil + } +} + +func WaitForSuccessfulCompletion() StateChecks { + return func(jobState *JobState) (bool, error) { + if jobState.State.StateType.IsTerminal() { + if jobState.State.StateType != models.JobStateTypeCompleted { + return false, fmt.Errorf("job did not complete successfully. "+ + "Completed with status: %s message: %s", jobState.State.StateType, jobState.State.Message) + } + return true, nil + } + return false, nil + } +} + +func WaitForUnsuccessfulCompletion() StateChecks { + return func(jobState *JobState) (bool, error) { + if jobState.State.StateType.IsTerminal() { + if jobState.State.StateType != models.JobStateTypeFailed { + return false, fmt.Errorf("job did not fail as expected") + } + return true, nil + } + return false, nil + } +} + +// WaitUntilSuccessful returns a set of StateChecks functions that will wait +// until the job they are checking reaches the Completed state on the passed +// number of nodes. The checks will fail if any job errors. 
+func WaitUntilSuccessful(nodes int) []StateChecks {
+ return []StateChecks{
+ WaitExecutionsThrowErrors([]models.ExecutionStateType{
+ models.ExecutionStateFailed,
+ }),
+ WaitForExecutionStates(map[models.ExecutionStateType]int{
+ models.ExecutionStateCompleted: nodes,
+ }),
+ }
+}
+
+// WaitExecutionsThrowErrors returns a check that errors if any execution is in one of the given error states.
+func WaitExecutionsThrowErrors(errorStates []models.ExecutionStateType) StateChecks {
+ return func(jobState *JobState) (bool, error) {
+ for _, execution := range jobState.Executions { //nolint:gocritic
+ for _, errorState := range errorStates {
+ if execution.ComputeState.StateType == errorState {
+ e := log.Debug()
+ if execution.RunOutput != nil {
+ e = e.Str("stdout", execution.RunOutput.STDOUT).Str("stderr", execution.RunOutput.STDERR)
+ }
+ e.Msg("Job failed")
+ return false, fmt.Errorf("job has error state %s on node %s (%s)",
+ execution.ComputeState.StateType.String(), execution.NodeID, execution.ComputeState.Message)
+ }
+ }
+ }
+ return true, nil
+ }
+}
+
+// WaitForExecutionStates returns a check that waits until the given number of executions is observed in each required state.
+func WaitForExecutionStates(requiredStateCounts map[models.ExecutionStateType]int) StateChecks {
+ return func(jobState *JobState) (bool, error) {
+ discoveredStateCount := getExecutionStateTotals(jobState.Executions)
+ log.Trace().Msgf("WaitForExecutionStates:\nrequired = %+v,\nactual = %+v\n", requiredStateCounts, discoveredStateCount)
+ for requiredStateType, requiredStateCount := range requiredStateCounts {
+ discoveredCount, ok := discoveredStateCount[requiredStateType]
+ if !ok {
+ discoveredCount = 0
+ }
+ if discoveredCount != requiredStateCount {
+ return false, nil
+ }
+ }
+ return true, nil
+ }
+}
+
+func getExecutionStateTotals(executionStates []*models.Execution) map[models.ExecutionStateType]int {
+ discoveredStateCount := map[models.ExecutionStateType]int{}
+ for _, executionState := range executionStates { //nolint:gocritic
+ discoveredStateCount[executionState.ComputeState.StateType]++
+ }
+ return discoveredStateCount
+}
diff --git a/pkg/test/scenario/responses.go b/pkg/test/scenario/responses.go
index adaae48967..b78d2f8eb8 100644
--- a/pkg/test/scenario/responses.go
+++ b/pkg/test/scenario/responses.go
@@ -4,21 +4,24 @@ import (
 "fmt"
 "strings"
 
- "github.com/bacalhau-project/bacalhau/pkg/model"
+ "github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
 )
 
 // A CheckSubmitResponse is a function that will examine and validate submitJob response.
 // Useful when validating that a job should be rejected.
-type CheckSubmitResponse func(job *model.Job, err error) error
+type CheckSubmitResponse func(response *apimodels.PutJobResponse, err error) error
 
 // SubmitJobSuccess returns a CheckSubmitResponse that asserts no error was returned when submitting a job.
 func SubmitJobSuccess() CheckSubmitResponse {
- return func(job *model.Job, err error) error {
+ return func(response *apimodels.PutJobResponse, err error) error {
 if err != nil {
 return fmt.Errorf("expected no error, got %v", err)
 }
- if job == nil {
- return fmt.Errorf("expected job, got nil")
+ if response == nil {
+ return fmt.Errorf("expected job response, got nil")
+ }
+ if len(response.Warnings) > 0 {
+ return fmt.Errorf("unexpected warnings returned when submitting job: %v", response.Warnings)
 }
 return nil
 }
@@ -26,7 +29,7 @@ func SubmitJobSuccess() CheckSubmitResponse {
 // SubmitJobFail returns a CheckSubmitResponse that asserts an error was returned when submitting a job.
func SubmitJobFail() CheckSubmitResponse { - return func(_ *model.Job, err error) error { + return func(_ *apimodels.PutJobResponse, err error) error { if err == nil { return fmt.Errorf("expected error, got nil") } @@ -35,8 +38,8 @@ func SubmitJobFail() CheckSubmitResponse { } func SubmitJobErrorContains(msg string) CheckSubmitResponse { - return func(job *model.Job, err error) error { - e := SubmitJobFail()(job, err) + return func(response *apimodels.PutJobResponse, err error) error { + e := SubmitJobFail()(response, err) if e != nil { return e } diff --git a/pkg/test/scenario/scenario.go b/pkg/test/scenario/scenario.go index 787e904cab..22536c50e7 100644 --- a/pkg/test/scenario/scenario.go +++ b/pkg/test/scenario/scenario.go @@ -18,8 +18,7 @@ package scenario import ( "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/executor/noop" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" "github.com/bacalhau-project/bacalhau/pkg/node" ) @@ -48,13 +47,10 @@ type Scenario struct { // Output volumes that must be available to the job. If nil, no output // volumes will be attached to the job. - Outputs []model.StorageSpec + Outputs []*models.ResultPath // The job specification - Spec model.Spec - - // The job deal. If nil, concurrency will default to 1. - Deal model.Deal + Job *models.Job // A function that will assert submitJob response is as expected. // if nil, will use SubmitJobSuccess by default. @@ -67,7 +63,7 @@ type Scenario struct { // A set of checkers that will decide when the job has completed, and maybe // whether it was successful or not. If empty, the job will not be waited // for once it has been submitted. - JobCheckers []legacy_job.CheckStatesFunction + JobCheckers []StateChecks } // All the information that is needed to uniquely define a devstack. diff --git a/pkg/test/scenario/storage.go b/pkg/test/scenario/storage.go index 98d9a40bfc..0205aefac7 100644 --- a/pkg/test/scenario/storage.go +++ b/pkg/test/scenario/storage.go @@ -12,7 +12,10 @@ import ( "github.com/vincent-petithory/dataurl" - "github.com/bacalhau-project/bacalhau/pkg/model" + "github.com/bacalhau-project/bacalhau/pkg/models" + storage_inline "github.com/bacalhau-project/bacalhau/pkg/storage/inline" + storage_local "github.com/bacalhau-project/bacalhau/pkg/storage/local_directory" + storage_url "github.com/bacalhau-project/bacalhau/pkg/storage/url/urldownload" "github.com/bacalhau-project/bacalhau/pkg/storage/util" ) @@ -33,16 +36,16 @@ func CreateSourcePath(rootSourceDir string) (string, error) { // of the function to ensure that the data has been set up correctly. type SetupStorage func( ctx context.Context, -) ([]model.StorageSpec, error) +) ([]*models.InputSource, error) // StoredText will store the passed string as a file on the local filesystem and -// return the path to the file in a model.StorageSpec. +// return the path to the file in a *models.InputSource. 
func StoredText( rootSourceDir string, fileContents string, mountPath string, ) SetupStorage { - return func(ctx context.Context) ([]model.StorageSpec, error) { + return func(ctx context.Context) ([]*models.InputSource, error) { sourcePath, err := CreateSourcePath(rootSourceDir) if err != nil { return nil, err @@ -61,12 +64,16 @@ func StoredText( return nil, err } - return []model.StorageSpec{ + spec, err := storage_local.NewSpecConfig(sourcePath, false) + if err != nil { + return nil, err + } + + return []*models.InputSource{ { - StorageSource: model.StorageSourceLocalDirectory, - SourcePath: sourcePath, - Path: mountPath, - Name: mountPath, + Source: spec, + Target: mountPath, + Alias: mountPath, }, }, nil } @@ -79,7 +86,7 @@ func StoredFile( filePath string, mountPath string, ) SetupStorage { - return func(ctx context.Context) ([]model.StorageSpec, error) { + return func(ctx context.Context) ([]*models.InputSource, error) { fileInfo, err := os.Stat(filePath) if err != nil { return nil, fmt.Errorf("failed to stat file %s: %w", filePath, err) @@ -101,12 +108,15 @@ func StoredFile( return nil, fmt.Errorf("failed to copy file %s: %w", filePath, err) } } - return []model.StorageSpec{ + spec, err := storage_local.NewSpecConfig(sourcePath, false) + if err != nil { + return nil, err + } + return []*models.InputSource{ { - StorageSource: model.StorageSourceLocalDirectory, - SourcePath: sourcePath, - Path: mountPath, - Name: mountPath, + Source: spec, + Target: mountPath, + Alias: mountPath, }, }, nil } @@ -116,10 +126,11 @@ func StoredFile( // the other storage set-ups, this function loads the file immediately. This // makes it possible to store things deeper into the Spec object without the // test system needing to know how to prepare them. -func InlineData(data []byte) model.StorageSpec { - return model.StorageSpec{ - StorageSource: model.StorageSourceInline, - URL: dataurl.EncodeBytes(data), +func InlineData(data []byte) *models.InputSource { + spec := storage_inline.NewSpecConfig(dataurl.EncodeBytes(data)) + return &models.InputSource{ + Source: spec, + Target: "/inputs", } } @@ -130,13 +141,20 @@ func URLDownload( urlPath string, mountPath string, ) SetupStorage { - return func(_ context.Context) ([]model.StorageSpec, error) { + return func(_ context.Context) ([]*models.InputSource, error) { finalURL, err := url.JoinPath(server.URL, urlPath) - return []model.StorageSpec{ + if err != nil { + return nil, err + } + spec, err := storage_url.NewSpecConfig(finalURL) + if err != nil { + return nil, err + } + return []*models.InputSource{ { - StorageSource: model.StorageSourceURLDownload, - URL: finalURL, - Path: mountPath, + Source: spec, + Target: mountPath, + Alias: mountPath, }, }, err } @@ -146,8 +164,8 @@ func URLDownload( // associated with all of them. If any of them fail, the error from the first to // fail will be returned. 
func ManyStores(stores ...SetupStorage) SetupStorage { - return func(ctx context.Context) ([]model.StorageSpec, error) { - var specs []model.StorageSpec + return func(ctx context.Context) ([]*models.InputSource, error) { + var specs []*models.InputSource for _, store := range stores { spec, err := store(ctx) if err != nil { diff --git a/pkg/test/scenario/suite.go b/pkg/test/scenario/suite.go index 5a92a353be..715460d2e0 100644 --- a/pkg/test/scenario/suite.go +++ b/pkg/test/scenario/suite.go @@ -15,14 +15,12 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/config/types" "github.com/bacalhau-project/bacalhau/pkg/lib/provider" "github.com/bacalhau-project/bacalhau/pkg/models" - "github.com/bacalhau-project/bacalhau/pkg/publicapi/client" "github.com/bacalhau-project/bacalhau/pkg/setup" "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/docker" "github.com/bacalhau-project/bacalhau/pkg/downloader" "github.com/bacalhau-project/bacalhau/pkg/logger" - "github.com/bacalhau-project/bacalhau/pkg/model" "github.com/bacalhau-project/bacalhau/pkg/node" "github.com/bacalhau-project/bacalhau/pkg/system" "github.com/bacalhau-project/bacalhau/pkg/telemetry" @@ -61,9 +59,9 @@ func (s *ScenarioRunner) SetupTest() { s.T().Cleanup(func() { _ = telemetry.Cleanup() }) } -func (s *ScenarioRunner) prepareStorage(stack *devstack.DevStack, getStorage SetupStorage) []model.StorageSpec { +func (s *ScenarioRunner) prepareStorage(stack *devstack.DevStack, getStorage SetupStorage) []*models.InputSource { if getStorage == nil { - return []model.StorageSpec{} + return nil } storageList, stErr := getStorage(s.Ctx) @@ -124,62 +122,51 @@ func (s *ScenarioRunner) setupStack(config *StackConfig) (*devstack.DevStack, *s func (s *ScenarioRunner) RunScenario(scenario Scenario) string { var resultsDir string - spec := scenario.Spec - docker.EngineSpecRequiresDocker(s.T(), spec.EngineSpec) + scenario.Job.Normalize() + job := scenario.Job + task := job.Task() + docker.EngineSpecRequiresDocker(s.T(), task.Engine) stack, _ := s.setupStack(scenario.Stack) s.T().Log("Setting up storage") - spec.Inputs = s.prepareStorage(stack, scenario.Inputs) - spec.Outputs = scenario.Outputs - if spec.Outputs == nil { - spec.Outputs = []model.StorageSpec{} - } - - s.T().Log("Submitting job") - j, err := model.NewJobWithSaneProductionDefaults() - s.Require().NoError(err) - - j.Spec = spec - s.Require().True(model.IsValidEngine(j.Spec.EngineSpec.Engine())) - if !model.IsValidPublisher(j.Spec.PublisherSpec.Type) { - j.Spec.PublisherSpec = model.PublisherSpec{ - Type: model.PublisherLocal, - } - } - - j.Spec.Deal = scenario.Deal - if j.Spec.Deal.Concurrency < 1 { - j.Spec.Deal.Concurrency = 1 - } + task.InputSources = s.prepareStorage(stack, scenario.Inputs) + task.ResultPaths = scenario.Outputs apiServer := stack.Nodes[0].APIServer - apiClient, err := client.NewAPIClient(client.NoTLS, s.Config.User, apiServer.Address, apiServer.Port) - s.Require().NoError(err) - apiClientV2 := clientv2.New(fmt.Sprintf("http://%s:%d", apiServer.Address, apiServer.Port)) + apiProtocol := "http" + apiHost := apiServer.Address + apiPort := apiServer.Port + api := clientv2.New(fmt.Sprintf("%s://%s:%d", apiProtocol, apiHost, apiPort)) - submittedJob, submitError := apiClient.Submit(s.Ctx, j) if scenario.SubmitChecker == nil { scenario.SubmitChecker = SubmitJobSuccess() } - err = scenario.SubmitChecker(submittedJob, submitError) - s.Require().NoError(err) + s.T().Logf("Submitting job: %v", job) + putResp, err := 
api.Jobs().Put(s.Ctx, &apimodels.PutJobRequest{ + Job: job, + }) + s.Require().NoError(scenario.SubmitChecker(putResp, err)) // exit if the test expects submission to fail as no further assertions can be made - if submitError != nil { + if err != nil { return resultsDir } - s.T().Log("Waiting for job") - resolver := apiClient.GetJobStateResolver() - err = resolver.Wait(s.Ctx, submittedJob.Metadata.ID, scenario.JobCheckers...) + getResp, err := api.Jobs().Get(s.Ctx, &apimodels.GetJobRequest{ + JobID: putResp.JobID, + }) s.Require().NoError(err) + jobID := getResp.Job.ID + + s.T().Log("Waiting for job") + s.Require().NoError(NewStateResolver(api).Wait(s.Ctx, jobID, scenario.JobCheckers...)) // Check outputs if scenario.ResultsChecker != nil { s.T().Log("Checking output") - results, err := apiClientV2.Jobs().Results(s.Ctx, &apimodels.ListJobResultsRequest{ - JobID: submittedJob.Metadata.ID, + results, err := api.Jobs().Results(s.Ctx, &apimodels.ListJobResultsRequest{ + JobID: jobID, }) s.Require().NoError(err) diff --git a/pkg/test/scenario/test_scenarios.go b/pkg/test/scenario/test_scenarios.go index c45ca7d041..fa9ab4a0b3 100644 --- a/pkg/test/scenario/test_scenarios.go +++ b/pkg/test/scenario/test_scenarios.go @@ -7,9 +7,10 @@ import ( "github.com/bacalhau-project/bacalhau/pkg/devstack" "github.com/bacalhau-project/bacalhau/pkg/downloader" - legacy_job "github.com/bacalhau-project/bacalhau/pkg/legacyjob" - "github.com/bacalhau-project/bacalhau/pkg/model" - testutils "github.com/bacalhau-project/bacalhau/pkg/test/utils" + dockmodels "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models" + wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models" + "github.com/bacalhau-project/bacalhau/pkg/models" + publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local" "github.com/bacalhau-project/bacalhau/testdata/wasm/cat" "github.com/bacalhau-project/bacalhau/testdata/wasm/csv" "github.com/bacalhau-project/bacalhau/testdata/wasm/dynamic" @@ -23,6 +24,7 @@ const helloWorld = "hello world" const simpleMountPath = "/data/file.txt" const simpleOutputPath = "/output_data/output_file.txt" const catProgram = "cat " + simpleMountPath + " > " + simpleOutputPath +const defaultDockerImage = "ubuntu:latest" const AllowedListedLocalPathsSuffix = string(os.PathSeparator) + "*" @@ -44,14 +46,19 @@ func CatFileToStdout(t testing.TB) Scenario { FileEquals(downloader.DownloadFilenameStderr, ""), FileEquals(downloader.DownloadFilenameStdout, helloWorld), ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(cat.Program())). - WithEntrypoint("_start"). - WithParameters(simpleMountPath). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(cat.Program())). + WithEntrypoint("_start"). + WithParameters(simpleMountPath).MustBuild(), + }, + }, + }, } } @@ -73,19 +80,24 @@ func CatFileToVolume(t testing.TB) Scenario { "test/output_file.txt", catProgram, ), - Outputs: []model.StorageSpec{ + Outputs: []*models.ResultPath{ { Name: "test", Path: "/output_data", }, }, - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder("ubuntu:latest"). - WithEntrypoint("bash", simpleMountPath). 
- Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder(defaultDockerImage). + WithEntrypoint("bash", simpleMountPath).MustBuild(), + }, + }, + }, } } @@ -108,13 +120,19 @@ func GrepFile(t testing.TB) Scenario { []string{"kiwi is delicious"}, 2, ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder("ubuntu:latest"). - WithEntrypoint("grep", "kiwi", simpleMountPath). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder(defaultDockerImage). + WithEntrypoint("grep", "kiwi", simpleMountPath). + MustBuild(), + }, + }, + }, } } @@ -137,18 +155,19 @@ func SedFile(t testing.TB) Scenario { []string{"LISBON"}, 5, //nolint:gomnd // magic number ok for testing ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder("ubuntu:latest"). - WithEntrypoint( - "sed", - "-n", - "/38.7[2-4]..,-9.1[3-7]../p", - simpleMountPath, - ). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder(defaultDockerImage). + WithEntrypoint("sed", "-n", "/38.7[2-4]..,-9.1[3-7]../p", simpleMountPath). + MustBuild(), + }, + }, + }, } } @@ -171,18 +190,24 @@ func AwkFile(t testing.TB) Scenario { []string{"LISBON"}, 501, //nolint:gomnd // magic number appropriate for test ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewDockerEngineBuilder("ubuntu:latest"). - WithEntrypoint( - "awk", - "-F,", - "{x=38.7077507-$3; y=-9.1365919-$4; if(x^2+y^2<0.3^2) print}", - simpleMountPath, - ). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: dockmodels.NewDockerEngineBuilder(defaultDockerImage). + WithEntrypoint( + "awk", + "-F,", + "{x=38.7077507-$3; y=-9.1365919-$4; if(x^2+y^2<0.3^2) print}", + simpleMountPath, + ). + MustBuild(), + }, + }, + }, } } @@ -192,10 +217,19 @@ func WasmHelloWorld(t testing.TB) Scenario { downloader.DownloadFilenameStdout, "Hello, world!\n", ), - Spec: model.Spec{ - EngineSpec: model.NewWasmEngineBuilder(InlineData(noop.Program())). - WithEntrypoint("_start"). - Build(), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(noop.Program())). + WithEntrypoint("_start"). + MustBuild(), + Publisher: publisher_local.NewSpecConfig(), + }, + }, }, } } @@ -206,14 +240,20 @@ func WasmExitCode(t testing.TB) Scenario { downloader.DownloadFilenameExitCode, "5", ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(exit_code.Program())). - WithEntrypoint("_start"). - WithEnvironmentVariables(map[string]string{"EXIT_CODE": "5"}). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(exit_code.Program())). + WithEntrypoint("_start"). + WithEnvironmentVariables(map[string]string{"EXIT_CODE": "5"}). 
+ MustBuild(), + }, + }, + }, } } @@ -224,19 +264,25 @@ func WasmEnvVars(t testing.TB) Scenario { []string{"AWESOME=definitely", "TEST=yes"}, 3, //nolint:gomnd // magic number appropriate for test ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(env.Program())). - WithEntrypoint("_start"). - WithEnvironmentVariables( - map[string]string{ - "TEST": "yes", - "AWESOME": "definitely", - }, - ). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(env.Program())). + WithEntrypoint("_start"). + WithEnvironmentVariables( + map[string]string{ + "TEST": "yes", + "AWESOME": "definitely", + }, + ). + MustBuild(), + }, + }, + }, } } @@ -259,23 +305,29 @@ func WasmCsvTransform(t testing.TB) Scenario { []string{"http://www.wikidata.org/entity/Q14949904,Tugela,http://www.wikidata.org/entity/Q1001792,Makybe Diva"}, 269, //nolint:gomnd // magic number appropriate for test ), - Outputs: []model.StorageSpec{ + Outputs: []*models.ResultPath{ { Name: "outputs", Path: "/outputs", }, }, - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(csv.Program())). - WithEntrypoint("_start"). - WithParameters( - "inputs/horses.csv", - "outputs/parents-children.csv", - ). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(csv.Program())). + WithEntrypoint("_start"). + WithParameters( + "inputs/horses.csv", + "outputs/parents-children.csv", + ). + MustBuild(), + }, + }, + }, } } @@ -301,13 +353,19 @@ func WasmDynamicLink(t testing.TB) Scenario { downloader.DownloadFilenameStdout, "17\n", ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(dynamic.Program())). - WithEntrypoint("_start"). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(dynamic.Program())). + WithEntrypoint("_start"). + MustBuild(), + }, + }, + }, } } @@ -329,17 +387,23 @@ func WasmLogTest(t testing.TB) Scenario { []string{"https://www.gutenberg.org"}, // end of the file -1, //nolint:gomnd // magic number appropriate for test ), - Spec: testutils.MakeSpecWithOpts(t, - legacy_job.WithEngineSpec( - model.NewWasmEngineBuilder(InlineData(logtest.Program())). - WithEntrypoint("_start"). - WithParameters( - "inputs/cosmic_computer.txt", - "--fast", - ). - Build(), - ), - ), + Job: &models.Job{ + Name: t.Name(), + Type: models.JobTypeBatch, + Count: 1, + Tasks: []*models.Task{ + { + Name: t.Name(), + Engine: wasmmodels.NewWasmEngineBuilder(InlineData(logtest.Program())). + WithEntrypoint("_start"). + WithParameters( + "inputs/cosmic_computer.txt", + "--fast", + ). + MustBuild(), + }, + }, + }, } }
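For reviewers unfamiliar with the migrated API, here is a minimal sketch of what a scenario test looks like after this change. It is illustrative only: the suite name, file contents, image, and entrypoint are hypothetical, and it assumes the embedding pattern used by the existing example tests (a testify suite embedding scenario.ScenarioRunner) together with the helpers introduced in this patch (StoredText, SubmitJobSuccess, FileEquals, WaitUntilSuccessful, and the MustBuild engine builders).

package scenario_test

import (
	"testing"

	"github.com/stretchr/testify/suite"

	"github.com/bacalhau-project/bacalhau/pkg/downloader"
	dockmodels "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models"
	"github.com/bacalhau-project/bacalhau/pkg/models"
	publisher_local "github.com/bacalhau-project/bacalhau/pkg/publisher/local"
	"github.com/bacalhau-project/bacalhau/pkg/test/scenario"
)

// ExampleSuite embeds ScenarioRunner so SetupTest prepares config and
// telemetry the same way the existing scenario suites do.
type ExampleSuite struct {
	scenario.ScenarioRunner
}

func TestExampleSuite(t *testing.T) {
	suite.Run(t, new(ExampleSuite))
}

func (s *ExampleSuite) TestCatFileToStdout() {
	sc := scenario.Scenario{
		// Stage a local file and mount it into the job at /data/file.txt.
		Inputs: scenario.StoredText(s.T().TempDir(), "hello world", "/data/file.txt"),
		// The job is now a plain models.Job; engines come from the typed builders.
		Job: &models.Job{
			Name:  s.T().Name(),
			Type:  models.JobTypeBatch,
			Count: 1,
			Tasks: []*models.Task{
				{
					Name: s.T().Name(),
					Engine: dockmodels.NewDockerEngineBuilder("ubuntu:latest").
						WithEntrypoint("cat", "/data/file.txt").
						MustBuild(),
					// Publish locally so the downloaded stdout can be checked.
					Publisher: publisher_local.NewSpecConfig(),
				},
			},
		},
		// Submission, completion, and result checks all come from this package.
		SubmitChecker:  scenario.SubmitJobSuccess(),
		ResultsChecker: scenario.FileEquals(downloader.DownloadFilenameStdout, "hello world"),
		JobCheckers:    scenario.WaitUntilSuccessful(1),
	}

	s.RunScenario(sc)
}

Individual tests only declare the job and its checks; RunScenario normalizes the job, stages the inputs onto the first task, submits it through the v2 API client, and waits on it with the new StateResolver before checking results.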