Skip to content

Commit

Permalink
Move all processors to new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Dec 28, 2023
1 parent 0fb3813 commit e8fde02
Show file tree
Hide file tree
Showing 156 changed files with 3,828 additions and 3,490 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file.
### Changed

- The `parse_parquet` Bloblang function, `parquet_decode`, `parquet_encode` processors and the `parquet` input have all been upgraded to the latest version of the underlying Parquet library. Since this underlying library is experimental it is likely that behaviour changes will result. One significant change is that encoding numerical values that are larger than the column type (`float64` into `FLOAT`, `int64` into `INT32`, etc) will no longer be automatically converted.
- The `parse_log` processor field `codec` is now deprecated.

## 4.24.0 - 2023-11-24

Expand Down
18 changes: 0 additions & 18 deletions internal/batch/policy/batchconfig/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package batchconfig

import (
"gopkg.in/yaml.v3"

"github.com/benthosdev/benthos/v4/internal/component/processor"
)

Expand All @@ -26,22 +24,6 @@ func NewConfig() Config {
}
}

// FromAny attempts to extract a Config from any value.
func FromAny(v any) (conf Config, err error) {
conf = NewConfig()
if pNode, ok := v.(*yaml.Node); ok {
err = pNode.Decode(&conf)
return
}

var node yaml.Node
if err = node.Encode(v); err != nil {
return
}
err = node.Decode(&conf)
return
}

// IsNoop returns true if this batch policy configuration does nothing.
func (p Config) IsNoop() bool {
if p.ByteSize > 0 {
Expand Down
124 changes: 0 additions & 124 deletions internal/batch/policy/batchconfig/config_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions internal/bundle/tracing/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestBundleOutputWithProcessorsTracing(t *testing.T) {

blobConf := processor.NewConfig()
blobConf.Type = "bloblang"
blobConf.Bloblang = "root = content().uppercase()"
blobConf.Plugin = "root = content().uppercase()"
outConfig.Processors = append(outConfig.Processors, blobConf)

mgr, err := manager.New(
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestBundleProcessorTracing(t *testing.T) {
procConfig := processor.NewConfig()
procConfig.Label = "foo"
procConfig.Type = "bloblang"
procConfig.Bloblang = `
procConfig.Plugin = `
let ctr = content().number()
root.count = if $ctr % 2 == 0 { throw("nah %v".format($ctr)) } else { $ctr }
meta bar = "new bar value"
Expand Down Expand Up @@ -575,7 +575,7 @@ func TestBundleProcessorTracingError(t *testing.T) {
procConfig := processor.NewConfig()
procConfig.Label = "foo"
procConfig.Type = "bloblang"
procConfig.Bloblang = `let nope`
procConfig.Plugin = `let nope`

mgr, err := manager.New(
manager.NewResourceConfig(),
Expand All @@ -597,7 +597,7 @@ func TestBundleProcessorTracingDisabled(t *testing.T) {
procConfig := processor.NewConfig()
procConfig.Label = "foo"
procConfig.Type = "bloblang"
procConfig.Bloblang = `
procConfig.Plugin = `
let ctr = content().number()
root.count = if $ctr % 2 == 0 { throw("nah %v".format($ctr)) } else { $ctr }
meta bar = "new bar value"
Expand Down
10 changes: 5 additions & 5 deletions internal/cli/test/case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,23 @@ func TestCase(t *testing.T) {

procConf = processor.NewConfig()
procConf.Type = "bloblang"
procConf.Bloblang = `root = content().uppercase()`
procConf.Plugin = `root = content().uppercase()`
if proc, err = mock.NewManager().NewProcessor(procConf); err != nil {
t.Fatal(err)
}
provider["/input/broker/inputs/0/processors"] = []processor.V1{proc}

procConf = processor.NewConfig()
procConf.Type = "bloblang"
procConf.Bloblang = `root = deleted()`
procConf.Plugin = `root = deleted()`
if proc, err = mock.NewManager().NewProcessor(procConf); err != nil {
t.Fatal(err)
}
provider["/input/broker/inputs/1/processors"] = []processor.V1{proc}

procConf = processor.NewConfig()
procConf.Type = "bloblang"
procConf.Bloblang = `root = if batch_index() == 0 { count("batch_id") }`
procConf.Plugin = `root = if batch_index() == 0 { count("batch_id") }`
if proc, err = mock.NewManager().NewProcessor(procConf); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestFileCaseInputs(t *testing.T) {
procConf := processor.NewConfig()

procConf.Type = "bloblang"
procConf.Bloblang = `root = "hello world " + content().string()`
procConf.Plugin = `root = "hello world " + content().string()`
proc, err := mock.NewManager().NewProcessor(procConf)
require.NoError(t, err)

Expand Down Expand Up @@ -336,7 +336,7 @@ func TestFileCaseConditions(t *testing.T) {
procConf := processor.NewConfig()

procConf.Type = "bloblang"
procConf.Bloblang = `root = content().uppercase()`
procConf.Plugin = `root = content().uppercase()`
proc, err := mock.NewManager().NewProcessor(procConf)
require.NoError(t, err)

Expand Down
45 changes: 45 additions & 0 deletions internal/component/buffer/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package buffer

import (
"fmt"

yaml "gopkg.in/yaml.v3"

"github.com/benthosdev/benthos/v4/internal/docs"
Expand Down Expand Up @@ -51,3 +53,46 @@ func (conf *Config) UnmarshalYAML(value *yaml.Node) error {
*conf = Config(aliased)
return nil
}

func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
return t, nil
case *yaml.Node:
return fromYAML(prov, t)
case map[string]any:
return fromMap(prov, t)
}
err = fmt.Errorf("unexpected value, expected object, got %T", value)
return
}

func fromMap(prov docs.Provider, value map[string]any) (conf Config, err error) {
if conf.Type, _, err = docs.GetInferenceCandidateFromMap(prov, docs.TypeBuffer, value); err != nil {
err = docs.NewLintError(0, docs.LintComponentNotFound, err)
return
}

if p, exists := value[conf.Type]; exists {
conf.Plugin = p
} else if p, exists := value["plugin"]; exists {
conf.Plugin = p
}
return
}

func fromYAML(prov docs.Provider, value *yaml.Node) (conf Config, err error) {
if conf.Type, _, err = docs.GetInferenceCandidateFromYAML(prov, docs.TypeBuffer, value); err != nil {
err = docs.NewLintError(value.Line, docs.LintComponentNotFound, err)
return
}

pluginNode, err := docs.GetPluginConfigYAML(conf.Type, value)
if err != nil {
err = docs.NewLintError(value.Line, docs.LintFailedRead, err)
return
}

conf.Plugin = &pluginNode
return
}
54 changes: 54 additions & 0 deletions internal/component/cache/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cache

import (
"fmt"

"gopkg.in/yaml.v3"

"github.com/benthosdev/benthos/v4/internal/docs"
Expand Down Expand Up @@ -55,3 +57,55 @@ func (conf *Config) UnmarshalYAML(value *yaml.Node) error {
*conf = Config(aliased)
return nil
}

func FromAny(prov docs.Provider, value any) (conf Config, err error) {
switch t := value.(type) {
case Config:
return t, nil
case *yaml.Node:
return fromYAML(prov, t)
case map[string]any:
return fromMap(prov, t)
}
err = fmt.Errorf("unexpected value, expected object, got %T", value)
return
}

func fromMap(prov docs.Provider, value map[string]any) (conf Config, err error) {
if conf.Type, _, err = docs.GetInferenceCandidateFromMap(prov, docs.TypeCache, value); err != nil {
err = docs.NewLintError(0, docs.LintComponentNotFound, err)
return
}

conf.Label, _ = value["label"].(string)

if p, exists := value[conf.Type]; exists {
conf.Plugin = p
} else if p, exists := value["plugin"]; exists {
conf.Plugin = p
}
return
}

func fromYAML(prov docs.Provider, value *yaml.Node) (conf Config, err error) {
if conf.Type, _, err = docs.GetInferenceCandidateFromYAML(prov, docs.TypeCache, value); err != nil {
err = docs.NewLintError(value.Line, docs.LintComponentNotFound, err)
return
}

for i := 0; i < len(value.Content)-1; i += 2 {
if value.Content[i].Value == "label" {
conf.Label = value.Content[i+1].Value
break
}
}

pluginNode, err := docs.GetPluginConfigYAML(conf.Type, value)
if err != nil {
err = docs.NewLintError(value.Line, docs.LintFailedRead, err)
return
}

conf.Plugin = &pluginNode
return
}
Loading

0 comments on commit e8fde02

Please sign in to comment.