Skip to content

Set watches on ZooKeeper znodes and produce them as events to Kafka

License

Notifications You must be signed in to change notification settings

rmaestroni/zk-watch2kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

31 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

zk-watch2kafka

Maintainability Docker Cloud Build Status Docker Pulls

Set watches on ZooKeeper znodes and produce them as events to Kafka.

There are two operations you can associate with a watcher

  • GET_DATA: the watcher is triggered when the znode data is changed or the znode is deleted;
  • GET_CHILDREN: the watcher is triggered when a new node is added as a children of the watched node, or an existing children is deleted or renamed.

Zk-watch2kafka can operate on multiple ZooKeeper ensemble and produce as well to many Kafka clusters.

The operations, znodes, source and destination clusters are defined through a JSON configuration file. See the section below for an example and a detailed description of the options.

When the program starts it produces one event for each watch set. Note that for a GET_DATA operation, if the target znode is deleted you won't see any event in case a node with the same name will be re-created afterwards.

From an high-level perspective the program implements this loop:

  1. Given a znode, apply the operation and set a watcher;
  2. Produce an event with the result of the operation;
  3. When a watcher triggers, go back to (1).

Hence there's no guarantee the program will trigger an event for every change to a znode. This is because between 3 and 1 (i.e. after the watcher is triggered and before a new watcher is set), an arbitrary amount of changes could be applied to the znode. However it is guaranteed that at least one event representing the "last state" of the znode will be produced.

See the ZooKeeper documentation for more informations.

Configuration

Sample configuration

{
  "watches": [
    {
      "zookeeper_id": "some-local-zk",
      "zookeeper": "localhost:2181",
      "znode": "foo-bar",
      "operation": "GET_DATA",

      "kafka": "localhost:9092",
      "target_topic": "zookeeper-watch-events",

      "enable_idempotence": true,
      "acks": "",
      "retries": -1,

      "serializer": "cloud.thh.zk_watch2kafka.kafka.serializers.AvroSerializer"
    },
    {
      "zookeeper_id": "some-local-zk",
      "zookeeper": "localhost:2181",
      "znode": "bar-baz",
      "operation": "GET_CHILDREN",

      "kafka": "localhost:9092",
      "target_topic": "zookeeper-watch-events",

      "enable_idempotence": true,
      "acks": "all",
      "retries": 10,

      "serializer": "cloud.thh.zk_watch2kafka.kafka.serializers.BsonSerializer"
    }
  ]
}

Every item in watches is an object consisting of

  • zookeeper_id - An identifier for the ZooKeeper cluster, this string will be prepended to every produced message key. Its purpose is to make possible to easily recognize which cluster generated the event;
  • zookeeper - ZooKeeper connection string;
  • znode - Znode to watch;
  • operation - Either GET_DATA or GET_CHILDREN;
  • kafka - Kafka brokers list, comma separated;
  • target_topic - The topic name to write to;
  • enable_idempotence - Whether to use an idempotent producer or not. When true the options acks and retries are ignored;
  • acks - Producer required acks (only with enable_idempotence=false);
  • retries - Producer retries (only with enable_idempotence=false);
  • serializer - The serializer qualified class name, defining how to serialize the value to Kafka. The class is expected to extend cloud.thh.zk_watch2kafka.kafka.ZkEventSerializer.

Usage

The program expects the path to the configuration file as the only command line parameter. With Docker for instance you can mount the configuration and run with

docker run --rm \
  -v "_full_path_to_config_:/app/config.json" \
  -e LOG_LEVEL="WARN" \
  -e JAVA_OPTIONS="-Xmx128m" \
  rmaestroni/zk-watch2kafka \
  config.json

Take a look at entrypoint.sh to run zk-watch2kafka natively. Essentially you need to set up accordingly your classpath with -cp and call the main class cloud.thh.zk_watch2kafka.App passing the JSON config file as parameter.

Build

Build with Docker

docker build -t zk-watch2kafka .

(or use any other tag as your discretion).

Build without Docker

This project was tested with Java 11 and Maven 3.6, although it may work with other versions.

mvn package && mvn dependency:copy-dependencies

Development setup

docker-compose up -d

Will run zk-watch2kafka along with a ZooKeeper and a Kafka container.

License

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.