diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java index 492a85e8e..b282f0875 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/PythonAgentsIT.java @@ -92,6 +92,35 @@ public void testSource() { awaitCleanup(tenantNamespace, applicationId, "-test-python-source"); } + @Test + public void testSink() { + installLangStreamCluster(true); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + final String applicationId = "my-test-app"; + deployLocalApplication(applicationId, "experimental-python-sink"); + final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant; + awaitApplicationReady(applicationId, 1); + + executeCommandOnClient( + "bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30" + .formatted(applicationId) + .split(" ")); + + final String output = + executeCommandOnClient( + "bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30" + .formatted(applicationId) + .split(" ")); + log.info("Output: {}", output); + Assertions.assertTrue( + output.contains("{\"record\":{\"key\":null,\"value\":\"write: my-value\",\"headers\":{}}")); + + executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" ")); + + awaitCleanup(tenantNamespace, applicationId, "-test-python-source"); + } + private static void awaitCleanup( String tenantNamespace, String applicationId, String appNameSuffix) { Awaitility.await() diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/gateways.yaml new file mode 100644 index 000000000..22928d8db --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/gateways.yaml @@ -0,0 +1,23 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: produce-input + type: produce + topic: ls-test-input + - id: consume-output + type: consume + topic: ls-test-output \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/pipeline.yaml new file mode 100644 index 000000000..4a170c556 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/pipeline.yaml @@ -0,0 +1,33 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module: "module-1" +id: "pipeline-1" +name: "Test Sink" +topics: + - name: ls-test-input + creation-mode: create-if-not-exists + schema: + type: string +pipeline: + - name: "Sink using Python" + resources: + size: 2 + id: "test-python-sink" + type: "experimental-python-sink" + input: ls-test-input + configuration: + className: example.TestSink \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/python/example.py b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/python/example.py new file mode 100644 index 000000000..9e36eea1b --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/experimental-python-sink/python/example.py @@ -0,0 +1,33 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from confluent_kafka import Producer + + +class TestSink(object): + def __init__(self): + self.commit_callback = None + self.producer = Producer({}) + + def write(self, records): + logging.info("Write records: " + str(records)) + for record in records: + self.producer.produce("ls-test-output", ("write: " + record.value()).encode('utf-8')) + self.commit_callback.commit(records) + + def set_commit_callback(self, commit_callback): + self.commit_callback = commit_callback