This project listens for events on a MongoDB collection (insert, update, delete, ...), also called "oplogs" (operation logs), and distributes them into a Kafka topic of your choice. There is also a replay mode that lets you send all items of a collection into a Kafka topic for the first time.
In addition to the binary, you will also need the following Kafka library:
- librdkafka (refer to the Installation part)
You can download the latest version of the binary built for your architecture here:
The watcher is also available as a Docker image. You can run it using the following example and pass configuration environment variables:
$ docker run \
-e 'REPLAY=true' \
Optionally, you can also download and build it from the sources. Retrieve the project sources in one of the following ways:
$ go get -u
# or
$ git clone
Then, build the binary:
$ GOOS=linux GOARCH=amd64 go build -ldflags '-s -w' -o kafka-mongo-watcher ./cmd/watcher/
In order to run the watcher, type the following command with the desired arguments.
You can use flags (as in this example) or environment variables:
$ ./kafka-mongo-watcher -REPLAY=true
<info> HTTP server started {"facility":"kafka-mongo-watcher","version":"wip","addr":":8001","file":"/usr/local/Cellar/go/1.14/libexec/src/runtime/asm_amd64.s","line":1373}
<info> Connected to mongodb database {"facility":"kafka-mongo-watcher","version":"wip","uri":"mongodb://root:toor@,,\u0026authSource=admin"}
<info> Connected to kafka producer {"facility":"kafka-mongo-watcher","version":"wip","bootstrap-servers":""}
In a dev environment, you can copy .env.dist to .env and edit its content to easily customize the environment variables.
You can set or override configuration variables from the .env file, from environment variables, and/or from CLI arguments.
(If a variable is configured in multiple sources, the last one overrides the previous ones.)
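The override rule above can be sketched as a simple ordered merge. This is only an illustration of the precedence described here, not the watcher's actual loading code; the `mergeSources` helper and the URI values are hypothetical.

```go
package main

import "fmt"

// mergeSources applies each source in order, so a key set in a later
// source overrides the same key from an earlier one.
// Hypothetical helper: it only illustrates the precedence rule.
func mergeSources(sources ...map[string]string) map[string]string {
	merged := make(map[string]string)
	for _, source := range sources {
		for key, value := range source {
			merged[key] = value
		}
	}
	return merged
}

func main() {
	// Made-up values standing in for the three configuration sources.
	dotEnv := map[string]string{"MONGODB_URI": "mongodb://from-dotenv", "REPLAY": "false"}
	environment := map[string]string{"MONGODB_URI": "mongodb://from-environment"}
	cliArgs := map[string]string{"REPLAY": "true"}

	config := mergeSources(dotEnv, environment, cliArgs)
	fmt.Println(config["MONGODB_URI"], config["REPLAY"]) // prints "mongodb://from-environment true"
}
```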
Configuration variables with the prefix are looked up first, then those without the prefix. For example, if you define KAFKA_MONGO_WATCHER_MONGODB_URI=xxxx, it will be used for the MongoDB URI even if MONGODB_URI=yyyy is set. This allows some overriding cases that are sometimes useful inside a Kubernetes cluster.
Type: string
Description: In case you want to specify a different prefix (not KAFKA_MONGO_WATCHER) for all configuration environment variables.
Type: string
Description: In case you want to specify a filtering pipeline, you can specify it here. It works with both replay and watch modes.
Example value: [ { "$match": { "fullDocument.is_active": true } }, { "$addFields": { "custom-field": "custom-value" } } ]
Type: boolean
Description: In case you want to send all of the collection's documents once (default: false)
Hint: You can also use built-in variables such as %currentTimestamp%, which puts the current timestamp value directly into the aggregation pipeline.
Example value with variables: [ { "$match": { "date": { "$gt": { "$date": { "$numberLong": "%currentTimestamp%" } } } } } ]
Type: string
Description: The MongoDB connection string URI (default: mongodb://root:toor@,...)
Type: string
Description: The MongoDB collection you want to watch (default: "items")
Type: string
Description: The MongoDB database name you want to connect to (default: "watcher")
Type: duration
Description: The MongoDB server selection timeout duration (default: 2s)
Type: integer
Description: In case you want to enable watch batch size on MongoDB watch (default: 0 / no batch)
Type: boolean
Description: In case you want to retrieve the full document when watching for oplogs (default: true)
Type: duration
Description: In case you want to set a maximum time to wait for new oplogs (default: 0 / don't stop)
Type: string
Description: In case you want to set a logical starting point for the change stream (example: {"_data": <hex string>})
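To make the expected shape of this value concrete, here is a small sketch that checks a token string before it is handed to the watcher. The `parseResumeToken` helper and the token value are made up for illustration, and the check assumes "_data" is a hex-encoded string as stated above.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
)

// resumeToken mirrors the {"_data": <hex string>} shape described above.
type resumeToken struct {
	Data string `json:"_data"`
}

// parseResumeToken is a hypothetical validation helper: it checks that
// the value is JSON, has a "_data" field, and that the field is hex.
func parseResumeToken(raw string) (resumeToken, error) {
	var token resumeToken
	if err := json.Unmarshal([]byte(raw), &token); err != nil {
		return resumeToken{}, fmt.Errorf("resume token is not valid JSON: %w", err)
	}
	if token.Data == "" {
		return resumeToken{}, errors.New(`resume token has no "_data" field`)
	}
	if _, err := hex.DecodeString(token.Data); err != nil {
		return resumeToken{}, fmt.Errorf(`"_data" is not a hex string: %w`, err)
	}
	return token, nil
}

func main() {
	// Made-up token value, for illustration only.
	token, err := parseResumeToken(`{"_data": "8263a1b2"}`)
	fmt.Println(token.Data, err) // prints "8263a1b2 <nil>"
}
```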
Type: uint32 (increment value)
Type: uint32 (timestamp)
Description: In case you want to set a timestamp for the change stream to only return changes that occurred at or after the given timestamp (default: nil)
Type: integer
Description: The max number of retries when trying to watch a collection (default: 3, set to 0 to disable retry)
Type: duration
Description: Sleeping delay between two watch attempts (default: 500ms)
Type: string
Description: Kafka bootstrap servers list (default: "")
Type: string
Description: Kafka topic to write into (default: "kafka-mongo-watcher")
Type: integer
Description: The maximum size of the producer's internal channel (default: 10000)
A large value here can increase the application's heap memory usage, as all payloads waiting to be sent to Kafka are kept in the channel.
Type: integer
Description: The maximum message size in bytes at the producer level (default: 1024*1024)
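To get a feel for the memory trade-off between the channel size and the maximum message size, the sketch below shows how a buffered Go channel retains payloads and computes a rough upper bound for the two default values. It assumes the worst case where every slot holds a maximum-size message; the `worstCaseBufferedBytes` helper is hypothetical and real usage is normally far lower.

```go
package main

import "fmt"

// worstCaseBufferedBytes returns an upper bound on the payload bytes
// retained when every channel slot holds a maximum-size message.
func worstCaseBufferedBytes(channelSize, maxMessageBytes int64) int64 {
	return channelSize * maxMessageBytes
}

func main() {
	// A buffered channel keeps up to cap(queue) payloads in memory
	// until a consumer reads them.
	queue := make(chan []byte, 3)
	queue <- []byte("first payload")
	queue <- []byte("second payload")
	fmt.Println(len(queue), cap(queue)) // prints "2 3"

	// Defaults from this document: 10000 slots, 1024*1024 bytes per message.
	total := worstCaseBufferedBytes(10000, 1024*1024)
	fmt.Printf("%d bytes (about %.2f GiB)\n", total, float64(total)/(1<<30))
}
```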
Type: boolean
Description: Used to enable/disable log verbosity (default: true)
Type: string
Description: Used to define the minimum level from which logs are displayed (default: "info")
Type: string
Description: In case you want to push logs into a Graylog server, just fill this entry with the endpoint
Type: duration
Description: The idle timeout of the HTTP technical server (default: 90s)
Type: duration
Description: The read timeout of the HTTP technical server (default: 1s)
Type: duration
Description: The write timeout of the HTTP technical server (default: 10s)
Type: string
Description: The address the HTTP technical server listens on (default: ":8001")
Type: boolean
Description: Used to enable/disable the configuration print at startup (default: true)
Type: boolean
Description: In case you want to enable Go pprof debugging (default: true). No impact when not used
Type: string
Description: In case you want to enable OpenTelemetry tracing, fill this with the host:port of your collector endpoint
Type: float64
Description: A fraction between 0 and 1 to enable sampling OpenTelemetry traces
You can enable this debug UI that will be available at
You just have to set HTTP_DEBUG_ENABLED=true
It allows you to track real-time activity on the documents watched in your collection.
The watcher also exposes metrics about the Go process and the watcher application.
These metrics can be scraped by Prometheus by browsing the following technical HTTP server endpoint:
Unit tests can be run with the following command:
$ go test -v -mod vendor ./...
And integration tests can be run with:
$ make test-integration
This starts the required MongoDB and Kafka containers and runs the test suite.