From 00cd7cd67f80defb68f22cdfd55347afe4a7b6db Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 21 Oct 2024 09:36:30 +1100 Subject: [PATCH] Split ShotoverNode::address into address_for_peers and address_for_client --- changelog.md | 1 + docs/src/transforms.md | 10 +++++++--- shotover-proxy/benches/windsock/kafka/bench.rs | 3 ++- .../kafka/cluster-1-rack/topology-single.yaml | 3 ++- .../kafka/cluster-1-rack/topology1.yaml | 9 ++++++--- .../kafka/cluster-1-rack/topology2.yaml | 9 ++++++--- .../kafka/cluster-1-rack/topology3.yaml | 9 ++++++--- .../kafka/cluster-2-racks/topology-rack1.yaml | 6 ++++-- .../kafka/cluster-2-racks/topology-rack2.yaml | 6 ++++-- .../kafka/cluster-3-racks/topology-rack1.yaml | 9 ++++++--- .../kafka/cluster-3-racks/topology-rack2.yaml | 9 ++++++--- .../kafka/cluster-3-racks/topology-rack3.yaml | 9 ++++++--- .../kafka/cluster-mtls/topology.yaml | 3 ++- .../cluster-sasl-plain/topology-single.yaml | 3 ++- .../kafka/cluster-sasl-plain/topology1.yaml | 9 ++++++--- .../kafka/cluster-sasl-plain/topology2.yaml | 9 ++++++--- .../kafka/cluster-sasl-plain/topology3.yaml | 9 ++++++--- .../topology-single.yaml | 3 ++- .../topology1.yaml | 9 ++++++--- .../topology2.yaml | 9 ++++++--- .../topology3.yaml | 9 ++++++--- .../cluster-sasl-scram/topology-single.yaml | 3 ++- .../kafka/cluster-tls/topology.yaml | 3 ++- .../src/transforms/kafka/sink_cluster/mod.rs | 14 +++++++------- .../kafka/sink_cluster/shotover_node.rs | 18 ++++++++++++------ 25 files changed, 121 insertions(+), 63 deletions(-) diff --git a/changelog.md b/changelog.md index ba786f75c..b7afa7f3d 100644 --- a/changelog.md +++ b/changelog.md @@ -13,6 +13,7 @@ This assists us in knowing when to make the next release a breaking release and ### topology.yaml * A new mandatory configuration `check_shotover_peers_delay_ms` is added for `KafkaSinkCluster`. See [transform.md](docs/src/transforms.md) for details on this configuration. +* The `address` field for each shotover node in `KafkaSinkCluster` is replaced with `address_for_clients` and `address_for_peers`. See [transform.md](docs/src/transforms.md) for details on these fields. ## 0.4.0 diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 126fb0e2b..ffba4a09b 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -261,10 +261,14 @@ If SCRAM authentication against the first kafka broker fails, shotover will term # A list of every Shotover node that will be proxying to the same kafka cluster. # This field should be identical for all Shotover nodes proxying to the same kafka cluster. shotover_nodes: - # Address of the Shotover node. + # Address of the Shotover node that is reported to the kafka clients. # This is usually the same address as the Shotover source that is connected to this sink. - # But it may be different if you want Shotover to report a different address. - - address: "127.0.0.1:9092" + # But it may be different if you want Shotover to report a different address to its clients. + - address_for_client: "127.0.0.1:9092" + # Address of the shotover node as used to check for peers that are up. + # This is usually the same address as the Shotover source that is connected to this sink. + # But it may be different if you want Shotover to connect to its peers via a different address. + address_for_peers: "127.0.0.1:9092" # The rack the Shotover node will report as and route messages to. # For performance reasons, the Shotover node should be physically located in this rack. rack: "rack0" diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 4f0a524b4..920f1f3e6 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -100,7 +100,8 @@ impl KafkaBench { check_shotover_peers_delay_ms: Some(3000), first_contact_points: vec![kafka_address], shotover_nodes: vec![ShotoverNodeConfig { - address: host_address.parse().unwrap(), + address_for_clients: host_address.parse().unwrap(), + address_for_peers: host_address.parse().unwrap(), rack: "rack1".into(), broker_id: 0, }], diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml index f8981a0cd..b8aab8676 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml index 720561055..fb1ce8fae 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml index 8678ddd60..424e12443 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml index 7d8bfae08..fe1c82229 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml index c74a25d44..d1ffab795 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml @@ -6,10 +6,12 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack2" broker_id: 1 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml index 2291e6d42..b11bc3119 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml @@ -6,10 +6,12 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack2" broker_id: 1 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml index 194e80241..900f4eb6a 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_clients: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_clients: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml index 611282f92..ebca6989a 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_clients: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_clients: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml index ae8c6c9b8..6b4842f30 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_clients: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_clients: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml index f7df4c2b3..e0d658f96 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml @@ -9,7 +9,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "localhost:9192" + address_for_clients: "localhost:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml index f8981a0cd..b8aab8676 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml index 720561055..fb1ce8fae 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml index 8678ddd60..424e12443 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml index 7d8bfae08..fe1c82229 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml index 35d46ac54..3310c9581 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml index c7337cadb..0e6a9411d 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml index fc7c5627f..1b9f1061d 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml index 5d69bb456..1bd8957ba 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_clients: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_clients: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml index f8981a0cd..b8aab8676 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml index 2cdee5dcc..6d020cb91 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml @@ -9,7 +9,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_clients: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index e3ec494aa..980bf379d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -110,7 +110,7 @@ impl TransformConfig for KafkaSinkClusterConfig { if !unique_broker_ids.insert(node.broker_id) { return Err(anyhow::anyhow!( "Duplicate broker_id found in shotover node {}", - node.address + node.address_for_clients )); } } @@ -2800,8 +2800,8 @@ impl KafkaSinkCluster { partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack); let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash); - find_coordinator.host = shotover_node.address.host.clone(); - find_coordinator.port = shotover_node.address.port; + find_coordinator.host = shotover_node.address_for_clients.host.clone(); + find_coordinator.port = shotover_node.address_for_clients.port; find_coordinator.node_id = shotover_node.broker_id; } } else { @@ -2830,8 +2830,8 @@ impl KafkaSinkCluster { partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack); let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash); - coordinator.host = shotover_node.address.host.clone(); - coordinator.port = shotover_node.address.port; + coordinator.host = shotover_node.address_for_clients.host.clone(); + coordinator.port = shotover_node.address_for_clients.port; coordinator.node_id = shotover_node.broker_id; } } @@ -2861,8 +2861,8 @@ impl KafkaSinkCluster { .map(|shotover_node| { MetadataResponseBroker::default() .with_node_id(shotover_node.broker_id) - .with_host(shotover_node.address.host.clone()) - .with_port(shotover_node.address.port) + .with_host(shotover_node.address_for_clients.host.clone()) + .with_port(shotover_node.address_for_clients.port) .with_rack(Some(shotover_node.rack.clone())) }) .collect(); diff --git a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs index 1e330f0e9..deff5f9df 100644 --- a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs @@ -14,7 +14,8 @@ use tokio::time::sleep; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct ShotoverNodeConfig { - pub address: String, + pub address_for_clients: String, + pub address_for_peers: String, pub rack: String, pub broker_id: i32, } @@ -22,7 +23,8 @@ pub struct ShotoverNodeConfig { impl ShotoverNodeConfig { pub(crate) fn build(self) -> anyhow::Result { Ok(ShotoverNode { - address: KafkaAddress::from_str(&self.address)?, + address_for_clients: KafkaAddress::from_str(&self.address_for_clients)?, + address_for_peers: KafkaAddress::from_str(&self.address_for_peers)?, rack: StrBytes::from_string(self.rack), broker_id: BrokerId(self.broker_id), state: Arc::new(AtomicShotoverNodeState::new(ShotoverNodeState::Up)), @@ -32,7 +34,8 @@ impl ShotoverNodeConfig { #[derive(Clone)] pub(crate) struct ShotoverNode { - pub address: KafkaAddress, + pub address_for_clients: KafkaAddress, + pub address_for_peers: KafkaAddress, pub rack: StrBytes, pub broker_id: BrokerId, #[allow(unused)] @@ -99,8 +102,8 @@ async fn check_shotover_peers( let tcp_stream = tcp_stream( connect_timeout, ( - shotover_peer.address.host.as_str(), - shotover_peer.address.port as u16, + shotover_peer.address_for_peers.host.as_str(), + shotover_peer.address_for_peers.port as u16, ), ) .await; @@ -109,7 +112,10 @@ async fn check_shotover_peers( shotover_peer.set_state(ShotoverNodeState::Up); } Err(_) => { - tracing::warn!("Shotover peer {} is down", shotover_peer.address); + tracing::warn!( + "Shotover peer {} is down", + shotover_peer.address_for_clients + ); shotover_peer.set_state(ShotoverNodeState::Down); } }