From 86fdf22c6df5ed161d5be6bb2223c50dc3bb2125 Mon Sep 17 00:00:00 2001 From: Thomas Labarussias Date: Tue, 11 Jul 2023 16:23:18 +0200 Subject: [PATCH] add setting topiccreation for kafka Signed-off-by: Thomas Labarussias --- README.md | 18 ++++++++++++++++++ config.go | 1 + config_example.yaml | 1 + outputs/kafka.go | 9 +++++---- types/types.go | 1 + 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 92ceb72b3..aaaf171d3 100644 --- a/README.md +++ b/README.md @@ -455,6 +455,15 @@ kafka: hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled topic: "" # Name of the topic, if not empty, Kafka output is enabled # minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512) + username: "" # use this username to authenticate to Kafka via SASL (default: "") + password: "" # use this password to authenticate to Kafka via SASL (default: "") + # async: false # produce messages without blocking (default: false) + # requiredacks: NONE # number of acknowledges from partition replicas required before receiving (default: "NONE") + # compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE") + # balancer: "" # partition balancing strategy when producing, (default: "round_robin") + # clientid: "" # specify a client.id when communicating with the broker for tracing + # topiccreation: false # auto create the topic if it doesn't exist (default: false) kafkarest: address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test") @@ -1008,6 +1017,15 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` : - **KAFKA_HOSTPORT**: The Host:Port of the Kafka (ex: localhost:9092), if not empty, Kafka is _enabled_ - **KAFKA_TOPIC**: The name of the Kafka topic +- **KAFKA_sasl**: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512) +- **KAFKA_USERNAME**: use this username to authenticate to Kafka via SASL (default: "") +- **KAFKA_PASSWORD**: use this password to authenticate to Kafka via SASL (default: "") +- **KAFKA_ASYNC**: produce messages without blocking (default: false) +- **KAFKA_REQUIREDACKS**: number of acknowledges from partition replicas required before receiving (default: "NONE") +- **KAFKA_COMPRESSION**: enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE") +- **KAFKA_BALANCER**: partition balancing strategy when producing, (default: "round_robin") +- **KAFKA_CLIENTID**: specify a client.id when communicating with the broker for tracing +- **KAFKA_TOPICCREATION**: auto create the topic if it doesn't exist (default: false) - **KAFKA_MINIMUMPRIORITY**: minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)` diff --git a/config.go b/config.go index a09374b92..de47185ea 100644 --- a/config.go +++ b/config.go @@ -293,6 +293,7 @@ func getConfig() *types.Configuration { v.SetDefault("Kafka.Compression", "NONE") v.SetDefault("Kafka.Async", false) v.SetDefault("Kafka.RequiredACKs", "NONE") + v.SetDefault("Kafka.TopicCreation", false) v.SetDefault("KafkaRest.Address", "") v.SetDefault("KafkaRest.Version", 2) diff --git a/config_example.yaml b/config_example.yaml index 2a857e95e..ed4e3bfa5 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -281,6 +281,7 @@ kafka: # compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE") # balancer: "" # partition balancing strategy when producing, (default: "round_robin") # clientid: "" # specify a client.id when communicating with the broker for tracing + # topiccreation: false # auto create the topic if it doesn't exist (default: false) kafkarest: address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test") diff --git a/outputs/kafka.go b/outputs/kafka.go index a4deb6d6e..067491092 100644 --- a/outputs/kafka.go +++ b/outputs/kafka.go @@ -61,10 +61,11 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt } kafkaWriter := &kafka.Writer{ - Addr: kafka.TCP(config.Kafka.HostPort), - Topic: config.Kafka.Topic, - Async: config.Kafka.Async, - Transport: transport, + Addr: kafka.TCP(config.Kafka.HostPort), + Topic: config.Kafka.Topic, + Async: config.Kafka.Async, + Transport: transport, + AllowAutoTopicCreation: config.Kafka.TopicCreation, } switch strings.ToLower(config.Kafka.Balancer) { diff --git a/types/types.go b/types/types.go index 8a7cd96d6..08011668a 100644 --- a/types/types.go +++ b/types/types.go @@ -487,6 +487,7 @@ type kafkaConfig struct { Compression string Async bool RequiredACKs string + TopicCreation bool } type KafkaRestConfig struct {