
Merge pull request #2 from zeromicro/master
merge
lvphps committed Feb 21, 2024
2 parents 7647654 + 4a99c09 commit ec0517f
Showing 3 changed files with 66 additions and 26 deletions.
1 change: 1 addition & 0 deletions kq/config.go
@@ -12,6 +12,7 @@ type KqConf struct {
    Brokers   []string
    Group     string
    Topic     string
+   CaFile    string `json:",optional"`
    Offset    string `json:",options=first|last,default=last"`
    Conns     int    `json:",default=1"`
    Consumers int    `json:",default=8"`
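For context, the new field slots into a kq consumer configuration like the sketch below. This is illustrative only: the broker list, group, topic, and certificate path are placeholder values, and the import path assumes the upstream module github.com/zeromicro/go-queue.

package main

import "github.com/zeromicro/go-queue/kq"

func main() {
    // Placeholder values; CaFile is optional (`json:",optional"`) and should
    // point at a PEM-encoded CA certificate when the brokers require TLS.
    _ = kq.KqConf{
        Brokers:   []string{"localhost:9092"},
        Group:     "example-group",
        Topic:     "example-topic",
        CaFile:    "/etc/kafka/ca.pem",
        Offset:    "last", // options=first|last, default=last
        Conns:     1,      // default=1
        Consumers: 8,      // default=8
    }
}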
71 changes: 45 additions & 26 deletions kq/pusher.go
@@ -11,92 +11,111 @@ import (
)

type (
-   PushOption func(options *chunkOptions)
+   PushOption func(options *pushOptions)

    Pusher struct {
-       produer  *kafka.Writer
+       producer *kafka.Writer
        topic    string
        executor *executors.ChunkExecutor
    }

-   chunkOptions struct {
+   pushOptions struct {
+       // kafka.Writer options
+       allowAutoTopicCreation bool
+
+       // executors.ChunkExecutor options
        chunkSize     int
        flushInterval time.Duration
    }
)

// NewPusher returns a Pusher with the given Kafka addresses and topic.
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
    producer := &kafka.Writer{
        Addr:        kafka.TCP(addrs...),
        Topic:       topic,
        Balancer:    &kafka.LeastBytes{},
        Compression: kafka.Snappy,
    }

+   var options pushOptions
+   for _, opt := range opts {
+       opt(&options)
+   }
+
+   // apply kafka.Writer options
+   producer.AllowAutoTopicCreation = options.allowAutoTopicCreation
+
+   // apply ChunkExecutor options
+   var chunkOpts []executors.ChunkOption
+   if options.chunkSize > 0 {
+       chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
+   }
+   if options.flushInterval > 0 {
+       chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
+   }
+
    pusher := &Pusher{
-       produer: producer,
-       topic:   topic,
+       producer: producer,
+       topic:    topic,
    }
    pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
        chunk := make([]kafka.Message, len(tasks))
        for i := range tasks {
            chunk[i] = tasks[i].(kafka.Message)
        }
-       if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
+       if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
            logx.Error(err)
        }
-   }, newOptions(opts)...)
+   }, chunkOpts...)

    return pusher
}

// Close closes the Pusher and releases any resources used by it.
func (p *Pusher) Close() error {
    if p.executor != nil {
        p.executor.Flush()
    }
-   return p.produer.Close()
+
+   return p.producer.Close()
}

// Name returns the name of the Kafka topic that the Pusher is sending messages to.
func (p *Pusher) Name() string {
    return p.topic
}

// Push sends a message to the Kafka topic.
func (p *Pusher) Push(v string) error {
    msg := kafka.Message{
-       Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
+       Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), // current timestamp
        Value: []byte(v),
    }
    if p.executor != nil {
        return p.executor.Add(msg, len(v))
    } else {
-       return p.produer.WriteMessages(context.Background(), msg)
+       return p.producer.WriteMessages(context.Background(), msg)
    }
}

// WithChunkSize customizes the Pusher with the given chunk size.
func WithChunkSize(chunkSize int) PushOption {
-   return func(options *chunkOptions) {
+   return func(options *pushOptions) {
        options.chunkSize = chunkSize
    }
}

// WithFlushInterval customizes the Pusher with the given flush interval.
func WithFlushInterval(interval time.Duration) PushOption {
-   return func(options *chunkOptions) {
+   return func(options *pushOptions) {
        options.flushInterval = interval
    }
}

-func newOptions(opts []PushOption) []executors.ChunkOption {
-   var options chunkOptions
-   for _, opt := range opts {
-       opt(&options)
-   }
-
-   var chunkOpts []executors.ChunkOption
-   if options.chunkSize > 0 {
-       chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
-   }
-   if options.flushInterval > 0 {
-       chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
-   }
-   return chunkOpts
-}
+// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
+func WithAllowAutoTopicCreation() PushOption {
+   return func(options *pushOptions) {
+       options.allowAutoTopicCreation = true
+   }
+}
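For orientation, a hedged usage sketch of the reworked pusher options: the broker address and topic are placeholders, the chunk size and flush interval values are arbitrary, and the import path assumes the upstream module github.com/zeromicro/go-queue. WithAllowAutoTopicCreation is the new option that NewPusher now forwards to the kafka.Writer.

package main

import (
    "log"
    "time"

    "github.com/zeromicro/go-queue/kq"
)

func main() {
    // Placeholder broker/topic. With auto topic creation enabled, the writer
    // may create "example-topic" on first publish instead of erroring.
    pusher := kq.NewPusher(
        []string{"localhost:9092"},
        "example-topic",
        kq.WithChunkSize(1<<20),           // flush once roughly 1 MB is buffered
        kq.WithFlushInterval(time.Second), // or once a second, whichever comes first
        kq.WithAllowAutoTopicCreation(),
    )
    defer pusher.Close()

    if err := pusher.Push("hello kafka"); err != nil {
        log.Fatal(err)
    }
}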
20 changes: 20 additions & 0 deletions kq/queue.go
@@ -2,8 +2,11 @@ package kq

import (
    "context"
+   "crypto/tls"
+   "crypto/x509"
    "io"
    "log"
+   "os"
    "time"

    "github.com/segmentio/kafka-go"
@@ -121,6 +124,23 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
            },
        }
    }
+   if len(c.CaFile) > 0 {
+       caCert, err := os.ReadFile(c.CaFile)
+       if err != nil {
+           log.Fatal(err)
+       }
+
+       caCertPool := x509.NewCertPool()
+       ok := caCertPool.AppendCertsFromPEM(caCert)
+       if !ok {
+           log.Fatal(err)
+       }
+
+       readerConfig.Dialer.TLS = &tls.Config{
+           RootCAs:            caCertPool,
+           InsecureSkipVerify: true,
+       }
+   }
    consumer := kafka.NewReader(readerConfig)

    return &kafkaQueue{
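The consumer-side addition above follows the common kafka-go pattern of trusting a custom CA. Two things worth noting: the new block assumes readerConfig.Dialer has already been created (it only sets its TLS field), and the `if !ok { log.Fatal(err) }` branch logs a nil error when the PEM cannot be parsed. A standalone sketch of the same idea with explicit error returns might look like the following; the package, helper name, and timeout are illustrative, not part of the change.

package queueutil

import (
    "crypto/tls"
    "crypto/x509"
    "errors"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

// buildTLSDialer returns a kafka.Dialer that trusts the CA certificate stored
// in caFile. Unlike the diff, parse failures are returned instead of fatally
// logged, and certificate verification is left enabled.
func buildTLSDialer(caFile string) (*kafka.Dialer, error) {
    caCert, err := os.ReadFile(caFile)
    if err != nil {
        return nil, err
    }

    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return nil, errors.New("no valid PEM certificates in " + caFile)
    }

    return &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       &tls.Config{RootCAs: caCertPool},
    }, nil
}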
