Skip to content

Commit

Permalink
Add e2e test for Python Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Sep 25, 2023
1 parent 2eabe42 commit 1846091
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,35 @@ public void testSource() {
awaitCleanup(tenantNamespace, applicationId, "-test-python-source");
}

@Test
public void testSink() {
installLangStreamCluster(true);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);
final String applicationId = "my-test-app";
deployLocalApplication(applicationId, "experimental-python-sink");
final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant;
awaitApplicationReady(applicationId, 1);

executeCommandOnClient(
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30"
.formatted(applicationId)
.split(" "));

final String output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30"
.formatted(applicationId)
.split(" "));
log.info("Output: {}", output);
Assertions.assertTrue(
output.contains("{\"record\":{\"key\":null,\"value\":\"write: my-value\",\"headers\":{}}"));

executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" "));

awaitCleanup(tenantNamespace, applicationId, "-test-python-source");
}

private static void awaitCleanup(
String tenantNamespace, String applicationId, String appNameSuffix) {
Awaitility.await()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
- id: produce-input
type: produce
topic: ls-test-input
- id: consume-output
type: consume
topic: ls-test-output
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module: "module-1"
id: "pipeline-1"
name: "Test Sink"
topics:
- name: ls-test-input
creation-mode: create-if-not-exists
schema:
type: string
pipeline:
- name: "Sink using Python"
resources:
size: 2
id: "test-python-sink"
type: "experimental-python-sink"
input: ls-test-input
configuration:
className: example.TestSink
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
from confluent_kafka import Producer


class TestSink(object):
def __init__(self):
self.commit_callback = None
self.producer = Producer({})

def write(self, records):
logging.info("Write records: " + str(records))
for record in records:
self.producer.produce("ls-test-output", ("write: " + record.value()).encode('utf-8'))
self.commit_callback.commit(records)

def set_commit_callback(self, commit_callback):
self.commit_callback = commit_callback

0 comments on commit 1846091

Please sign in to comment.