Skip to content

Commit

Permalink
allow to use tls for kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Labarussias <[email protected]>
  • Loading branch information
Issif authored and poiana committed Jul 27, 2023
1 parent 5cd6c34 commit 01eee81
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 2 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ kafka:
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
tls: false # Use TLS for the connections (default: false)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
Expand Down Expand Up @@ -1023,7 +1024,8 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
details. If empty, no Text is displayed before sections.
- **KAFKA_HOSTPORT**: comma separated list of Apache Kafka bootstrap nodes for establishing the initial connection to the cluster (ex: localhost:9092,localhost:9093). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is _enabled_
- **KAFKA_TOPIC**: The name of the Kafka topic
- **KAFKA_sasl**: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
- **KAFKA_SASL**: "" SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
- **KAFKA_TLS**: "" Use TLS for the connections (default: false)
- **KAFKA_USERNAME**: use this username to authenticate to Kafka via SASL (default: "")
- **KAFKA_PASSWORD**: use this password to authenticate to Kafka via SASL (default: "")
- **KAFKA_ASYNC**: produce messages without blocking (default: false)
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.Topic", "")
v.SetDefault("Kafka.MinimumPriority", "")
v.SetDefault("Kafka.SASL", "")
v.SetDefault("Kafka.TLS", false)
v.SetDefault("Kafka.Username", "")
v.SetDefault("Kafka.Password", "")
v.SetDefault("Kafka.Balancer", "round_robin")
Expand Down
1 change: 1 addition & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ kafka:
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
tls: false # Use TLS for the connections (default: false)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
Expand Down
17 changes: 16 additions & 1 deletion outputs/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package outputs

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log"
Expand All @@ -28,6 +29,12 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt
ClientID: config.Kafka.ClientID,
}

if config.Kafka.TLS {
transport.TLS = &tls.Config{
MinVersion: tls.VersionTLS12,
}
}

var err error

if config.Kafka.SASL != "" {
Expand Down Expand Up @@ -143,7 +150,15 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) {
}

// Errors are logged/captured via handleKafkaCompletion function, ignore here
_ = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
err = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
if err != nil {
c.incrKafkaErrorMetrics(1)
log.Printf("[ERROR] : Kafka - %v\n", err.Error())
return
} else {
c.incrKafkaSuccessMetrics(1)
log.Printf("[INFO] : Kafka - Publish OK\n")
}
}

// handleKafkaCompletion is called when a message is produced
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ type kafkaConfig struct {
Topic string
MinimumPriority string
SASL string
TLS bool
Username string
Password string
Balancer string
Expand Down

0 comments on commit 01eee81

Please sign in to comment.