Skip to content

Commit

Permalink
Fully refactor the entire codebase.
Browse files Browse the repository at this point in the history
This is a large breaking change that almost fully refactors and rewrites
major parts of the codebase.

- Split and cleanly separate the main package into
  relay/producer/source_pool controllers.
- Restructure files and internal packages.
- Rewrite, simplify, and detatch polling and healthcheck logic spread
  across relay and consumer sources into cleaner abstractions.
- Simplify relay to only do relaying and not handle server management.
- Rewrite the old multi-config based consumer into a new "source pool"
  controller that acts like a connection pool and hides away all server
  management and healthcheck complexities.
- Remove and simplify redundant util methods structs.
- Simplify and rewrite orchestration logic in multiple places:
  relay, source pool management, healthchecks.
- Simplify config structure and config initialization across
  controllers.
- Simplify main.go to do initialization and composition only.
  • Loading branch information
knadh committed Apr 29, 2024
1 parent 132af4a commit 631f6a8
Show file tree
Hide file tree
Showing 13 changed files with 1,933 additions and 1,976 deletions.
75 changes: 48 additions & 27 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,48 @@ max_failovers = -1 # infinite
log_level = "debug"
metrics_server_addr = ":7081"

# In failover mode, if we breach the lag threshold in serverA compared to serverB we switch over to serverB immediately
lag_threshold = 100

# Frequency at which we need to poll for healthiness/lags
node_health_check_frequency = "500ms"
# Map of topics from the source to sync to the target.
# source_topic => target_topic:optional_target_partition
# If the target partition is not specified, whatever partition a message
# was received from the source, the same partition is written to the target.
[[topics]]
source_topic1 = "target_topic1:1"
source_topic2 = "target_topic2"

# Max request wait time to check if a node is down
max_request_duration = "100ms"

[source_pool]
# Kafka client config common to all upstream sources ([[sources]]).
initial_offset = "start"
instance_id = "client_instance_id"
group_id = "consumer_group"

# Configuration for retry backoff settings.
# The retry backoff feature can be enabled or disabled.
# The minimum and maximum backoff durations can be specified.
[app.retry_backoff]
enable = true
min = "1s"
max = "10s"
# Frequency at which source servers are polled for health/lag.
healthcheck_interval = "3s"

# Map of topics from the source to sync to the target.
# source_topic => target_topic:optional_target_partition
# If the target partition is not specified, whatever partition a message
# was received from the source, the same partition is written to the target.
[[topics]]
source_topic1 = "target_topic1:1"
source_topic2 = "target_topic2"
# Max difference in total offsets across all topics on a source
# against other sources, which when breached, the source is marked
# as unhealthy because of a lag.
offset_lag_threshold = 1000

# Maximum number of connect/fetch retries before exiting. -1 for infinite.
max_retries = -1

# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"

# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"


[[sources]]
name = "node1"
servers = ["node1:9092"]
servers = ["127.0.0.1:9092"]
session_timeout = "6s"
enable_auth = true
sasl_mechanism = "PLAIN"
Expand Down Expand Up @@ -74,26 +82,39 @@ enable_log = false
# Destination kafka producer configuration
[target]
name = "node3"
servers = ["node3:9092"]
servers = ["127.0.0.1:9095"]
enable_log = false
enable_auth = true
sasl_mechanism = "PLAIN" # PLAIN/SCRAM-SHA-256/SCRAM-SHA-512
username = "user-y"
password = "pass-y"
max_retries = -1
enable_idempotency = true
commit_ack_type = "cluster"
flush_frequency = "20ms"
session_timeout = "6s"
flush_batch_size = 1000
batch_size = 1000
max_message_bytes = 10000000

enable_tls = false
client_key_path = ""
client_cert_path = ""
ca_cert_path = ""

enable_log = false
# -1 for infinite.
max_retries = -1
flush_batch_size = 1000
batch_size = 1000
max_message_bytes = 10000000

# Kafka exponential retry-backoff config for reconnection attempts.
# If both min and max values are the same, then it's a static wait time
# between attempts.
backoff_enable = true
backoff_min = "2s"
backoff_max = "10s"

# Wait timeout of a request/response to a Kafka instance to determine
# whether it's healthy or not.
request_timeout = "100ms"


# Custom go-plugin filter to load to filter messages when relaying
[filter.testfilter]
Expand Down
178 changes: 0 additions & 178 deletions consumer.go

This file was deleted.

Loading

0 comments on commit 631f6a8

Please sign in to comment.