Skip to content

Commit

Permalink
Add e2e test for Python Sink (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Sep 26, 2023
1 parent d514021 commit 21debe2
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,40 @@ public void testSource() {

deleteAppAndAwaitCleanup(tenant, applicationId);
}

@Test
public void testSink() {
    // Bring up a LangStream cluster and isolate this run under a fresh tenant.
    installLangStreamCluster(true);
    final String tenantName = "ten-" + System.currentTimeMillis();
    setupTenant(tenantName);
    final String appId = "my-test-app";

    // Resolve the Kafka bootstrap servers from the streaming cluster's admin config,
    // falling back to a local broker when none is configured.
    @SuppressWarnings("unchecked")
    final Map<String, Object> adminConfig =
            (Map<String, Object>)
                    streamingCluster.configuration().getOrDefault("admin", Map.of());
    final String kafkaBootstrapServers =
            (String) adminConfig.getOrDefault("bootstrap.servers", "localhost:9092");

    deployLocalApplicationAndAwaitReady(
            tenantName,
            appId,
            "experimental-python-sink",
            Map.of("KAFKA_BOOTSTRAP_SERVERS", kafkaBootstrapServers),
            1);

    // Push one value through the produce gateway...
    executeCommandOnClient(
            "bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30"
                    .formatted(appId)
                    .split(" "));

    // ...then read back what the Python sink wrote to the output topic.
    final String output =
            executeCommandOnClient(
                    "bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30"
                            .formatted(appId)
                            .split(" "));
    log.info("Output: {}", output);
    Assertions.assertTrue(
            output.contains(
                    "{\"record\":{\"key\":null,\"value\":\"write: my-value\",\"headers\":{}}"));

    deleteAppAndAwaitCleanup(tenantName, appId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class BaseEndToEndTest implements TestWatcher {
protected static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
protected static KubeCluster kubeCluster;
protected static StreamingClusterProvider streamingClusterProvider;
protected static StreamingCluster streamingCluster;
protected static File instanceFile;
protected static CodeStorageProvider codeStorageProvider;
protected static CodeStorageProvider.CodeStorageConfig codeStorageConfig;
Expand Down Expand Up @@ -463,7 +464,7 @@ public static void setup() {
imagesFutures.get(3))
.join();

final StreamingCluster streamingCluster = streamingClusterFuture.join();
streamingCluster = streamingClusterFuture.join();

final Map<String, Map<String, Object>> instanceContent =
Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
    return [
        SimpleRecord(
            record.value() + "!!" + self.secret_value, headers=record.headers()
        )
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Gateways exposed to the e2e test client:
#   - produce-input: pushes test records onto the sink's input topic
#   - consume-output: reads the records the Python sink forwarded
gateways:
  - id: produce-input
    type: produce
    topic: ls-test-input
  - id: consume-output
    type: consume
    topic: ls-test-output
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Pipeline for the Python sink e2e test: a single "experimental-python-sink"
# agent consumes string records from ls-test-input; the Python class
# (example.TestSink) forwards each value to ls-test-output.
module: "module-1"
id: "pipeline-1"
name: "Test Sink"
topics:
  # Both topics are auto-created and carry plain string payloads.
  - name: ls-test-input
    creation-mode: create-if-not-exists
    schema:
      type: string
  - name: ls-test-output
    creation-mode: create-if-not-exists
    schema:
      type: string
pipeline:
  - name: "Sink using Python"
    resources:
      size: 2  # assumes "size" = number of agent instances — TODO confirm
    id: "test-python-sink"
    type: "experimental-python-sink"
    input: ls-test-input
    configuration:
      className: example.TestSink
      # Resolved at deploy time from the test secrets file (secret id "kafka").
      bootstrapServers: "{{ secrets.kafka.bootstrap-servers }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from confluent_kafka import Producer


class TestSink(object):
    """E2E test sink: forwards each record's value to the ``ls-test-output``
    Kafka topic, prefixed with ``"write: "``, then commits the batch via the
    runtime-provided commit callback.
    """

    def __init__(self):
        # Populated later: the callback via set_commit_callback(), the
        # producer via init().
        self.commit_callback = None
        self.producer = None

    def init(self, config):
        """Create the Kafka producer from the agent configuration.

        Expects ``config["bootstrapServers"]`` to hold the broker list.
        """
        # Lazy %s formatting defers stringification until the record is emitted.
        logging.info("Init config: %s", config)
        self.producer = Producer({"bootstrap.servers": config["bootstrapServers"]})

    def write(self, records):
        """Publish every record to ls-test-output, then commit the batch.

        Re-raises any producer/commit error after logging it with its
        traceback.
        """
        logging.info("Write records: %s", records)
        try:
            for record in records:
                self.producer.produce(
                    "ls-test-output", value=("write: " + record.value()).encode("utf-8")
                )
            # Block until all queued messages are delivered before committing.
            self.producer.flush()
            self.commit_callback.commit(records)
        except Exception:
            # logging.exception records the full traceback; bare `raise`
            # re-raises the original exception unchanged (unlike `raise e`).
            logging.exception("Error writing records")
            raise

    def set_commit_callback(self, commit_callback):
        # Called by the runtime before write(); stored for batch commits.
        self.commit_callback = commit_callback
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
    return [
        SimpleRecord(
            record.value() + "!!" + self.secret_value, headers=record.headers()
        )
    ]
5 changes: 4 additions & 1 deletion langstream-e2e-tests/src/test/resources/secrets/secret1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ secrets:
token: "${ASTRA_TOKEN}"
database: "${ASTRA_DATABASE}"
secureBundle: "${ASTRA_SECURE_BUNDLE}"
  environment: "${ASTRA_ENVIRONMENT}"
- id: kafka
data:
bootstrap-servers: "${KAFKA_BOOTSTRAP_SERVERS}"

0 comments on commit 21debe2

Please sign in to comment.