Commit
Merge branch 'main' into DelintingRound8
Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
gregschohn authored Nov 13, 2023
2 parents 336aa1e + d7c3cdf commit eeededc
Showing 20 changed files with 158 additions and 378 deletions.
4 changes: 4 additions & 0 deletions TrafficCapture/README.md
@@ -27,6 +27,10 @@ For more details, check out the [Docker Solution README](dockerSolution/README.md)

The Traffic Capture Proxy Server acts as a middleman, capturing the traffic going to a source cluster so that it can later be used by the Traffic Replayer.

This tool can be attached to the coordinator nodes of a cluster that has at least two coordinator nodes, and it will start capturing traffic
with zero downtime on the cluster. Be aware that zero downtime is only achievable **if** the remaining in-service nodes can handle the additional load on the cluster.
More details on attaching a Capture Proxy can be found here: [Capture Proxy](trafficCaptureProxyServer/README.md).

### Traffic Replayer

The Traffic Replayer consumes streams of IP packets that were previously recorded by the Traffic Capture Proxy Server and replays the requests to another HTTP
10 changes: 0 additions & 10 deletions TrafficCapture/dockerSolution/README.md
@@ -8,16 +8,6 @@ down again.
Note that most of the Dockerfiles are constructed dynamically in the build hierarchy. Some effort has been made
to ensure that changes will make it into the containers to be launched.

If a user wants to use their own checkout of the traffic-comparator repo, just set the environment variable
"TRAFFIC_COMPARATOR_DIRECTORY" to the directory that contains `setup.py`. Otherwise, if that isn't set, the traffic
comparator repo will be checked out to the build directory and used from there. Note that the checkout only happens when
the directory isn't present and no environment variable specifies a directory. Once a directory exists,
it will be mounted to the traffic-comparator and jupyter services.

Netcat is still used to connect several of the components, and we're still working on improving the resiliency story
between these containers. The long-term approach is to replace fixed streams with message-bus approaches directly (i.e.,
Kafka). In the short term, we can, and are beginning to, leverage things like conditions on dependent services.

### Running the Docker Solution

While in the TrafficCapture directory, run the following command:
64 changes: 1 addition & 63 deletions TrafficCapture/dockerSolution/build.gradle
@@ -9,19 +9,6 @@ import java.security.MessageDigest
import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import org.apache.tools.ant.taskdefs.condition.Os

def getTrafficComparatorDirectory() {
String overrideTrafficComparatorDirectory = System.getenv(TRAFFIC_COMPARATOR_DIRECTORY_ENV)
String rval = overrideTrafficComparatorDirectory != null ?
overrideTrafficComparatorDirectory : TRAFFIC_COMPARATOR_REPO_DIRECTORY;
return rval
}

ext {
TRAFFIC_COMPARATOR_REPO_DIRECTORY = "build/traffic-comparator"
TRAFFIC_COMPARATOR_DIRECTORY_ENV = "TRAFFIC_COMPARATOR_DIRECTORY"
REALIZED_TRAFFIC_COMPARATOR_DIRECTORY = project.file(getTrafficComparatorDirectory())
}

def calculateDockerHash = { projectName ->
CommonUtils.calculateDockerHash(projectName, project)
}
@@ -31,17 +18,6 @@ dependencies {
implementation project(':trafficReplayer')
}

task cloneComparatorRepoIfNeeded(type: Exec) {
String comparatorDirectory = project.file(REALIZED_TRAFFIC_COMPARATOR_DIRECTORY);
String repo = 'https://github.com/opensearch-project/traffic-comparator.git'
onlyIf {
!(new File(comparatorDirectory).exists())
}
commandLine = Os.isFamily(Os.FAMILY_WINDOWS) ?
['git', 'clone', repo, TRAFFIC_COMPARATOR_REPO_DIRECTORY ] :
['/bin/sh', '-c', "git clone ${repo} ${TRAFFIC_COMPARATOR_REPO_DIRECTORY}"]
}

def dockerFilesForExternalServices = [
"elasticsearchWithSearchGuard": "elasticsearch_searchguard",
"migrationConsole": "migration_console"
@@ -56,36 +32,6 @@ dockerFilesForExternalServices.each { projectName, dockerImageName ->
}
}

def trafficComparatorServices = [
"trafficComparator": "traffic_comparator",
"jupyterNotebook": "jupyter_notebook"
]
trafficComparatorServices.forEach {projectName, dockerImageName ->
def dockerBuildDir = "build/docker/${projectName}"
task("copyArtifact_${projectName}", type: Copy) {
dependsOn(tasks.getByName('cloneComparatorRepoIfNeeded'))
from REALIZED_TRAFFIC_COMPARATOR_DIRECTORY
into dockerBuildDir
include '*.py'
include '/traffic_comparator/*'
if (projectName == 'jupyterNotebook') {
include '*.ipynb'
}
}

task "createDockerfile_${projectName}"(type: com.bmuschko.gradle.docker.tasks.image.Dockerfile) {
dependsOn "copyArtifact_${projectName}"
destFile = project.file("${dockerBuildDir}/Dockerfile")
from 'python:3.10.10'
runCommand("apt-get update && apt-get install -y netcat lsof")
copyFile("setup.py", "/setup.py")
copyFile(".", "/containerTC/")
runCommand("pip3 install --editable \".[data]\"")
// container stay-alive
defaultCommand('tail', '-f', '/dev/null')
}
}

def javaContainerServices = [
"trafficCaptureProxyServer": "capture_proxy",
"trafficReplayer": "traffic_replayer"
@@ -101,7 +47,7 @@ javaContainerServices.each { projectName, dockerImageName ->
CommonUtils.createDockerfile(project, projectName, baseImageProjectOverrides, dockerFilesForExternalServices)
}

(javaContainerServices + trafficComparatorServices).forEach { projectName, dockerImageName ->
javaContainerServices.forEach { projectName, dockerImageName ->
def dockerBuildDir = "build/docker/${projectName}"
task "buildDockerImage_${projectName}"(type: DockerBuildImage) {
dependsOn "createDockerfile_${projectName}"
@@ -112,11 +58,6 @@ javaContainerServices.each { projectName, dockerImageName ->
}

dockerCompose {
String overrideTrafficComparatorDirectory = System.getenv(TRAFFIC_COMPARATOR_DIRECTORY_ENV)
if (overrideTrafficComparatorDirectory == null) {
environment.put(TRAFFIC_COMPARATOR_DIRECTORY_ENV, REALIZED_TRAFFIC_COMPARATOR_DIRECTORY)
exposeAsEnvironment(this)
}
useComposeFiles = project.hasProperty('multiProxy') ?
['src/main/docker/docker-compose.yml', 'src/main/docker/docker-compose-multi.yml'] :
['src/main/docker/docker-compose.yml', 'src/main/docker/docker-compose-single.yml']
@@ -128,10 +69,7 @@ task buildDockerImages {

dependsOn buildDockerImage_trafficCaptureProxyServer
dependsOn buildDockerImage_trafficReplayer
dependsOn buildDockerImage_trafficComparator
dependsOn buildDockerImage_jupyterNotebook
}

tasks.getByName('composeUp')
.dependsOn(tasks.getByName('buildDockerImages'))
.dependsOn(tasks.getByName('cloneComparatorRepoIfNeeded'))
31 changes: 1 addition & 30 deletions TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml
@@ -43,9 +43,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
trafficcomparator:
condition: service_healthy
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46YWRtaW4= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group | nc trafficcomparator 9220"
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46YWRtaW4= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id default-logging-group"

opensearchtarget:
image: 'opensearchproject/opensearch:latest'
@@ -56,33 +54,6 @@
ports:
- "29200:9200"

trafficcomparator:
image: 'migrations/traffic_comparator:latest'
networks:
- migrations
ports:
- "9220:9220"
healthcheck:
test: "lsof -i -P -n"
volumes:
- ${TRAFFIC_COMPARATOR_DIRECTORY}:/trafficComparator
- sharedComparatorSqlResults:/shared
command: /bin/sh -c "cd trafficComparator && pip3 install --editable . && nc -v -l -p 9220 | tee /dev/stderr | trafficcomparator -vv stream | trafficcomparator dump-to-sqlite --db /shared/comparisons.db"

jupyter-notebook:
image: 'migrations/jupyter_notebook:latest'
networks:
- migrations
ports:
- "8888:8888"
volumes:
- ${TRAFFIC_COMPARATOR_DIRECTORY}:/trafficComparator
- sharedComparatorSqlResults:/shared
environment:
# this needs to match the output db that traffic_comparator writes to
- COMPARISONS_DB_LOCATION=/shared/comparisons.db
command: /bin/sh -c 'cd trafficComparator && pip3 install --editable ".[data]" && jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser --allow-root'

migration-console:
image: 'migrations/migration_console:latest'
networks:
81 changes: 81 additions & 0 deletions TrafficCapture/trafficCaptureProxyServer/README.md
@@ -0,0 +1,81 @@
# Capture Proxy

## How to attach a Capture Proxy to a coordinator node

Follow the documentation for [deploying the solution](../../deployment/README.md). Then, on a cluster with at least two coordinator nodes, you can attach a Capture Proxy to a node by following these steps.
Please note that this is one method for installing the Capture Proxy on a node, and that these steps may vary depending on your environment.


These are the **prerequisites** for attaching the Capture Proxy:

* **Make sure that your MSK client is accessible by the coordinator nodes in the cluster.**
* Add the following IAM policy to the node/EC2 instance so that it's able to store the captured traffic in Kafka:
    * From the AWS Console, go to the EC2 instance page, click on **IAM Role**, click on **Add permissions**, choose **Create inline policy**, click on **JSON VIEW**, then add the following policy (replace region and account-id). A CLI alternative is sketched after the policy.

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "kafka-cluster:Connect",
"Resource": "arn:aws:kafka:<region>:<account-id>:cluster/migration-msk-cluster-<stage>/*",
"Effect": "Allow"
},
{
"Action": [
"kafka-cluster:CreateTopic",
"kafka-cluster:DescribeTopic",
"kafka-cluster:WriteData"
],
"Resource": "arn:aws:kafka:<region>:<account-id>:topic/migration-msk-cluster-<stage>/*",
"Effect": "Allow"
}
]
}
```
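
If you'd rather apply this from the CLI than the console, here is a minimal sketch of the same step (the role name, policy name, and file name are illustrative placeholders, not values from this repo):

```sh
# Save the JSON policy above as msk-policy.json, then attach it inline
# to the instance's IAM role (replace <instance-role-name> with your role).
aws iam put-role-policy \
  --role-name <instance-role-name> \
  --policy-name CaptureProxyMskAccess \
  --policy-document file://msk-policy.json
```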

* **Verify that the Java installation is accessible**.
    * From the Linux command line of that EC2 instance, check that the `JAVA_HOME` environment variable is set properly (`echo $JAVA_HOME`); if it isn't, try running the following command, which might help set it correctly:

      `JAVA_HOME=$(dirname "$(dirname "$(type -p java)")")`
    * If that doesn't work, find the Java directory on your node and set it as `$JAVA_HOME` (a sketch for persisting the variable follows).
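
A minimal sketch for deriving and persisting the variable, assuming a bash shell (the derived path depends on your Java installation):

```sh
# Derive JAVA_HOME from the java binary on the PATH, then persist it
# so that future sessions pick it up as well.
export JAVA_HOME=$(dirname "$(dirname "$(type -p java)")")
echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
```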

### Follow these steps to attach a Capture Proxy to the node

1. **Log in to one of the coordinator nodes** for command line access.
2. **Update the node's port setting**.
    1. **Update elasticsearch.yml/opensearch.yml**. Add this line to the node's config file (a shell sketch of this edit follows this step): `http.port: 19200`
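
A shell sketch of that edit (the config file path is an assumption; adjust it for your installation):

```sh
# Append the new HTTP port to the node's config file.
# /etc/elasticsearch/elasticsearch.yml assumes a package-managed install;
# OpenSearch installs typically use /etc/opensearch/opensearch.yml instead.
echo "http.port: 19200" | sudo tee -a /etc/elasticsearch/elasticsearch.yml
```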
3. **Restart the Elasticsearch/OpenSearch process** so that it binds to the newly configured port. For example, if systemctl is available on your Linux distribution, you can run the following (note: depending on your installation of Elasticsearch, this method may not work for you):
    1. `sudo systemctl restart elasticsearch.service`

4. **Verify the process is bound to the new port**. Run `netstat -tapn` to see whether the new port is being listened on.
If the new port is not there, Elasticsearch/OpenSearch may not be running; in that case, start the process again. (Depending on your setup, the way you start or restart the process may differ.)
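
For example, a quick check for a listener on the port configured in step 2:

```sh
# Look for a LISTEN entry on the newly configured port (19200).
sudo netstat -tapn | grep 19200
```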
5. **Test the new port** by sending any kind of traffic or request, e.g., `curl https://localhost:19200` (or `http://` if TLS isn't enabled).
6. **Download the Capture Proxy**:
    1. Go to the OpenSearch Migrations latest releases page: https://github.com/opensearch-project/opensearch-migrations/releases/latest
    2. Copy the link for the Capture Proxy tar file, minding your instance's architecture.
    3. `curl -L0 <capture-proxy-tar-file-link> --output CaptureProxyX64.tar.gz`
    4. Unpack the tarball: `tar -xvf CaptureProxyX64.tar.gz`
    5. `cd CaptureProxyX64/bin`
7. **Run the Capture Proxy**:
    1. `nohup ./CaptureProxyX64 --kafkaConnection <msk-endpoint> --destinationUri http://localhost:19200 --listenPort 9200 --enableMSKAuth --insecureDestination &`

**Explanation of parameters** in the command above:

* **--kafkaConnection**: your MSK client endpoint.
* **--destinationUri**: URI of the server that the Capture Proxy is capturing traffic for.
* **--listenPort**: Exposed port for clients to connect to this proxy. (The original port that the node was listening on.)
* **--enableMSKAuth**: Enables SASL Kafka properties required for connecting to MSK with IAM auth.
* **--insecureDestination**: Do not check the destination server’s certificate.

8. **Test the port** that the Capture Proxy is now listening on.
    1. `curl https://localhost:9200` or `http://`
    2. You should expect the same response when sending a request to either port (9200 or 19200), except that traffic sent to the port the Capture Proxy is listening on will be captured and sent to your MSK client, as well as forwarded to the new Elasticsearch/OpenSearch port.
9. **Verify requests are sent to Kafka**
    * **Verify that a new topic has been created**
        1. Log in to the Migration Console container.
        2. Go to the Kafka tools directory: `cd kafka-tools/kafka/bin`
        3. Run the following command to list the Kafka topics, and confirm that a new topic was created:
           `./kafka-topics.sh --bootstrap-server "$MIGRATION_KAFKA_BROKER_ENDPOINTS" --list --command-config ../../aws/msk-iam-auth.properties`
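
To go one step further, you can peek at a captured record. Here is a sketch, assuming the topic name used elsewhere in this repo's docker-compose (`logging-traffic-topic`); your deployment's topic may differ:

```sh
# Read a single record from the capture topic to confirm traffic is flowing.
./kafka-console-consumer.sh --bootstrap-server "$MIGRATION_KAFKA_BROKER_ENDPOINTS" \
  --topic logging-traffic-topic --from-beginning --max-messages 1 \
  --consumer.config ../../aws/msk-iam-auth.properties
```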
2 changes: 1 addition & 1 deletion TrafficCapture/trafficReplayer/README.md
@@ -1,7 +1,7 @@
# HTTP Traffic Replayer

This package consumes streams of IP packets that were previously recorded and replays the requests to another HTTP
server, recording the packet traffic of the new interactions for future analysis (see the Comparator tools).
server, recording the packet traffic of the new interactions for future analysis.

## Overview

SourceTargetCaptureTuple.java
@@ -12,7 +12,6 @@
import org.opensearch.migrations.replay.datatypes.UniqueSourceRequestKey;

import java.io.IOException;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -27,6 +26,7 @@

@Slf4j
public class SourceTargetCaptureTuple implements AutoCloseable {
public static final String OUTPUT_TUPLE_JSON_LOGGER = "OutputTupleJsonLogger";
final UniqueSourceRequestKey uniqueRequestKey;
final RequestResponsePacketPair sourcePair;
final TransformedPackets targetRequestData;
@@ -57,11 +57,9 @@ public void close() {
}

public static class TupleToStreamConsumer implements Consumer<SourceTargetCaptureTuple> {
OutputStream outputStream;
Logger tupleLogger = LogManager.getLogger("OutputTupleJsonLogger");
Logger tupleLogger = LogManager.getLogger(OUTPUT_TUPLE_JSON_LOGGER);

public TupleToStreamConsumer(OutputStream outputStream){
this.outputStream = outputStream;
public TupleToStreamConsumer() {
}

private JSONObject jsonFromHttpDataUnsafe(List<byte[]> data) throws IOException {
@@ -175,10 +173,7 @@ private JSONObject toJSONObject(SourceTargetCaptureTuple tuple) {
@SneakyThrows
public void accept(SourceTargetCaptureTuple triple) {
JSONObject jsonObject = toJSONObject(triple);

tupleLogger.info(()->jsonObject.toString());
outputStream.write((jsonObject.toString()+"\n").getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
}

TrafficReplayer.java
@@ -39,11 +39,8 @@
import software.amazon.awssdk.regions.Region;

import javax.net.ssl.SSLException;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ref.WeakReference;
import java.net.URI;
import java.time.Duration;
@@ -237,12 +234,6 @@ static class Parameters {
arity=1,
description = "input file to read the request/response traces for the source cluster")
String inputFilename;
@Parameter(required = false,
names = {"-o", "--output"},
arity=1,
description = "output file to hold the request/response traces for the source and target cluster")
String outputFilename;

@Parameter(required = false,
names = {"-t", PACKET_TIMEOUT_SECONDS_PARAMETER_NAME},
arity = 1,
@@ -338,17 +329,14 @@ public static void main(String[] args)
return;
}

try (OutputStream outputStream = params.outputFilename == null ? System.out :
new FileOutputStream(params.outputFilename, true);
var bufferedOutputStream = new BufferedOutputStream(outputStream);
var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(params,
try (var blockingTrafficSource = TrafficCaptureSourceFactory.createTrafficCaptureSource(params,
Duration.ofSeconds(params.lookaheadTimeSeconds));
var authTransformer = buildAuthTransformerFactory(params))
{
var tr = new TrafficReplayer(uri, params.transformerConfig, authTransformer,
params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests);
setupShutdownHookForReplayer(tr);
var tupleWriter = new SourceTargetCaptureTuple.TupleToStreamConsumer(bufferedOutputStream);
var tupleWriter = new SourceTargetCaptureTuple.TupleToStreamConsumer();
var timeShifter = new TimeShifter(params.speedupFactor);
tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(params.observedPacketConnectionTimeout),
blockingTrafficSource, timeShifter, tupleWriter);
HttpJsonTransformingConsumer.java
@@ -29,7 +29,7 @@
* transformation and the headers present (such as for gzipped or chunked encodings).
*
* There will be a number of reasons that we need to reuse the source captured packets - both today
* (for the output to the comparator) and in the future (for retrying transient network errors or
* (for analysing and comparing) and in the future (for retrying transient network errors or
* transformation errors). With that in mind, the HttpJsonTransformer now keeps track of all of
* the ByteBufs passed into it and can redrive them through the underlying network packet handler.
* Cases where that would happen with this edit are where the payload wasn't being modified, nor
@@ -58,8 +58,8 @@ public class HttpJsonTransformingConsumer<R> implements IPacketFinalizingConsumer
private final List<List<Integer>> chunkSizes;
// This is here for recovery, in case anything goes wrong with a transformation & we want to
// just dump it directly. Notice that we're already storing all of the bytes until the response
// comes back so that we can format the output that goes to the comparator. These should be
// backed by the exact same byte[] arrays, so the memory consumption should already be absorbed.
// comes back so that we can format the output. These should be backed by the exact same
// byte[] arrays, so the memory consumption should already be absorbed.
private final List<ByteBuf> chunks;

public HttpJsonTransformingConsumer(IJsonTransformer transformer,
TransformedPackets.java
@@ -46,7 +46,7 @@ public void close() {

@Override
public String toString() {
if (data == null) {
if (isClosed()) {
return "CLOSED";
}
return new StringJoiner(", ", TransformedPackets.class.getSimpleName() + "[", "]")