diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index 27fb8476dee..6a34264283d 100644 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -67,7 +67,7 @@ jobs: test-e2e-encap: name: E2e tests on a Kind cluster on Linux - needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + needs: [build-antrea-coverage-image] runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -84,7 +84,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -124,7 +123,7 @@ jobs: test-e2e-encap-no-proxy: name: E2e tests on a Kind cluster on Linux with AntreaProxy disabled - needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + needs: [build-antrea-coverage-image] runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -141,7 +140,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -181,7 +179,7 @@ jobs: test-e2e-encap-proxy-all: name: E2e tests on a Kind cluster on Linux with AntreaProxy all Service support - needs: [ build-antrea-coverage-image, build-flow-aggregator-coverage-image ] + needs: [ build-antrea-coverage-image ] runs-on: [ ubuntu-latest ] steps: - name: Free disk space @@ -198,7 +196,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -238,7 +235,7 @@ jobs: test-e2e-noencap: name: E2e tests on a Kind cluster on Linux (noEncap) - needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + needs: [build-antrea-coverage-image] runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -255,7 +252,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -295,7 +291,7 @@ jobs: test-e2e-hybrid: name: E2e tests on a Kind cluster on Linux (hybrid) - needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + needs: [build-antrea-coverage-image] runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -312,7 +308,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -355,7 +350,7 @@ jobs: # test uses a Geneve overlay. 
test-e2e-encap-no-np: name: E2e tests on a Kind cluster on Linux with Antrea-native policies disabled - needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + needs: [build-antrea-coverage-image] runs-on: [ubuntu-latest] steps: - name: Free disk space @@ -372,7 +367,6 @@ jobs: - name: Load Antrea image run: | docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar - docker load -i flow-aggregator-cov/flow-aggregator.tar - name: Install Kind run: | curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 @@ -410,6 +404,63 @@ jobs: path: log.tar.gz retention-days: 30 + test-e2e-flow-visibility: + name: E2e tests on a Kind cluster on Linux for Flow Visibility + needs: [build-antrea-coverage-image, build-flow-aggregator-coverage-image] + runs-on: [ubuntu-latest] + steps: + - name: Free disk space + # https://github.com/actions/virtual-environments/issues/709 + run: | + sudo apt-get clean + df -h + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version: 1.17 + - name: Download Antrea images from previous jobs + uses: actions/download-artifact@v3 + - name: Load Antrea image + run: | + docker load -i antrea-ubuntu-cov/antrea-ubuntu.tar + docker load -i flow-aggregator-cov/flow-aggregator.tar + - name: Install Kind + run: | + curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 + chmod +x ./kind + sudo mv kind /usr/local/bin + - name: Run e2e tests + run: | + mkdir log + mkdir test-e2e-fa-coverage + ANTREA_LOG_DIR=$PWD/log ANTREA_COV_DIR=$PWD/test-e2e-fa-coverage ./ci/kind/test-e2e-kind.sh --encap-mode encap --coverage --flow-visibility + - name: Tar coverage files + run: tar -czf test-e2e-fa-coverage.tar.gz test-e2e-fa-coverage + - name: Upload coverage for test-e2e-fa-coverage + uses: actions/upload-artifact@v3 + with: + name: test-e2e-fa-coverage + path: test-e2e-fa-coverage.tar.gz + retention-days: 30 + - name: Codecov + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + file: '*.cov.out*' + flags: kind-e2e-tests + name: codecov-test-e2e-fa + directory: test-e2e-fa-coverage + - name: Tar log files + if: ${{ failure() }} + run: tar -czf log.tar.gz log + - name: Upload test log + uses: actions/upload-artifact@v3 + if: ${{ failure() }} + with: + name: e2e-kind-fa.tar.gz + path: log.tar.gz + retention-days: 30 + test-netpol-tmp: name: Run experimental network policy tests (netpol) on Kind cluster needs: build-antrea-coverage-image diff --git a/build/yamls/flow-visibility/base/kustomization-e2e.yml b/build/yamls/flow-visibility/base/kustomization-e2e.yml new file mode 100644 index 00000000000..2511fbc288b --- /dev/null +++ b/build/yamls/flow-visibility/base/kustomization-e2e.yml @@ -0,0 +1,26 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namespace: flow-visibility + +resources: + - clickhouse.yml + +configMapGenerator: + - name: clickhouse-mounted-configmap + namespace: flow-visibility + files: + - provisioning/datasources/create_table.sh + +# CLICKHOUSE_CONFIG_MAP_NAME exports the value in `metadata.name` from `ConfigMap` named `clickhouse-mounted-configmap`, +# which is used for inserting the value to a CRD for an object of kind `ClickHouseInstallation` +vars: + - name: CLICKHOUSE_CONFIG_MAP_NAME + objref: + kind: ConfigMap + name: clickhouse-mounted-configmap + apiVersion: v1 + fieldref: + fieldpath: metadata.name + +configurations: + - kustomize-config.yml diff --git 
a/build/yamls/flow-visibility/patches/e2e/imagePullPolicyClickhouse.yml b/build/yamls/flow-visibility/patches/e2e/imagePullPolicyClickhouse.yml new file mode 100644 index 00000000000..484649ec385 --- /dev/null +++ b/build/yamls/flow-visibility/patches/e2e/imagePullPolicyClickhouse.yml @@ -0,0 +1,6 @@ +- op: add + path: /spec/templates/podTemplates/0/spec/containers/0/imagePullPolicy + value: IfNotPresent +- op: add + path: /spec/templates/podTemplates/0/spec/containers/1/imagePullPolicy + value: IfNotPresent diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 6d9d7e94337..229f68ed585 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -29,6 +29,7 @@ _usage="Usage: $0 [--encap-mode ] [--ip-family ] [--no-proxy] [--np --proxy-all Enables Antrea proxy with all Service support. --endpointslice Enables Antrea proxy and EndpointSlice support. --no-np Disables Antrea-native policies. + --flow-visibility Only run flow visibility related e2e tests. --skip A comma-separated list of keywords, with which tests should be skipped. --coverage Enables measure Antrea code coverage when run e2e tests on kind. --setup-only Only perform setting up the cluster and run test. @@ -45,6 +46,9 @@ function print_usage { TESTBED_CMD=$(dirname $0)"/kind-setup.sh" YML_CMD=$(dirname $0)"/../../hack/generate-manifest.sh" FLOWAGGREGATOR_YML_CMD=$(dirname $0)"/../../hack/generate-manifest-flow-aggregator.sh" +FLOW_VISIBILITY_CMD=$(dirname $0)"/../../hack/generate-manifest-flow-visibility.sh --mode e2e" +FLOW_VISIBILITY_HELM_VALUES=$(dirname $0)"/values-flow-exporter.yml" +CH_OPERATOR_YML=$(dirname $0)"/../../build/yamls/clickhouse-operator-install-bundle.yml" function quit { result=$? @@ -61,6 +65,7 @@ proxy=true proxy_all=false endpointslice=false np=true +flow_visibility=false coverage=false skiplist="" setup_only=false @@ -91,6 +96,10 @@ case $key in np=false shift ;; + --flow-visibility) + flow_visibility=true + shift + ;; --skip) skiplist="$2" shift 2 @@ -150,18 +159,20 @@ fi if ! 
$np; then manifest_args="$manifest_args --no-np" fi +if $flow_visibility; then + manifest_args="$manifest_args --flow-exporter --extra-helm-values-file $FLOW_VISIBILITY_HELM_VALUES" +fi COMMON_IMAGES_LIST=("k8s.gcr.io/e2e-test-images/agnhost:2.29" \ "projects.registry.vmware.com/library/busybox" \ "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \ - "projects.registry.vmware.com/antrea/perftool" \ - "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.12") -for image in "${COMMON_IMAGES_LIST[@]}"; do - for i in `seq 3`; do - docker pull $image && break - sleep 1 - done -done + "projects.registry.vmware.com/antrea/perftool") + +FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.5.12" \ + "projects.registry.vmware.com/antrea/flow-visibility-clickhouse-operator:0.18.2" \ + "projects.registry.vmware.com/antrea/flow-visibility-metrics-exporter:0.18.2" \ + "projects.registry.vmware.com/antrea/flow-visibility-clickhouse-server:21.11" \ + "projects.registry.vmware.com/antrea/flow-visibility-clickhouse-monitor:latest") if $coverage; then manifest_args="$manifest_args --coverage" COMMON_IMAGES_LIST+=("antrea/antrea-ubuntu-coverage:latest") @@ -173,6 +184,20 @@ fi if $proxy_all; then COMMON_IMAGES_LIST+=("k8s.gcr.io/echoserver:1.10") fi +if $flow_visibility; then + COMMON_IMAGES_LIST+=("${FLOW_VISIBILITY_IMAGE_LIST[@]}") + if $coverage; then + COMMON_IMAGES_LIST+=("antrea/flow-aggregator-coverage:latest") + else + COMMON_IMAGES_LIST+=("projects.registry.vmware.com/antrea/flow-aggregator:latest") + fi +fi +for image in "${COMMON_IMAGES_LIST[@]}"; do + for i in `seq 3`; do + docker pull $image && break + sleep 1 + done +done printf -v COMMON_IMAGES "%s " "${COMMON_IMAGES_LIST[@]}" @@ -195,16 +220,32 @@ function setup_cluster { function run_test { current_mode=$1 + coverage_args="" + flow_visibility_args="" if $coverage; then $YML_CMD --encap-mode $current_mode $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea-coverage.yml $YML_CMD --ipsec $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea-ipsec-coverage.yml - $FLOWAGGREGATOR_YML_CMD --coverage | docker exec -i kind-control-plane dd of=/root/flow-aggregator-coverage.yml + timeout="80m" + coverage_args="--coverage --coverage-dir $ANTREA_COV_DIR" else $YML_CMD --encap-mode $current_mode $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea.yml $YML_CMD --ipsec $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea-ipsec.yml - $FLOWAGGREGATOR_YML_CMD | docker exec -i kind-control-plane dd of=/root/flow-aggregator.yml + timeout="75m" fi + + if $flow_visibility; then + timeout="10m" + flow_visibility_args="-run=TestFlowAggregator --flow-visibility" + if $coverage; then + $FLOWAGGREGATOR_YML_CMD --coverage | docker exec -i kind-control-plane dd of=/root/flow-aggregator-coverage.yml + else + $FLOWAGGREGATOR_YML_CMD | docker exec -i kind-control-plane dd of=/root/flow-aggregator.yml + fi + $FLOW_VISIBILITY_CMD | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml + cat $CH_OPERATOR_YML | docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yml + fi + if $proxy_all; then apiserver=$(docker exec -i kind-control-plane kubectl get endpoints kubernetes --no-headers | awk '{print $2}') if $coverage; then @@ -215,11 +256,7 @@ function run_test { fi sleep 1 - if $coverage; then - go test -v -timeout=80m antrea.io/antrea/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --coverage --coverage-dir 
$ANTREA_COV_DIR --skip=$skiplist - else - go test -v -timeout=75m antrea.io/antrea/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist - fi + go test -v -timeout=$timeout antrea.io/antrea/test/e2e $flow_visibility_args -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist $coverage_args } if [[ "$mode" == "" ]] || [[ "$mode" == "encap" ]]; then diff --git a/ci/kind/values-flow-exporter.yml b/ci/kind/values-flow-exporter.yml new file mode 100644 index 00000000000..ff606cd67c3 --- /dev/null +++ b/ci/kind/values-flow-exporter.yml @@ -0,0 +1,4 @@ +flowCollector: + flowPollInterval: "1s" + activeFlowExportTimeout: "2s" + idleFlowExportTimeout: "1s" diff --git a/hack/generate-manifest-flow-visibility.sh b/hack/generate-manifest-flow-visibility.sh index bcb11af965e..ddd312a89fa 100755 --- a/hack/generate-manifest-flow-visibility.sh +++ b/hack/generate-manifest-flow-visibility.sh @@ -20,11 +20,15 @@ function echoerr { >&2 echo "$@" } -_usage="Usage: $0 [--mode (dev|release)] [--keep] [--help|-h] +_usage="Usage: $0 [--mode (dev|release|e2e)] [--keep] [--help|-h] Generate a YAML manifest for the Clickhouse-Grafana Flow-visibility Solution, using Kustomize, and print it to stdout. - --mode (dev|release) Choose the configuration variant that you need (default is 'dev') - --keep Debug flag which will preserve the generated kustomization.yml + --mode (dev|release|e2e) Choose the configuration variant that you need (default is 'dev') + e2e mode generates YAML manifest for e2e test, which includes + clickhouse operator and server with default credentials, + but not Grafana-related functionality + --keep Debug flag which will preserve the generated kustomization.yml + --help, -h Print this message and exit This tool uses kustomize (https://github.com/kubernetes-sigs/kustomize) to generate manifests for Clickhouse-Grafana Flow-visibility Solution. You can set the KUSTOMIZE environment variable to the @@ -66,7 +70,7 @@ case $key in esac done -if [ "$MODE" != "dev" ] && [ "$MODE" != "release" ]; then +if [ "$MODE" != "dev" ] && [ "$MODE" != "release" ] && [ "$MODE" != "e2e" ]; then echoerr "--mode must be one of 'dev' or 'release'" print_help exit 1 @@ -106,10 +110,24 @@ BASE=../../base mkdir $MODE && cd $MODE touch kustomization.yml -$KUSTOMIZE edit add base $BASE # ../../patches/$MODE may be empty so we use find and not simply cp find ../../patches/$MODE -name \*.yml -exec cp {} . 
\; +if [ "$MODE" == "e2e" ]; then + mkdir -p base/provisioning/datasources + cp $KUSTOMIZATION_DIR/base/clickhouse.yml base/clickhouse.yml + cp $KUSTOMIZATION_DIR/base/kustomization-e2e.yml base/kustomization.yml + cp $KUSTOMIZATION_DIR/base/kustomize-config.yml base/kustomize-config.yml + cp $KUSTOMIZATION_DIR/base/provisioning/datasources/create_table.sh base/provisioning/datasources/create_table.sh + cp $KUSTOMIZATION_DIR/../clickhouse-operator-install-bundle.yml clickhouse-operator-install-bundle.yml + + $KUSTOMIZE edit add base base + $KUSTOMIZE edit set image flow-visibility-clickhouse-monitor=projects.registry.vmware.com/antrea/flow-visibility-clickhouse-monitor:latest + $KUSTOMIZE edit add patch --path imagePullPolicyClickhouse.yml --group clickhouse.altinity.com --version v1 --kind ClickHouseInstallation --name clickhouse +else + $KUSTOMIZE edit add base $BASE +fi + if [ "$MODE" == "dev" ]; then $KUSTOMIZE edit set image flow-visibility-clickhouse-monitor=projects.registry.vmware.com/antrea/flow-visibility-clickhouse-monitor:latest $KUSTOMIZE edit add patch --path imagePullPolicy.yml --group clickhouse.altinity.com --version v1 --kind ClickHouseInstallation --name clickhouse @@ -123,7 +141,6 @@ $KUSTOMIZE build popd > /dev/null - if $KEEP; then echoerr "Kustomization file is at $TMP_DIR/$MODE/kustomization.yml" else diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index fb584c196d1..521a9663dc9 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -30,6 +30,7 @@ Generate a YAML manifest for Antrea using Helm and print it to stdout. --no-proxy Generate a manifest with Antrea proxy disabled --proxy-all Generate a manifest with Antrea proxy with all Service support enabled --endpointslice Generate a manifest with EndpointSlice support enabled + --flow-exporter Generate a manifest with FlowExporter support enabled --no-np Generate a manifest with Antrea-native policies disabled --tun (geneve|vxlan|gre|stt) Choose encap tunnel type from geneve, gre, stt and vxlan (default is geneve) --verbose-log Generate a manifest with increased log-level (level 4) for Antrea agent and controller. @@ -46,6 +47,7 @@ Generate a YAML manifest for Antrea using Helm and print it to stdout. --help, -h Print this message and exit --multicast Generates a manifest for multicast. --multicast-interfaces Multicast interface names (default is empty) + --extra-helm-values-file Optional extra helm values file to override the default config values In 'release' mode, environment variables IMG_NAME and IMG_TAG must be set. @@ -70,6 +72,7 @@ ALLFEATURES=false PROXY=true PROXY_ALL=false ENDPOINTSLICE=false +FLOW_EXPORTER=false NP=true KEEP=false ENCAP_MODE="" @@ -87,6 +90,7 @@ WHEREABOUTS=false FLEXIBLE_IPAM=false MULTICAST=false MULTICAST_INTERFACES="" +HELM_VALUES_FILES=() while [[ $# -gt 0 ]] do @@ -131,6 +135,10 @@ case $key in ENDPOINTSLICE=true shift ;; + --flow-exporter) + FLOW_EXPORTER=true + shift + ;; --no-np) NP=false shift @@ -193,6 +201,14 @@ case $key in MULTICAST_INTERFACES="$2" shift 2 ;; + --extra-helm-values-file) + if [[ ! -f "$2" ]]; then + echoerr "Helm values file $2 does not exist." 
+ exit 1 + fi + HELM_VALUES_FILES=("$2") + shift 2 + ;; -h|--help) print_usage exit 0 @@ -281,7 +297,6 @@ fi TMP_DIR=$(mktemp -d $THIS_DIR/../build/yamls/chart-values.XXXXXXXX) HELM_VALUES=() -HELM_VALUES_FILES=() if $IPSEC; then HELM_VALUES+=("trafficEncryptionMode=ipsec" "tunnelType=gre") @@ -311,6 +326,10 @@ if $ENDPOINTSLICE; then HELM_VALUES+=("featureGates.EndpointSlice=true") fi +if $FLOW_EXPORTER; then + HELM_VALUES+=("featureGates.FlowExporter=true") +fi + if ! $NP; then HELM_VALUES+=("featureGates.AntreaPolicy=false") fi diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index fda353c0afb..f679ed1bf54 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -49,6 +49,12 @@ func skipIfAntreaIPAMTest(tb testing.TB) { } } +func skipIfNotFlowVisibilityTest(tb testing.TB) { + if !testOptions.flowVisibility { + tb.Skipf("Skipping when not running flow visibility test") + } +} + func skipIfNamespaceIsNotEqual(tb testing.TB, actualNamespace, expectNamespace string) { if actualNamespace != expectNamespace { tb.Skipf("Skipping test when namespace is not: %s", expectNamespace) @@ -213,7 +219,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) { +func setupTestForFlowAggregator(tb testing.TB) (*TestData, bool, bool, error) { v4Enabled := clusterInfo.podV4NetworkCIDR != "" v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) @@ -237,17 +243,15 @@ func setupTestWithIPFIXCollector(tb testing.TB) (*TestData, bool, bool, error) { } ipfixCollectorAddr := fmt.Sprintf("%s:tcp", net.JoinHostPort(ipStr, ipfixCollectorPort)) - tb.Logf("Applying flow aggregator YAML with ipfix collector address: %s", ipfixCollectorAddr) - if err := testData.deployFlowAggregator(ipfixCollectorAddr); err != nil { - return testData, v4Enabled, v6Enabled, err - } - tb.Logf("Enabling flow exporter in Antrea Agent") - if err = testData.enableAntreaFlowExporter(""); err != nil { + tb.Logf("Deploying ClickHouse") + chSvcIP, err := testData.deployFlowVisibilityClickHouse() + if err != nil { return testData, v4Enabled, v6Enabled, err } - - tb.Logf("Checking CoreDNS deployment") - if err = testData.checkCoreDNSPods(defaultTimeout); err != nil { + tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) + tb.Logf("Applying flow aggregator YAML with ipfix collector: %s and clickHouse enabled", + ipfixCollectorAddr) + if err := testData.deployFlowAggregator(ipfixCollectorAddr); err != nil { return testData, v4Enabled, v6Enabled, err } return testData, v4Enabled, v6Enabled, nil @@ -323,6 +327,12 @@ func exportLogs(tb testing.TB, data *TestData, logsSubDir string, writeNodeLogs // dump the logs for flow-aggregator Pods to disk. data.forAllMatchingPodsInNamespace("", flowAggregatorNamespace, writePodLogs) + // dump the logs for flow-visibility Pods to disk. + data.forAllMatchingPodsInNamespace("", flowVisibilityNamespace, writePodLogs) + + // dump the logs for clickhouse operator Pods to disk. + data.forAllMatchingPodsInNamespace("app=clickhouse-operator", kubeNamespace, writePodLogs) + // dump the output of "kubectl describe" for Antrea pods to disk. 
data.forAllMatchingPodsInNamespace("app=antrea", antreaNamespace, func(nodeName, podName, nsName string) error { w := getPodWriter(nodeName, podName, "describe") @@ -390,6 +400,14 @@ func teardownFlowAggregator(tb testing.TB, data *TestData) { if err := data.DeleteNamespace(flowAggregatorNamespace, defaultTimeout); err != nil { tb.Logf("Error when tearing down flow aggregator: %v", err) } + + tb.Logf("Deleting '%s' K8s Namespace and ClickHouse Operator", flowVisibilityNamespace) + if err := data.DeleteNamespace(flowVisibilityNamespace, defaultTimeout); err != nil { + tb.Logf("Error when tearing down flow visibility: %v", err) + } + if err := data.deleteClickHouseOperator(); err != nil { + tb.Logf("Error when removing ClickHouse Operator: %v", err) + } } func teardownTest(tb testing.TB, data *TestData) { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 950015c6d66..33a33da06c8 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -127,6 +127,7 @@ const ( egressAntreaNetworkPolicyName = "test-flow-aggregator-antrea-networkpolicy-egress" testIngressRuleName = "test-ingress-rule-name" testEgressRuleName = "test-egress-rule-name" + clickHousePodName = "chi-clickhouse-clickhouse-0-0-0" iperfTimeSec = 12 protocolIdentifierTCP = 6 // Set target bandwidth(bits/sec) of iPerf traffic to a relatively small value @@ -152,17 +153,15 @@ type testFlow struct { } func TestFlowAggregator(t *testing.T) { + skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) - data, v4Enabled, v6Enabled, err := setupTestWithIPFIXCollector(t) + data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t) if err != nil { t.Fatalf("Error when setting up test: %v", err) } defer func() { teardownTest(t, data) - if err := data.disableAntreaFlowExporter(); err != nil { - t.Errorf("Failed to disable flow exporter in Antrea Agent: %v", err) - } // Execute teardownFlowAggregator later than teardownTest to ensure that the log // of Flow Aggregator has been exported. teardownFlowAggregator(t, data) @@ -621,13 +620,18 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri t.Fatalf("Unit of the traffic bandwidth reported by iperf should be Mbits.") } + checkRecordsForFlowsCollector(t, data, srcIP, dstIP, srcPort, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps) + checkRecordsForFlowsClickHouse(t, data, srcIP, dstIP, srcPort, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy, bandwidthInMbps) +} + +func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) { collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data) // Iterate over recordSlices and build some results to test with expected results dataRecordsCount := 0 src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { // Check the source port along with source and destination IPs as there - // are flow records for control flows during the iperf with same IPs + // are flow records for control flows during the iperf with same IPs // and destination port. 
if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { dataRecordsCount = dataRecordsCount + 1 @@ -697,6 +701,76 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } +func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64) { + // Check the source port along with source and destination IPs as there + // are flow records for control flows during the iperf with same IPs + // and destination port. + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, srcPort, checkService, true) + + for _, record := range clickHouseRecords { + // Check if record has both Pod name of source and destination Pod. + if isIntraNode { + checkPodAndNodeDataClickHouse(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName()) + checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeDataClickHouse(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1)) + checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if checkService { + if isIntraNode { + assert.Contains(record.DestinationServicePortName, "antrea-test/perftest-b", "Record with ServiceIP does not have Service name") + } else { + assert.Contains(record.DestinationServicePortName, "antrea-test/perftest-c", "Record with ServiceIP does not have Service name") + } + } + if checkK8sNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Equal(record.IngressNetworkPolicyName, ingressAllowNetworkPolicyName, "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Equal(record.IngressNetworkPolicyNamespace, testNamespace, "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Equal(record.IngressNetworkPolicyType, ipfixregistry.PolicyTypeK8sNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Equal(record.EgressNetworkPolicyName, egressAllowNetworkPolicyName, "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Equal(record.EgressNetworkPolicyNamespace, testNamespace, "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Equal(record.EgressNetworkPolicyType, ipfixregistry.PolicyTypeK8sNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the egress rule") + } + if checkAntreaNetworkPolicy { + // Check if records have both ingress and egress network policies. 
+ assert.Equal(record.IngressNetworkPolicyName, ingressAntreaNetworkPolicyName, "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Equal(record.IngressNetworkPolicyNamespace, testNamespace, "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Equal(record.IngressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Equal(record.IngressNetworkPolicyRuleName, testIngressRuleName, "Record does not have the correct NetworkPolicy RuleName with the ingress rule") + assert.Equal(record.IngressNetworkPolicyRuleAction, ipfixregistry.NetworkPolicyRuleActionAllow, "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") + assert.Equal(record.EgressNetworkPolicyName, egressAntreaNetworkPolicyName, "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Equal(record.EgressNetworkPolicyNamespace, testNamespace, "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Equal(record.EgressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the egress rule") + assert.Equal(record.EgressNetworkPolicyRuleName, testEgressRuleName, "Record does not have the correct NetworkPolicy RuleName with the egress rule") + assert.Equal(record.EgressNetworkPolicyRuleAction, ipfixregistry.NetworkPolicyRuleActionAllow, "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + } + + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if record.Throughput > 0 { + flowStartTime := record.FlowStartSeconds.Unix() + exportTime := record.FlowEndSeconds.Unix() + var recBandwidth float64 + // flowEndReason == 3 means the end of flow detected + if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + octetTotalCount := record.OctetTotalCount + recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000 + } else { + // Check bandwidth with the field "throughput" except for the last record, + // as their throughput may be significantly lower than the average Iperf throughput. + throughput := record.Throughput + recBandwidth = float64(throughput) / 1000000 + } + t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, ClickHouse record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and ClickHouse record bandwidth should be lower than 15%%, record: %v", record) + } + + } + // Checking only data records as data records cannot be decoded without template record. + assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. 
Considered records: %s", clickHouseRecords) +} + func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool) { var cmd string if !isIPv6 { @@ -715,9 +789,20 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st assert.NotContains(t, record, "octetDeltaCount: 0", "octetDeltaCount should be non-zero") } } + + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false) + for _, record := range clickHouseRecords { + checkPodAndNodeDataClickHouse(t, record, srcPodName, srcNodeName, "", "") + checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) + // Since the OVS userspace conntrack implementation doesn't maintain + // packet or byte counter statistics, skip the check for Kind clusters + if testOptions.providerName != "kind" { + assert.Greater(t, record.OctetDeltaCount, uint64(0), "octetDeltaCount should be non-zero") + } + } } -func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6 bool, isIntraNode bool, isANP bool) { +func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { var cmdStr1, cmdStr2 string if !isIPv6 { cmdStr1 = fmt.Sprintf("iperf3 -c %s -n 1", testFlow1.dstIP) @@ -731,6 +816,11 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 _, _, err = data.RunCommandFromPod(testNamespace, testFlow2.srcPodName, "", []string{"timeout", "2", "bash", "-c", cmdStr2}) assert.Error(t, err) + checkRecordsForDenyFlowsCollector(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP) + checkRecordsForDenyFlowsClickHouse(t, data, testFlow1, testFlow2, isIPv6, isIntraNode, isANP) +} + +func checkRecordsForDenyFlowsCollector(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { _, recordSlices1 := getCollectorOutput(t, testFlow1.srcIP, testFlow1.dstIP, "", false, false, isIPv6, data) _, recordSlices2 := getCollectorOutput(t, testFlow2.srcIP, testFlow2.dstIP, "", false, false, isIPv6, data) recordSlices := append(recordSlices1, recordSlices2...) @@ -794,6 +884,61 @@ func checkRecordsForDenyFlows(t *testing.T, data *TestData, testFlow1, testFlow2 } } +func checkRecordsForDenyFlowsClickHouse(t *testing.T, data *TestData, testFlow1, testFlow2 testFlow, isIPv6, isIntraNode, isANP bool) { + clickHouseRecords1 := getClickHouseOutput(t, data, testFlow1.srcIP, testFlow1.dstIP, "", false, false) + clickHouseRecords2 := getClickHouseOutput(t, data, testFlow2.srcIP, testFlow2.dstIP, "", false, false) + recordSlices := append(clickHouseRecords1, clickHouseRecords2...) 
+ // Iterate over recordSlices and build some results to test with expected results + for _, record := range recordSlices { + var srcPodName, dstPodName string + if record.SourceIP == testFlow1.srcIP && record.DestinationIP == testFlow1.dstIP { + srcPodName = testFlow1.srcPodName + dstPodName = testFlow1.dstPodName + } else if record.SourceIP == testFlow2.srcIP && record.DestinationIP == testFlow2.dstIP { + srcPodName = testFlow2.srcPodName + dstPodName = testFlow2.dstPodName + } + + if isIntraNode { + checkPodAndNodeDataClickHouse(t, record, srcPodName, controlPlaneNodeName(), dstPodName, controlPlaneNodeName()) + checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeDataClickHouse(t, record, srcPodName, controlPlaneNodeName(), dstPodName, workerNodeName(1)) + checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if !isANP { // K8s Network Policies + if (record.IngressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionDrop) && (record.IngressNetworkPolicyName != ingressDropANPName) { + assert.Equal(record.DestinationIP, testFlow1.dstIP) + } else if (record.EgressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionDrop) && (record.EgressNetworkPolicyName != egressDropANPName) { + assert.Equal(record.DestinationIP, testFlow2.dstIP) + } + } else { // Antrea Network Policies + if record.IngressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionReject { + assert.Equal(record.IngressNetworkPolicyName, ingressRejectANPName, "Record does not have Antrea NetworkPolicy name with ingress reject rule") + assert.Equal(record.IngressNetworkPolicyNamespace, testNamespace, "Record does not have correct ingressNetworkPolicyNamespace") + assert.Equal(record.IngressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the ingress reject rule") + assert.Equal(record.IngressNetworkPolicyRuleName, testIngressRuleName, "Record does not have the correct NetworkPolicy RuleName with the ingress reject rule") + } else if record.IngressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionDrop { + assert.Equal(record.IngressNetworkPolicyName, ingressDropANPName, "Record does not have Antrea NetworkPolicy name with ingress drop rule") + assert.Equal(record.IngressNetworkPolicyNamespace, testNamespace, "Record does not have correct ingressNetworkPolicyNamespace") + assert.Equal(record.IngressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the ingress drop rule") + assert.Equal(record.IngressNetworkPolicyRuleName, testIngressRuleName, "Record does not have the correct NetworkPolicy RuleName with the ingress drop rule") + } else if record.EgressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionReject { + assert.Equal(record.EgressNetworkPolicyName, egressRejectANPName, "Record does not have Antrea NetworkPolicy name with egress reject rule") + assert.Equal(record.EgressNetworkPolicyNamespace, testNamespace, "Record does not have correct egressNetworkPolicyNamespace") + assert.Equal(record.EgressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the egress reject rule") + assert.Equal(record.EgressNetworkPolicyRuleName, testEgressRuleName, "Record does not have the correct NetworkPolicy RuleName with the egress reject rule") + } else if 
record.EgressNetworkPolicyRuleAction == ipfixregistry.NetworkPolicyRuleActionDrop { + assert.Equal(record.EgressNetworkPolicyName, egressDropANPName, "Record does not have Antrea NetworkPolicy name with egress drop rule") + assert.Equal(record.EgressNetworkPolicyNamespace, testNamespace, "Record does not have correct egressNetworkPolicyNamespace") + assert.Equal(record.EgressNetworkPolicyType, ipfixregistry.PolicyTypeAntreaNetworkPolicy, "Record does not have the correct NetworkPolicy Type with the egress drop rule") + assert.Equal(record.EgressNetworkPolicyRuleName, testEgressRuleName, "Record does not have the correct NetworkPolicy RuleName with the egress drop rule") + } + } + } +} + func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode string) { assert := assert.New(t) assert.Contains(record, srcPod, "Record with srcIP does not have Pod name: %s", srcPod) @@ -813,10 +958,33 @@ func checkPodAndNodeData(t *testing.T, record, srcPod, srcNode, dstPod, dstNode } } +func checkPodAndNodeDataClickHouse(t *testing.T, record *ClickHouseFullRow, srcPod, srcNode, dstPod, dstNode string) { + assert := assert.New(t) + assert.Equal(record.SourcePodName, srcPod, "Record with srcIP does not have Pod name: %s", srcPod) + assert.Equal(record.SourcePodNamespace, testNamespace, "Record does not have correct sourcePodNamespace: %s", testNamespace) + assert.Equal(record.SourceNodeName, srcNode, "Record does not have correct sourceNodeName: %s", srcNode) + // For Pod-To-External flow type, we send traffic to an external address, + // so we skip the verification of destination Pod info. + // Also, source Pod labels are different for Pod-To-External flow test. + if dstPod != "" { + assert.Equal(record.DestinationPodName, dstPod, "Record with dstIP does not have Pod name: %s", dstPod) + assert.Equal(record.DestinationPodNamespace, testNamespace, "Record does not have correct destinationPodNamespace: %s", testNamespace) + assert.Equal(record.DestinationNodeName, dstNode, "Record does not have correct destinationNodeName: %s", dstNode) + assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"perftool\"}", srcPod), "Record does not have correct label for source Pod") + assert.Equal(record.DestinationPodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"perftool\"}", dstPod), "Record does not have correct label for destination Pod") + } else { + assert.Equal(record.SourcePodLabels, fmt.Sprintf("{\"antrea-e2e\":\"%s\",\"app\":\"busybox\"}", srcPod), "Record does not have correct label for source Pod") + } +} + func checkFlowType(t *testing.T, record string, flowType uint8) { assert.Containsf(t, record, fmt.Sprintf("flowType: %d", flowType), "Record does not have correct flowType") } +func checkFlowTypeClickHouse(t *testing.T, record *ClickHouseFullRow, flowType uint8) { + assert.Equal(t, record.FlowType, flowType, "Record does not have correct flowType") +} + func getUint64FieldFromRecord(t *testing.T, record string, field string) uint64 { if strings.Contains(record, "TEMPLATE SET") { return 0 @@ -873,6 +1041,67 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService return collectorOutput, recordSlices } +// getClickHouseOutput queries clickhouse with built-in client and checks if we have +// received all the expected records for a given flow with source IP, destination IP +// and source port. We send source port to ignore the control flows during the iperf test. 
+// Polling timeout is coded assuming IPFIX output has been checked first. +func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isDstService, checkAllRecords bool) []*ClickHouseFullRow { + var flowRecords []*ClickHouseFullRow + var queryOutput string + + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + if isDstService { + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + } + if len(srcPort) > 0 { + query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) + } + cmd := []string{ + "clickhouse-client", + "--date_time_output_format=iso", + "--format=JSONEachRow", + fmt.Sprintf("--query=%s", query), + } + // ClickHouse output expected to be checked after IPFIX collector. + // Waiting additional 4x commit interval to be adequate for 3 commit attempts. + err := wait.PollImmediate(500*time.Millisecond, aggregatorClickHouseCommitInterval*4, func() (bool, error) { + queryOutput, _, err := data.RunCommandFromPod(flowVisibilityNamespace, clickHousePodName, "clickhouse", cmd) + if err != nil { + return false, err + } + + rows := strings.Split(queryOutput, "\n") + flowRecords = make([]*ClickHouseFullRow, 0, len(rows)) + for _, row := range rows { + row = strings.TrimSpace(row) + if len(row) == 0 { + continue + } + flowRecord := ClickHouseFullRow{} + err = json.Unmarshal([]byte(row), &flowRecord) + if err != nil { + return false, err + } + flowRecords = append(flowRecords, &flowRecord) + } + + if checkAllRecords { + for _, record := range flowRecords { + flowStartTime := record.FlowStartSeconds.Unix() + exportTime := record.FlowEndSeconds.Unix() + // flowEndReason == 3 means the end of flow detected + if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + return true, nil + } + } + return false, nil + } + return len(flowRecords) > 0, nil + }) + require.NoErrorf(t, err, "ClickHouse did not receive the expected records in query output: %v; query: %s", queryOutput, query) + return flowRecords +} + func getRecordsFromOutput(output string) []string { re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+") output = re.ReplaceAllString(output, "") @@ -1147,3 +1376,55 @@ func matchSrcAndDstAddress(srcIP string, dstIP string, isDstService bool, isIPv6 } return srcField, dstField } + +type ClickHouseFullRow struct { + TimeInserted time.Time `json:"timeInserted"` + FlowStartSeconds time.Time `json:"flowStartSeconds"` + FlowEndSeconds time.Time `json:"flowEndSeconds"` + FlowEndSecondsFromSourceNode time.Time `json:"flowEndSecondsFromSourceNode"` + FlowEndSecondsFromDestinationNode time.Time `json:"flowEndSecondsFromDestinationNode"` + FlowEndReason uint8 `json:"flowEndReason"` + SourceIP string `json:"sourceIP"` + DestinationIP string `json:"destinationIP"` + SourceTransportPort uint16 `json:"sourceTransportPort"` + DestinationTransportPort uint16 `json:"destinationTransportPort"` + ProtocolIdentifier uint8 `json:"protocolIdentifier"` + PacketTotalCount uint64 `json:"packetTotalCount,string"` + OctetTotalCount uint64 `json:"octetTotalCount,string"` + PacketDeltaCount uint64 `json:"packetDeltaCount,string"` + OctetDeltaCount uint64 `json:"octetDeltaCount,string"` + ReversePacketTotalCount uint64 `json:"reversePacketTotalCount,string"` + ReverseOctetTotalCount uint64 `json:"reverseOctetTotalCount,string"` + ReversePacketDeltaCount uint64 `json:"reversePacketDeltaCount,string"` + ReverseOctetDeltaCount uint64 
`json:"reverseOctetDeltaCount,string"` + SourcePodName string `json:"sourcePodName"` + SourcePodNamespace string `json:"sourcePodNamespace"` + SourceNodeName string `json:"sourceNodeName"` + DestinationPodName string `json:"destinationPodName"` + DestinationPodNamespace string `json:"destinationPodNamespace"` + DestinationNodeName string `json:"destinationNodeName"` + DestinationClusterIP string `json:"destinationClusterIP"` + DestinationServicePort uint16 `json:"destinationServicePort"` + DestinationServicePortName string `json:"destinationServicePortName"` + IngressNetworkPolicyName string `json:"ingressNetworkPolicyName"` + IngressNetworkPolicyNamespace string `json:"ingressNetworkPolicyNamespace"` + IngressNetworkPolicyRuleName string `json:"ingressNetworkPolicyRuleName"` + IngressNetworkPolicyRuleAction uint8 `json:"ingressNetworkPolicyRuleAction"` + IngressNetworkPolicyType uint8 `json:"ingressNetworkPolicyType"` + EgressNetworkPolicyName string `json:"egressNetworkPolicyName"` + EgressNetworkPolicyNamespace string `json:"egressNetworkPolicyNamespace"` + EgressNetworkPolicyRuleName string `json:"egressNetworkPolicyRuleName"` + EgressNetworkPolicyRuleAction uint8 `json:"egressNetworkPolicyRuleAction"` + EgressNetworkPolicyType uint8 `json:"egressNetworkPolicyType"` + TcpState string `json:"tcpState"` + FlowType uint8 `json:"flowType"` + SourcePodLabels string `json:"sourcePodLabels"` + DestinationPodLabels string `json:"destinationPodLabels"` + Throughput uint64 `json:"throughput,string"` + ReverseThroughput uint64 `json:"reverseThroughput,string"` + ThroughputFromSourceNode uint64 `json:"throughputFromSourceNode,string"` + ThroughputFromDestinationNode uint64 `json:"throughputFromDestinationNode,string"` + ReverseThroughputFromSourceNode uint64 `json:"reverseThroughputFromSourceNode,string"` + ReverseThroughputFromDestinationNode uint64 `json:"reverseThroughputFromDestinationNode,string"` + Trusted uint8 `json:"trusted"` +} diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 7111157a38e..e5dc0f213ec 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -99,6 +99,10 @@ const ( antreaIPSecCovYML string = "antrea-ipsec-coverage.yml" flowAggregatorYML string = "flow-aggregator.yml" flowAggregatorCovYML string = "flow-aggregator-coverage.yml" + flowVisibilityYML string = "flow-visibility.yml" + chOperatorYML string = "clickhouse-operator-install-bundle.yml" + flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0" + flowVisibilityNamespace string = "flow-visibility" defaultBridgeName string = "br-int" monitoringNamespace string = "monitoring" @@ -123,6 +127,7 @@ const ( perftoolImage = "projects.registry.vmware.com/antrea/perftool" ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.5.12" ipfixCollectorPort = "4739" + clickHouseHTTPPort = "8123" nginxLBService = "nginx-loadbalancer" @@ -131,6 +136,7 @@ const ( exporterIdleFlowExportTimeout = 1 * time.Second aggregatorActiveFlowRecordTimeout = 3500 * time.Millisecond aggregatorInactiveFlowRecordTimeout = 6 * time.Second + aggregatorClickHouseCommitInterval = 1 * time.Second statefulSetRestartAnnotationKey = "antrea-e2e/restartedAt" ) @@ -181,6 +187,7 @@ type TestOptions struct { withBench bool enableCoverage bool enableAntreaIPAM bool + flowVisibility bool coverageDir string skipCases string } @@ -705,14 +712,69 @@ func (data *TestData) enableAntreaFlowExporter(ipfixCollector string) error { return data.mutateAntreaConfigMap(nil, ac, false, true) } -func (data *TestData) 
disableAntreaFlowExporter() error { - ac := func(config *agentconfig.AgentConfig) { - config.FeatureGates["FlowExporter"] = false +// deployFlowVisibilityClickHouse deploys ClickHouse operator and DB. +func (data *TestData) deployFlowVisibilityClickHouse() (string, error) { + err := data.CreateNamespace(flowVisibilityNamespace, nil) + if err != nil { + return "", err } - return data.mutateAntreaConfigMap(nil, ac, false, true) + + rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", chOperatorYML)) + if err != nil || rc != 0 { + return "", fmt.Errorf("error when deploying the ClickHouse Operator YML; %s not available on the control-plane Node", chOperatorYML) + } + if err := wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) { + rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", flowVisibilityYML)) + if err != nil || rc != 0 { + // ClickHouseInstallation CRD from ClickHouse Operator install bundle applied soon before + // applying CR. Sometimes apiserver validation fails to recognize resource of + // kind: ClickHouseInstallation. Retry in such scenario. + if strings.Contains(stderr, "ClickHouseInstallation") || strings.Contains(stdout, "ClickHouseInstallation") { + return false, nil + } + return false, fmt.Errorf("error when deploying the flow visibility YML %s: %s, %s, %v", flowVisibilityYML, stdout, stderr, err) + } + return true, nil + }); err != nil { + return "", err + } + + // check for clickhouse pod Ready. Wait for 2x timeout as ch operator needs to be running first to handle chi + if err = data.podWaitForReady(2*defaultTimeout, flowVisibilityCHPodName, flowVisibilityNamespace); err != nil { + return "", err + } + + // check clickhouse service http port for service connectivity + chSvc, err := data.GetService("flow-visibility", "clickhouse-clickhouse") + if err != nil { + return "", err + } + if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (bool, error) { + rc, stdout, stderr, err := testData.RunCommandOnNode(controlPlaneNodeName(), + fmt.Sprintf("curl -Ss %s:%s", chSvc.Spec.ClusterIP, clickHouseHTTPPort)) + if rc != 0 || err != nil { + log.Infof("Failed to curl clickhouse Service: %s", strings.Trim(stderr, "\n")) + return false, nil + } else { + log.Infof("Successfully curl'ed clickhouse Service: %s", strings.Trim(stdout, "\n")) + return true, nil + } + }); err != nil { + return "", fmt.Errorf("timeout checking http port connectivity of clickhouse service: %v", err) + } + + return chSvc.Spec.ClusterIP, nil } -// deployFlowAggregator deploys the Flow Aggregator with ipfix collector address. +func (data *TestData) deleteClickHouseOperator() error { + rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl delete -f %s -n kube-system", chOperatorYML)) + if err != nil || rc != 0 { + return fmt.Errorf("error when deleting ClickHouse operator: %v", err) + } + return nil +} + +// deployFlowAggregator deploys the Flow Aggregator with ipfix collector and clickHouse address. 
func (data *TestData) deployFlowAggregator(ipfixCollector string) error { flowAggYaml := flowAggregatorYML if testOptions.enableCoverage { @@ -722,11 +784,7 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string) error { if err != nil || rc != 0 { return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml) } - svc, err := data.clientset.CoreV1().Services(flowAggregatorNamespace).Get(context.TODO(), flowAggregatorDeployment, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("unable to get service %v: %v", flowAggregatorDeployment, err) - } - if err = data.mutateFlowAggregatorConfigMap(ipfixCollector, svc.Spec.ClusterIP); err != nil { + if err = data.mutateFlowAggregatorConfigMap(ipfixCollector); err != nil { return err } if rc, _, _, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 { @@ -734,10 +792,18 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string) error { _, logStdout, _, _ := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s logs -l app=flow-aggregator", flowAggregatorNamespace)) return fmt.Errorf("error when waiting for the Flow Aggregator rollout to complete. kubectl describe output: %s, logs: %s", stdout, logStdout) } + // Check for flow-aggregator Pod running again for db connection establishment + flowAggPod, err := data.getFlowAggregator() + if err != nil { + return fmt.Errorf("error when getting flow-aggregator Pod: %v", err) + } + if err = data.podWaitForReady(2*defaultTimeout, flowAggPod.Name, flowAggregatorNamespace); err != nil { + return err + } return nil } -func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollector string, faClusterIP string) error { +func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string) error { configMap, err := data.GetFlowAggregatorConfigMap() if err != nil { return err @@ -750,7 +816,11 @@ func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollector string, faClu flowAggregatorConf.FlowCollector = flowaggregatorconfig.FlowCollectorConfig{ Enable: true, - Address: ipfixCollector, + Address: ipfixCollectorAddr, + } + flowAggregatorConf.ClickHouse = flowaggregatorconfig.ClickHouseConfig{ + Enable: true, + CommitInterval: aggregatorClickHouseCommitInterval.String(), } flowAggregatorConf.ActiveFlowRecordTimeout = aggregatorActiveFlowRecordTimeout.String() flowAggregatorConf.InactiveFlowRecordTimeout = aggregatorInactiveFlowRecordTimeout.String() @@ -1201,6 +1271,20 @@ func (data *TestData) podWaitForRunning(timeout time.Duration, name, namespace s return err } +// podWaitForReady polls the k8s apiserver until the specified Pod is in the "Ready" status (or +// until the provided timeout expires). +func (data *TestData) podWaitForReady(timeout time.Duration, name, namespace string) error { + _, err := data.PodWaitFor(timeout, name, namespace, func(p *corev1.Pod) (bool, error) { + for _, condition := range p.Status.Conditions { + if condition.Type == corev1.PodReady { + return condition.Status == corev1.ConditionTrue, nil + } + } + return false, nil + }) + return err +} + // podWaitForIPs polls the K8s apiserver until the specified Pod is in the "running" state (or until // the provided timeout expires). The function then returns the IP addresses assigned to the Pod. 
If the // Pod is not using "hostNetwork", the function also checks that an IP address exists in each required diff --git a/test/e2e/infra/vagrant/push_antrea.sh b/test/e2e/infra/vagrant/push_antrea.sh index 5769eac9cb7..3f6097e221c 100755 --- a/test/e2e/infra/vagrant/push_antrea.sh +++ b/test/e2e/infra/vagrant/push_antrea.sh @@ -166,10 +166,10 @@ CH_OPERATOR_INSTALL_BUNDLE_YML=$THIS_DIR/../../../../build/yamls/clickhouse-oper FLOW_VIS_YML="/tmp/flow-visibility.yml" # If a flow collector address is also provided, we update the Antrea -# manifest (to enable all features) +# manifest to enable FlowExporter. if [[ $FLOW_COLLECTOR != "" ]]; then - echo "Generating manifest with all features enabled along with FlowExporter feature" - $THIS_DIR/../../../../hack/generate-manifest.sh --mode dev --all-features > "${ANTREA_YML}" + echo "Generating manifest with FlowExporter enabled" + $THIS_DIR/../../../../hack/generate-manifest.sh --mode dev --flow-exporter > "${ANTREA_YML}" fi # Push Antrea image and related manifest. diff --git a/test/e2e/main_test.go b/test/e2e/main_test.go index c43aa9adb93..cad7fc67f0b 100644 --- a/test/e2e/main_test.go +++ b/test/e2e/main_test.go @@ -81,6 +81,7 @@ func testMain(m *testing.M) int { flag.BoolVar(&testOptions.withBench, "benchtest", false, "Run tests include benchmark tests") flag.BoolVar(&testOptions.enableCoverage, "coverage", false, "Run tests and measure coverage") flag.BoolVar(&testOptions.enableAntreaIPAM, "antrea-ipam", false, "Run tests with AntreaIPAM") + flag.BoolVar(&testOptions.flowVisibility, "flow-visibility", false, "Run flow visibility tests") flag.StringVar(&testOptions.coverageDir, "coverage-dir", "", "Directory for coverage data files") flag.StringVar(&testOptions.skipCases, "skip", "", "Key words to skip cases") flag.Parse()
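The new ClickHouse verification path boils down to: run clickhouse-client inside the chi-clickhouse-clickhouse-0-0-0 Pod with --format=JSONEachRow and --date_time_output_format=iso (as getClickHouseOutput does), decode each output line into ClickHouseFullRow, and derive a bandwidth figure the way checkRecordsForFlowsClickHouse does. Below is a minimal, self-contained Go sketch of that decode-and-check step, not part of the patch: the flowRow struct is a trimmed stand-in for ClickHouseFullRow, and the sample row and 12-second iperf duration are illustrative values only.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// flowRow is a trimmed, illustrative analogue of ClickHouseFullRow; numeric
// counters come back from ClickHouse as JSON strings, hence the ",string" tags.
type flowRow struct {
	FlowStartSeconds time.Time `json:"flowStartSeconds"`
	FlowEndSeconds   time.Time `json:"flowEndSeconds"`
	FlowEndReason    uint8     `json:"flowEndReason"`
	OctetTotalCount  uint64    `json:"octetTotalCount,string"`
	Throughput       uint64    `json:"throughput,string"`
}

func main() {
	// One record per line, as produced by clickhouse-client with
	// --format=JSONEachRow and --date_time_output_format=iso (sample values).
	output := `{"flowStartSeconds":"2022-05-01T00:00:00Z","flowEndSeconds":"2022-05-01T00:00:12Z","flowEndReason":3,"octetTotalCount":"60000000","throughput":"40000000"}`

	const iperfTimeSec = 12 // illustrative iperf duration, mirroring the test constant
	scanner := bufio.NewScanner(strings.NewReader(output))
	for scanner.Scan() {
		row := strings.TrimSpace(scanner.Text())
		if row == "" {
			continue
		}
		var rec flowRow
		if err := json.Unmarshal([]byte(row), &rec); err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		duration := rec.FlowEndSeconds.Unix() - rec.FlowStartSeconds.Unix()
		var mbps float64
		// flowEndReason == 3 means the end of the flow was detected, so the
		// record covers the whole flow and total octets give the average rate.
		if duration >= iperfTimeSec || rec.FlowEndReason == 3 {
			mbps = float64(rec.OctetTotalCount) * 8 / float64(duration) / 1000000
		} else {
			// For an in-progress flow, fall back to the reported throughput field.
			mbps = float64(rec.Throughput) / 1000000
		}
		fmt.Printf("record bandwidth: %.2f Mbits/s\n", mbps)
	}
}

Locally, the same path the new CI job exercises can be run with the patched script, e.g. ./ci/kind/test-e2e-kind.sh --encap-mode encap --coverage --flow-visibility, which deploys the ClickHouse operator and server alongside the Flow Aggregator and restricts the e2e run to TestFlowAggregator.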