Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add setting topiccreation for kafka #554

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,15 @@ kafka:
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
# requiredacks: NONE # number of acknowledges from partition replicas required before receiving (default: "NONE")
# compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing, (default: "round_robin")
# clientid: "" # specify a client.id when communicating with the broker for tracing
# topiccreation: false # auto create the topic if it doesn't exist (default: false)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
Expand Down Expand Up @@ -1008,6 +1017,15 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **KAFKA_HOSTPORT**: The Host:Port of the Kafka (ex: localhost:9092), if not
empty, Kafka is _enabled_
- **KAFKA_TOPIC**: The name of the Kafka topic
- **KAFKA_sasl**: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
- **KAFKA_USERNAME**: use this username to authenticate to Kafka via SASL (default: "")
- **KAFKA_PASSWORD**: use this password to authenticate to Kafka via SASL (default: "")
- **KAFKA_ASYNC**: produce messages without blocking (default: false)
- **KAFKA_REQUIREDACKS**: number of acknowledges from partition replicas required before receiving (default: "NONE")
- **KAFKA_COMPRESSION**: enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
- **KAFKA_BALANCER**: partition balancing strategy when producing, (default: "round_robin")
- **KAFKA_CLIENTID**: specify a client.id when communicating with the broker for tracing
- **KAFKA_TOPICCREATION**: auto create the topic if it doesn't exist (default: false)
- **KAFKA_MINIMUMPRIORITY**: minimum priority of event for using this output,
order is
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.Compression", "NONE")
v.SetDefault("Kafka.Async", false)
v.SetDefault("Kafka.RequiredACKs", "NONE")
v.SetDefault("Kafka.TopicCreation", false)

v.SetDefault("KafkaRest.Address", "")
v.SetDefault("KafkaRest.Version", 2)
Expand Down
1 change: 1 addition & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ kafka:
# compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing, (default: "round_robin")
# clientid: "" # specify a client.id when communicating with the broker for tracing
# topiccreation: false # auto create the topic if it doesn't exist (default: false)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
Expand Down
9 changes: 5 additions & 4 deletions outputs/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt
}

kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
AllowAutoTopicCreation: config.Kafka.TopicCreation,
}

switch strings.ToLower(config.Kafka.Balancer) {
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ type kafkaConfig struct {
Compression string
Async bool
RequiredACKs string
TopicCreation bool
}

type KafkaRestConfig struct {
Expand Down
Loading