Skip to content

Commit

Permalink
add setting topiccreation for kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Labarussias <[email protected]>
  • Loading branch information
Issif authored and poiana committed Jul 11, 2023
1 parent ff247a9 commit 86fdf22
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 4 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,15 @@ kafka:
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
# requiredacks: NONE # number of acknowledges from partition replicas required before receiving (default: "NONE")
# compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing, (default: "round_robin")
# clientid: "" # specify a client.id when communicating with the broker for tracing
# topiccreation: false # auto create the topic if it doesn't exist (default: false)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
Expand Down Expand Up @@ -1008,6 +1017,15 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **KAFKA_HOSTPORT**: The Host:Port of the Kafka (ex: localhost:9092), if not
empty, Kafka is _enabled_
- **KAFKA_TOPIC**: The name of the Kafka topic
- **KAFKA_sasl**: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
- **KAFKA_USERNAME**: use this username to authenticate to Kafka via SASL (default: "")
- **KAFKA_PASSWORD**: use this password to authenticate to Kafka via SASL (default: "")
- **KAFKA_ASYNC**: produce messages without blocking (default: false)
- **KAFKA_REQUIREDACKS**: number of acknowledges from partition replicas required before receiving (default: "NONE")
- **KAFKA_COMPRESSION**: enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
- **KAFKA_BALANCER**: partition balancing strategy when producing, (default: "round_robin")
- **KAFKA_CLIENTID**: specify a client.id when communicating with the broker for tracing
- **KAFKA_TOPICCREATION**: auto create the topic if it doesn't exist (default: false)
- **KAFKA_MINIMUMPRIORITY**: minimum priority of event for using this output,
order is
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.Compression", "NONE")
v.SetDefault("Kafka.Async", false)
v.SetDefault("Kafka.RequiredACKs", "NONE")
v.SetDefault("Kafka.TopicCreation", false)

v.SetDefault("KafkaRest.Address", "")
v.SetDefault("KafkaRest.Version", 2)
Expand Down
1 change: 1 addition & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ kafka:
# compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing, (default: "round_robin")
# clientid: "" # specify a client.id when communicating with the broker for tracing
# topiccreation: false # auto create the topic if it doesn't exist (default: false)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
Expand Down
9 changes: 5 additions & 4 deletions outputs/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt
}

kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
AllowAutoTopicCreation: config.Kafka.TopicCreation,
}

switch strings.ToLower(config.Kafka.Balancer) {
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ type kafkaConfig struct {
Compression string
Async bool
RequiredACKs string
TopicCreation bool
}

type KafkaRestConfig struct {
Expand Down

0 comments on commit 86fdf22

Please sign in to comment.