Merge pull request #135 from influxdata/crepererum/socks5_tests
test: SOCKS5 in CI and docker compose files
kodiakhq[bot] authored May 23, 2022
2 parents bb5c9c7 + ea46869 commit 6510b23
Showing 12 changed files with 140 additions and 75 deletions.
10 changes: 8 additions & 2 deletions .circleci/config.yml
@@ -180,6 +180,8 @@ jobs:
- --kafka-addr redpanda-2:9092
- --rpc-addr redpanda-2:33145
- --seeds redpanda-0:33145
+ - image: serjs/go-socks5-proxy
+   name: proxy
resource_class: xlarge # use of a smaller executor tends to crash on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@@ -196,7 +198,8 @@
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "redpanda-1:9092"
KAFKA_CONNECT: "invalid:9092,redpanda-1:9092"
SOCKS_PROXY: "proxy:1080"
steps:
- checkout
- rust_components
@@ -247,6 +250,8 @@ jobs:
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
+ - image: serjs/go-socks5-proxy
+   name: proxy
resource_class: xlarge # use of a smaller executor tends to crash on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@@ -265,7 +270,8 @@
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "kafka-1:9093"
KAFKA_CONNECT: "invalid:9093,kafka-1:9093"
SOCKS_PROXY: "proxy:1080"
steps:
- checkout
- rust_components
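
The `invalid:9092` / `invalid:9093` entries deliberately put an unresolvable broker first in `KAFKA_CONNECT`: the test helpers split the variable on commas (see `benches/write_throughput.rs` below) and hand the whole list to `ClientBuilder`, so the client must skip the dead address and bootstrap from the next broker. A sketch of that flow, assuming the crate's `rskafka::client` paths; `brokers_from_env` is a made-up helper name:

```rust
use rskafka::client::{Client, ClientBuilder};

// Made-up helper mirroring what the test macros do with KAFKA_CONNECT.
fn brokers_from_env() -> Vec<String> {
    std::env::var("KAFKA_CONNECT")
        .expect("KAFKA_CONNECT must be set")
        .split(',')
        .map(|s| s.to_owned())
        .collect()
}

async fn bootstrap() -> Client {
    // With KAFKA_CONNECT="invalid:9092,redpanda-1:9092" the first address never
    // resolves; the build must still succeed via the second broker.
    ClientBuilder::new(brokers_from_env()).build().await.unwrap()
}
```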
20 changes: 16 additions & 4 deletions README.md
@@ -113,7 +113,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:

```console
- $ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
+ $ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
```

in another session.
@@ -129,12 +129,24 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
- $ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test
+ $ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
```

in another session. Note that Apache Kafka supports a different set of features than Redpanda, so we pass different
environment variables.

+ ### Using a SOCKS5 Proxy
+
+ To run the integration tests via a SOCKS5 proxy, you need to set the environment variable `SOCKS_PROXY`. The following
+ command requires a running proxy on the local machine.
+
+ ```console
+ $ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full
+ ```
+
+ The SOCKS5 proxy will automatically be started by the docker compose files. Note that `KAFKA_CONNECT` is extended with
+ addresses that are reachable via the proxy.
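
For reference, the client-side API this section exercises is the builder's `socks5_proxy` option (see `tests/client.rs` below). A minimal sketch with placeholder addresses, assuming the `transport-socks5` feature is enabled and the crate exposes `rskafka::client::{Client, ClientBuilder}`:

```rust
use rskafka::client::{Client, ClientBuilder};

// Placeholder addresses; in the integration tests they come from the
// KAFKA_CONNECT and SOCKS_PROXY environment variables.
async fn connect_through_proxy() -> Client {
    ClientBuilder::new(vec!["kafka-1:9021".to_owned()]) // address as reachable from the proxy
        .socks5_proxy("localhost:1080".to_owned())
        .build()
        .await
        .unwrap()
}
```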

### Java Interopt
To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the
`TEST_JAVA_INTEROPT=1` environment variable set.
@@ -217,14 +229,14 @@ execution that hooks right into the place where it is about to exit:
Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

```console
- $ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features
+ $ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
```

If you find a benchmark that is too slow, you may want to profile it. Get [cargo-with] and [perf], then run (here
for the `parallel/rskafka` benchmark):

```console
- $ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
+ $ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
```
14 changes: 9 additions & 5 deletions benches/write_throughput.rs
@@ -183,7 +183,11 @@ macro_rules! maybe_skip_kafka_integration {
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
- (true, Some(kafka_connection)) => kafka_connection,
+ (true, Some(kafka_connection)) => {
+     let kafka_connection: Vec<String> =
+         kafka_connection.split(",").map(|s| s.to_owned()).collect();
+     kafka_connection
+ }
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
@@ -227,7 +231,7 @@ fn random_topic_name() -> String {
format!("test_topic_{}", uuid::Uuid::new_v4())
}

- async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer, String) {
+ async fn setup_rdkafka(connection: Vec<String>, buffering: bool) -> (FutureProducer, String) {
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
producer::FutureRecord,
@@ -239,7 +243,7 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,

// configure clients
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", connection);
cfg.set("bootstrap.servers", connection.join(","));
cfg.set("message.timeout.ms", "5000");
if buffering {
cfg.set("batch.num.messages", PARALLEL_BATCH_SIZE.to_string()); // = loads
@@ -273,10 +277,10 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,
(producer_client, topic_name)
}

- async fn setup_rskafka(connection: String) -> PartitionClient {
+ async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
let topic_name = random_topic_name();

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
client
.controller_client()
.unwrap()
34 changes: 21 additions & 13 deletions docker-compose-kafka.yml
@@ -1,4 +1,4 @@
version: "2"
version: "3"

services:
zookeeper:
@@ -12,14 +12,14 @@ services:
kafka-0:
image: docker.io/bitnami/kafka:3
ports:
- "9093:9093"
- "9010:9010"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+ - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
+ - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_0_data:/bitnami/kafka
@@ -28,14 +28,14 @@ services:
kafka-1:
image: docker.io/bitnami/kafka:3
ports:
- "9094:9094"
- "9011:9011"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
- - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://localhost:9094
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+ - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
+ - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_1_data:/bitnami/kafka
@@ -44,19 +44,27 @@ services:
kafka-2:
image: docker.io/bitnami/kafka:3
ports:
- "9095:9095"
- "9012:9012"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=2
- ALLOW_PLAINTEXT_LISTENER=yes
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
- - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://localhost:9095
+ - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+ - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
+ - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_2_data:/bitnami/kafka
depends_on:
- zookeeper
+ proxy:
+   image: serjs/go-socks5-proxy
+   ports:
+     - "1080:1080"
+   depends_on:
+     - kafka-0
+     - kafka-1
+     - kafka-2

volumes:
zookeeper_data:
28 changes: 18 additions & 10 deletions docker-compose-redpanda.yml
@@ -4,7 +4,7 @@ services:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-0
ports:
- - '9092:9092'
+ - '9010:9010'
command:
- redpanda
- start
@@ -14,15 +14,15 @@
- --overprovisioned
- --node-id 0
- --check=false
- - --kafka-addr 0.0.0.0:9092
- - --advertise-kafka-addr 127.0.0.1:9092
+ - --kafka-addr EXTERNAL://0.0.0.0:9010,FOR_PROXY://0.0.0.0:9020
+ - --advertise-kafka-addr EXTERNAL://127.0.0.1:9010,FOR_PROXY://redpanda-0:9020
- --rpc-addr 0.0.0.0:33145
- --advertise-rpc-addr redpanda-0:33145
redpanda-1:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-1
ports:
- - '9093:9093'
+ - '9011:9011'
command:
- redpanda
- start
@@ -33,15 +33,15 @@
- --node-id 1
- --seeds "redpanda-0:33145"
- --check=false
- - --kafka-addr 0.0.0.0:9093
- - --advertise-kafka-addr 127.0.0.1:9093
+ - --kafka-addr EXTERNAL://0.0.0.0:9011,FOR_PROXY://0.0.0.0:9021
+ - --advertise-kafka-addr EXTERNAL://127.0.0.1:9011,FOR_PROXY://redpanda-1:9021
- --rpc-addr 0.0.0.0:33146
- --advertise-rpc-addr redpanda-1:33146
redpanda-2:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-2
ports:
- - '9094:9094'
+ - '9012:9012'
command:
- redpanda
- start
@@ -52,7 +52,15 @@
- --node-id 2
- --seeds "redpanda-0:33145"
- --check=false
- - --kafka-addr 0.0.0.0:9094
- - --advertise-kafka-addr 127.0.0.1:9094
+ - --kafka-addr EXTERNAL://0.0.0.0:9012,FOR_PROXY://0.0.0.0:9022
+ - --advertise-kafka-addr EXTERNAL://127.0.0.1:9012,FOR_PROXY://redpanda-2:9022
- --rpc-addr 0.0.0.0:33147
- --advertise-rpc-addr redpanda-2:33147
+ proxy:
+   image: serjs/go-socks5-proxy
+   ports:
+     - "1080:1080"
+   depends_on:
+     - redpanda-0
+     - redpanda-1
+     - redpanda-2
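
Taken together, the two compose files now share one port scheme, summarized here from the listener definitions above (EXTERNAL ports are also the ones mapped to the host):

| broker | CLIENT (inter-broker) | EXTERNAL (host) | FOR_PROXY (via SOCKS5) |
|------------|------|------|------|
| kafka-0    | 9000 | 9010 | 9020 |
| kafka-1    | 9000 | 9011 | 9021 |
| kafka-2    | 9000 | 9012 | 9022 |
| redpanda-0 | n/a  | 9010 | 9020 |
| redpanda-1 | n/a  | 9011 | 9021 |
| redpanda-2 | n/a  | 9012 | 9022 |

The Redpanda brokers have no CLIENT listener; they talk to each other over their RPC ports (33145-33147) instead.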
31 changes: 17 additions & 14 deletions tests/client.rs
@@ -17,15 +17,15 @@ async fn test_plain() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
- ClientBuilder::new(vec![connection]).build().await.unwrap();
+ ClientBuilder::new(connection).build().await.unwrap();
}

#[tokio::test]
async fn test_topic_crud() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
let topics = client.list_topics().await.unwrap();

Expand Down Expand Up @@ -109,22 +109,25 @@ async fn test_tls() {
.unwrap();

let connection = maybe_skip_kafka_integration!();
- ClientBuilder::new(vec![connection])
+ ClientBuilder::new(connection)
.tls_config(Arc::new(config))
.build()
.await
.unwrap();
}

- // Disabled as currently no SOCKS5 integration tests
#[cfg(feature = "transport-socks5")]
- #[ignore]
#[tokio::test]
async fn test_socks5() {
maybe_start_logging();

let client = ClientBuilder::new(vec!["my-cluster-kafka-bootstrap:9092".to_owned()])
.socks5_proxy("localhost:1080".to_owned())
// e.g. "my-connection-kafka-bootstrap:9092"
let connection = maybe_skip_kafka_integration!();
// e.g. "localhost:1080"
let proxy = maybe_skip_SOCKS_PROXY!();

let client = ClientBuilder::new(connection)
.socks5_proxy(proxy)
.build()
.await
.unwrap();
@@ -143,7 +146,7 @@ async fn test_produce_empty() {
let topic_name = random_topic_name();
let n_partitions = 2;

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -165,7 +168,7 @@ async fn test_consume_empty() {
let topic_name = random_topic_name();
let n_partitions = 2;

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -189,7 +192,7 @@ async fn test_consume_offset_out_of_range() {
let topic_name = random_topic_name();
let n_partitions = 2;

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -222,7 +225,7 @@ async fn test_get_offset() {
let topic_name = random_topic_name();
let n_partitions = 1;

- let client = ClientBuilder::new(vec![connection.clone()])
+ let client = ClientBuilder::new(connection.clone())
.build()
.await
.unwrap();
@@ -286,7 +289,7 @@ async fn test_produce_consume_size_cutoff() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
@@ -359,7 +362,7 @@ async fn test_consume_midbatch() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
@@ -404,7 +407,7 @@ async fn test_delete_records() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

- let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+ let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
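
`test_socks5` now derives both addresses from the environment: `maybe_skip_kafka_integration!` is shown in `benches/write_throughput.rs` above, while `maybe_skip_SOCKS_PROXY!` is defined in one of the changed files not displayed on this page. By analogy it presumably reads `SOCKS_PROXY` and skips the test when unset; a hypothetical sketch:

```rust
// Hypothetical reconstruction; the real definition is in a file not shown
// here. Yields the proxy address, or returns early (skipping the test)
// when SOCKS_PROXY is unset.
macro_rules! maybe_skip_SOCKS_PROXY {
    () => {{
        match std::env::var("SOCKS_PROXY").ok() {
            Some(proxy) => proxy,
            None => {
                eprintln!("skipping test: set SOCKS_PROXY to run SOCKS5 tests");
                return;
            }
        }
    }};
}
```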