From 786d566a1fb61ef08339e861619fdb5e56637ed8 Mon Sep 17 00:00:00 2001
From: Eric Pinzur <2641606+epinzur@users.noreply.github.com>
Date: Tue, 1 Aug 2023 20:23:15 +0200
Subject: [PATCH] removed redis destination (#542)

we haven't supported this for a long time, let's remove it from the codebase
---
 CONTRIBUTING.md                               |   7 +-
 Makefile                                      |   4 +-
 clients/python/src/kaskada/formatters.py      |  15 --
 .../python/src/kaskada/formatters_shared.py   |  14 --
 clients/python/src/kaskada/materialization.py |  40 -----
 clients/python/tests/test_materialization.py  |  68 +-------
 crates/sparrow-runtime/src/execute/output.rs  |   5 -
 .../src/execute/output/redis.rs               |  21 ---
 .../src/execute/progress_reporter.rs          |   3 -
 .../modules/developing/pages/queries.adoc     |  41 +++--
 docs-src/modules/installing/pages/local.adoc  |   3 +-
 examples/Customer_Retention_(OSS).ipynb       |   3 +-
 .../kaskada/v1alpha/destinations.proto        |  51 +-----
 .../kaskada/v1alpha/query_service.proto       |  16 --
 .../kaskada/v2alpha/query_service.proto       |   9 -
 tests/integration/api/api_suite_test.go       |  26 +--
 tests/integration/api/go.mod                  |   2 -
 tests/integration/api/go.sum                  |   4 -
 .../integration/api/graceful_shutdown_test.go | 160 ++++++++----------
 .../integration/api/materializations_test.go  |   2 +-
 tests/integration/api/queries_v2_test.go      |   1 -
 tests/integration/api/query_v1_rest_test.go   |   2 +-
 tests/integration/api/query_v1_test.go        |   2 +-
 tests/integration/api/tables_test.go          |  12 +-
 tests/integration/cli/cli_suite_test.go       |   2 +-
 tests/integration/docker-compose.yml          |  10 +-
 tests/integration/shared/go.mod               |   2 +-
 tests/integration/shared/go.sum               |   3 +-
 tests/integration/shared/helpers/helpers.go   |  99 +++++++++--
 wren/client/object_store_client.go            |   2 +-
 wren/compute/compute_manager.go               |   3 -
 wren/go.sum                                   |   8 +
 wren/property/query_result_type.go            |   2 -
 wren/service/query_v1.go                      |   2 +-
 34 files changed, 219 insertions(+), 425 deletions(-)
 delete mode 100644 crates/sparrow-runtime/src/execute/output/redis.rs

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e6a5fd12f..7d674785f 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -94,7 +94,7 @@ Run `cargo build --release -p sparrow-main` to build a release (optimized) binar
 ### Testing & Building the API
 
 * ensure docker is running locally
-* run `make proto/generate` and `make ent/generate`. See the `./wren/README.md` for more info on those.
+* run `make proto/generate`, `make ent/generate`, and `make wren/generate-mocks`. See the `./wren/README.md` for more info on those.
 * run `make wren/test`
 
 ### Testing & Building the Python Client
@@ -129,6 +129,7 @@ After making code changes, `ctrl-c` in the services window and restart it.
 
 #### locally, with the local backend
 
+* run `make test/int/docker-up-dependencies-only` in one terminal window to get the dependencies up
 * run `make sparrow/run` in one terminal window to get the Engine service up
 * run `make wren/run` in a second terminal window to get the Manager service up
 * run `make test/int/run-api` in a third terminal window to run the integration tests
 
 After making code changes, `ctrl-c` in the proper service window and restart it.
 
#### locally, with the s3 backend -* run `make test/int/docker-up-s3-only` in one terminal window to get the dependencies up +* run `make test/int/docker-up-dependencies-only` in one terminal window to get the dependencies up * run `make sparrow/run-s3` in a second terminal window to get the Engine service up * run `make wren/run-s3` in a third terminal window to get the Manager service up * run `make test/int/run-api-s3` in a fourth terminal window to run the integration tests @@ -157,4 +158,4 @@ After making code changes, `ctrl-c` in the proper service window and restart it. Provides the language server integration for Rust code. * * [Even Better TOML](https://marketplace.visualstudio.com/items?itemName=tamasfe.even-better-toml). Optional. * * [Cargo](https://marketplace.visualstudio.com/items?itemName=panicbit.cargo). Optional. -* * [Crates](https://marketplace.visualstudio.com/items?itemName=serayuzgur.crates) Optional. \ No newline at end of file +* * [Crates](https://marketplace.visualstudio.com/items?itemName=serayuzgur.crates) Optional. diff --git a/Makefile b/Makefile index fecc4c580..4fb2a7d5e 100644 --- a/Makefile +++ b/Makefile @@ -64,8 +64,8 @@ test/int/docker-up: test/int/docker-up-s3: docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate -test/int/docker-up-s3-only: - docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate minio +test/int/docker-up-dependencies-only: + docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.s3.yml up --build --remove-orphans --force-recreate minio pulsar test/int/docker-up-postgres: docker compose -f ./tests/integration/docker-compose.yml -f ./tests/integration/docker-compose.postgres.yml up --build --remove-orphans --force-recreate diff --git a/clients/python/src/kaskada/formatters.py b/clients/python/src/kaskada/formatters.py index 2a19024c8..2d869a3f8 100644 --- a/clients/python/src/kaskada/formatters.py +++ b/clients/python/src/kaskada/formatters.py @@ -58,14 +58,6 @@ def get_query_response_content(resp_obj): path = resp_obj.file_results.paths[i] appendChildIfNotNone(nested_table, tr(td(pre(i)), td(a(path, _href=path)))) resultsExist = True - elif hasattr(resp_obj, "redis_bulk"): - nested_table = table(_class="kda_table") - details.appendChild(html_table_row("redis_bulk", nested_table)) - nested_table.appendChild(tr(th("index"), th("path"))) - for i in range(len(resp_obj.redis_bulk.paths)): - path = resp_obj.redis_bulk.paths[i] - appendChildIfNotNone(nested_table, tr(td(pre(i)), td(a(path, _href=path)))) - resultsExist = True ( can_execute, @@ -689,13 +681,6 @@ def try_init(): html_formatter.for_type( "kaskada.materialization.MaterializationView", generic_object_html_formatter ) - html_formatter.for_type( - "kaskada.materialization.RedisAIDestination", generic_object_html_formatter - ) - html_formatter.for_type( - "kaskada.kaskada.v1alpha.materialization_service_pb2.RedisAI", - generic_object_html_formatter, - ) html_formatter.for_type( "kaskada.kaskada.v1alpha.materialization_service_pb2.Destination", generic_object_html_formatter, diff --git a/clients/python/src/kaskada/formatters_shared.py b/clients/python/src/kaskada/formatters_shared.py index a90405156..beada1808 100644 --- a/clients/python/src/kaskada/formatters_shared.py +++ b/clients/python/src/kaskada/formatters_shared.py @@ -123,20 +123,6 @@ def 
get_materialization_html_and_schema_df(obj): if hasattr(obj, "destination") and obj.HasField("destination"): destination = table(_class="kda_table") details.appendChild(html_table_row("destination", destination)) - if hasattr(obj.destination, "redis_a_i") and obj.destination.HasField( - "redis_a_i" - ): - redis_a_i = table(_class="kda_table") - appendHtmlObjTableRowIfAttrExists( - redis_a_i, obj.destination.redis_a_i, "host" - ) - appendHtmlObjTableRowIfAttrExists( - redis_a_i, obj.destination.redis_a_i, "port" - ) - appendHtmlObjTableRowIfAttrExists( - redis_a_i, obj.destination.redis_a_i, "db" - ) - destination.appendChild(html_table_row("redis_a_i", redis_a_i)) if hasattr(obj, "slice"): details.appendChild(html_table_row("slice", get_slice_request_html(obj.slice))) diff --git a/clients/python/src/kaskada/materialization.py b/clients/python/src/kaskada/materialization.py index cad04a835..1d6a0ae3c 100644 --- a/clients/python/src/kaskada/materialization.py +++ b/clients/python/src/kaskada/materialization.py @@ -23,44 +23,6 @@ def to_request(self) -> Dict[str, Any]: pass -class RedisDestination(Destination): - def __init__( - self, - host_name: str, - port: int, - use_tls: bool, - database_number: int, - password: str, - tls_cert: str, - tls_key: str, - tls_ca_cert: str, - insecure_skip_verify: bool, - ) -> None: - super().__init__() - self._host_name = host_name - self._port = port - self._use_tls = use_tls - self._database_number = database_number - self._password = password - self._tls_cert = tls_cert - self._tls_key = tls_key - self._tls_ca_cert = tls_ca_cert - self._insecure_skip_verify = insecure_skip_verify - - def to_request(self) -> Dict[str, Any]: - return { - "host_name": self._host_name, - "port": self._port, - "use_tls": self._use_tls, - "database_number": self._database_number, - "password": self._password, - "tls_cert": self._tls_cert, - "tls_key": self._tls_key, - "tls_ca_cert": self._tls_ca_cert, - "insecure_skip_verify": self._insecure_skip_verify, - } - - class FileType(Enum): FILE_TYPE_UNSPECIFIED = 0 FILE_TYPE_PARQUET = 1 @@ -160,8 +122,6 @@ def create_materialization( } if isinstance(destination, ObjectStoreDestination): materialization["destination"] = {"object_store": destination.to_request()} - elif isinstance(destination, RedisDestination): - materialization["destination"] = {"redis": destination.to_request()} elif isinstance(destination, PulsarDestination): materialization["destination"] = { "pulsar": {"config": destination.to_request()} diff --git a/clients/python/tests/test_materialization.py b/clients/python/tests/test_materialization.py index 1dbfce673..04c537bb7 100644 --- a/clients/python/tests/test_materialization.py +++ b/clients/python/tests/test_materialization.py @@ -11,7 +11,6 @@ MaterializationView, ObjectStoreDestination, PulsarDestination, - RedisDestination, create_materialization, delete_materialization, get_materialization, @@ -21,23 +20,6 @@ from kaskada.slice_filters import EntityFilter -def test_redis_destination_to_request(): - params = { - "host_name": "my_host_name", - "port": 1234, - "use_tls": False, - "database_number": 4321, - "password": "my_password", - "tls_cert": "my_tls_cert", - "tls_key": "my_tls_key", - "tls_ca_cert": "my_tls_ca_cert", - "insecure_skip_verify": True, - } - - result = RedisDestination(**params) - assert result.to_request() == params - - def test_object_store_destination_to_request(): csv_file = FileType.FILE_TYPE_CSV output_prefix = "/my_prefix" @@ -218,7 +200,7 @@ def to_request(self) -> Dict[str, Any]: 
entity_keys: "my_entity_b" } } - + """ @@ -300,54 +282,6 @@ def test_create_materialization_object_store_parquet_destination(mockClient): ) -@patch("kaskada.client.Client") -def test_create_materialization_redis_destination(mockClient): - params = { - "host_name": "my_host_name", - "port": 1234, - "use_tls": False, - "database_number": 4321, - "password": "my_password", - "tls_cert": "my_tls_cert", - "tls_key": "my_tls_key", - "tls_ca_cert": "my_tls_ca_cert", - "insecure_skip_verify": True, - } - - redis_destination = RedisDestination(**params) - - name = "my_awkward_tacos" - expression = "last(tacos)" - destination = redis_destination - views = [MaterializationView("my_second_view", "last(awkward)")] - slice_filter = EntityFilter(["my_entity_a", "my_entity_b"]) - - expected_request = material_pb.CreateMaterializationRequest( - **{ - "materialization": { - "materialization_name": name, - "expression": expression, - "with_views": [ - {"name": "my_second_view", "expression": "last(awkward)"} - ], - "destination": {"redis": redis_destination.to_request()}, - "slice": slice_filter.to_request(), - } - } - ) - create_materialization( - name, - expression, - destination, - views, - slice_filter=slice_filter, - client=mockClient, - ) - mockClient.materialization_stub.CreateMaterialization.assert_called_with( - expected_request, metadata=mockClient.get_metadata() - ) - - @patch("kaskada.client.Client") def test_create_materialization_astra_streaming_destination(mockClient): params = { diff --git a/crates/sparrow-runtime/src/execute/output.rs b/crates/sparrow-runtime/src/execute/output.rs index 052e853dc..d0cbd50ab 100644 --- a/crates/sparrow-runtime/src/execute/output.rs +++ b/crates/sparrow-runtime/src/execute/output.rs @@ -59,11 +59,6 @@ impl TryFrom for Destination { sparrow_api::kaskada::v1alpha::destination::Destination::ObjectStore(destination) => { Ok(Destination::ObjectStore(destination)) } - sparrow_api::kaskada::v1alpha::destination::Destination::Redis(_) => { - error_stack::bail!(Error::FeatureNotEnabled { - feature: "redis".to_owned() - }) - } #[cfg(not(feature = "pulsar"))] sparrow_api::kaskada::v1alpha::destination::Destination::Pulsar(_) => { error_stack::bail!(Error::FeatureNotEnabled { diff --git a/crates/sparrow-runtime/src/execute/output/redis.rs b/crates/sparrow-runtime/src/execute/output/redis.rs deleted file mode 100644 index 34a51b9c0..000000000 --- a/crates/sparrow-runtime/src/execute/output/redis.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::execute::progress_reporter::ProgressUpdate; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; -use error_stack::Result; -use futures::stream::BoxStream; -use sparrow_api::kaskada::v1alpha::RedisDestination; - -#[derive(derive_more::Display, Debug)] -#[display(fmt = "Redis Destination is unsupported")] -pub struct Error; - -impl error_stack::Context for Error {} - -pub(super) async fn write( - _redis: RedisDestination, - _schema: SchemaRef, - _progress_updates_tx: tokio::sync::mpsc::Sender, - _batches: BoxStream<'static, RecordBatch>, -) -> Result<(), Error> { - error_stack::bail!(Error); -} diff --git a/crates/sparrow-runtime/src/execute/progress_reporter.rs b/crates/sparrow-runtime/src/execute/progress_reporter.rs index b1e846bb6..f4e858712 100644 --- a/crates/sparrow-runtime/src/execute/progress_reporter.rs +++ b/crates/sparrow-runtime/src/execute/progress_reporter.rs @@ -181,9 +181,6 @@ impl ProgressTracker { })), }) } - destination::Destination::Redis(_) => { - error_stack::bail!(Error::UnsupportedOutput { 
output: "redis" }) - } } } } diff --git a/docs-src/modules/developing/pages/queries.adoc b/docs-src/modules/developing/pages/queries.adoc index 21cd81a80..e7bdee8f4 100644 --- a/docs-src/modules/developing/pages/queries.adoc +++ b/docs-src/modules/developing/pages/queries.adoc @@ -14,12 +14,12 @@ To learn more about Fenl: == Query syntax quickstart -Kaskada's query language builds on the lessons of 50+ years of query language design to provide a declarative, composable, easy-to-read, and type-safe way of describing computations related to time. +Kaskada's query language builds on the lessons of 50+ years of query language design to provide a declarative, composable, easy-to-read, and type-safe way of describing computations related to time. The following is a quick overview of the query language's main features and syntax. === Viewing and filtering the contents of a table -Kaskada queries are built by composing simple expressions. +Kaskada queries are built by composing simple expressions. Every expression returns a timeline. [source,Fenl] @@ -86,7 +86,7 @@ Pipe syntax allows multiple operations to be chained together. Write your operat ---- { largest_spend_over_2_purchases: purchase.amount - | when(Purchase.category == "food") + | when(Purchase.category == "food") | sum(window=sliding(2, Purchase.category == "food")) # Inner aggregation | max() # Outer aggregation } @@ -135,7 +135,7 @@ let purchases_yesterday = in { purchases_in_last_day: purchases_now - purchases_yesterday } ---- -In this example we take the timeline produced by `purchases_now` and move it forward in time by one day using the `xref:fenl:catalog.adoc#shift-by[shift_by()]` function. +In this example we take the timeline produced by `purchases_now` and move it forward in time by one day using the `xref:fenl:catalog.adoc#shift-by[shift_by()]` function. We then subtract the shifted value from the original, unshifted value === Simple, composable syntax @@ -149,7 +149,7 @@ let cadence = hourly() # Anything can be named and re-used let hourly_big_purchases = Purchase | when(Purchase.amount > 10) -# Filter anywhere +# Filter anywhere | count(window=since(cadence)) # Aggregate anything | when(cadence) @@ -243,10 +243,10 @@ from kaskada.api.session import LocalBuilder session = LocalBuilder().build() -query = """{ - time: Purchase.purchase_time, - entity: Purchase.customer_id, - max_amount: last(Purchase.amount) | max(), +query = """{ + time: Purchase.purchase_time, + entity: Purchase.customer_id, + max_amount: last(Purchase.amount) | max(), min_amount: Purchase.amount | min() }""" @@ -272,9 +272,7 @@ views stored in the system. `"all-results"` _(default)_, or `"final-results"` which returns only the final values for each entity. * *response_as*: Determines how the response is returned. Either -`"parquet"` _(default)_ or `"redis-bulk"`. -** If `"redis-bulk"`, result_behavior is assumed to be -`"final-results"`. +`"parquet"` _(default)_ or `"csv"`. * *data_token_id*: Enables repeatable queries. Queries performed against the same data token always run on the same input data. * *limits*: Configures limits on the output set. @@ -317,8 +315,7 @@ available: `all-results` _(default)_, or `final-results` which returns only the final values for each entity. * *--output*: Output format for the query results. One of `df` dataframe -_(default)_, `json`, `parquet` or `redis-bulk`. -** If `redis-bulk`, --result-behavior is assumed to be `final-results`. +_(default)_, `json`, or `parquet`. * *--data-token*: Enables repeatable queries. 
Queries performed against the same data token always run on the same input data. * *--preview-rows*: Produces a preview of the data with at least this @@ -515,7 +512,7 @@ transactions Returns a dataframe of 71599 rows, instead of the full dataset of 100000 rows. -[NOTE] +[NOTE] ==== It may seem odd that many thousands of rows were returned when only 50 were requested. This happens because query operates on batches @@ -536,12 +533,12 @@ To query Kaskada using the CLI, the query string should be provided on `STDIN`. An easy way to define a query is to create a text file containing the query. [source,Fenl] -.query.fenl +.query.fenl ---- -{ - time: Purchase.purchase_time, - entity: Purchase.customer_id, - max_amount: last(Purchase.amount) | max(), +{ + time: Purchase.purchase_time, + entity: Purchase.customer_id, + max_amount: last(Purchase.amount) | max(), min_amount: Purchase.amount | min() } ---- @@ -567,11 +564,11 @@ By default, query results are written to a Parquet file: the locations of these } ---- -The resulting files are stored in the JSON path `outputTo.objectStore.outputPaths.paths` as an array of paths. +The resulting files are stored in the JSON path `outputTo.objectStore.outputPaths.paths` as an array of paths. [TIP] ==== -To slice and/or filter JSON output we can use https://stedolan.github.io/jq/[jq]. +To slice and/or filter JSON output we can use https://stedolan.github.io/jq/[jq]. ==== [source,bash] diff --git a/docs-src/modules/installing/pages/local.adoc b/docs-src/modules/installing/pages/local.adoc index 6d1d2c329..a05824cc0 100644 --- a/docs-src/modules/installing/pages/local.adoc +++ b/docs-src/modules/installing/pages/local.adoc @@ -141,8 +141,7 @@ optional arguments: repeatable queries. --debug DEBUG Shows debugging information --output OUTPUT Output format for the query results. One of "df" - (default), "json", "parquet" or "redis-bulk". "redis- - bulk" implies --result-behavior "final-results" + (default), "json", or "parquet". --preview-rows PREVIEW_ROWS Produces a preview of the data with at least this many rows. diff --git a/examples/Customer_Retention_(OSS).ipynb b/examples/Customer_Retention_(OSS).ipynb index 6dce401fc..a66085354 100644 --- a/examples/Customer_Retention_(OSS).ipynb +++ b/examples/Customer_Retention_(OSS).ipynb @@ -467,6 +467,7 @@ ] }, { + "attachments": {}, "cell_type": "markdown", "id": "9b400634-fa26-4202-9fc7-951f2d431532", "metadata": { @@ -481,7 +482,7 @@ "- Kaskada connects directly the event-based data available in production\n", "- Data scientists define the predictor features used to power training sets\n", "- Data and ML Engineers call Kaskada to compute the **same** features at the time of now() in production\n", - "- Kaskada provides production grade targets such as Redis for feature and model serving" + "- Kaskada provides production grade targets such as Pulsar for feature and model serving" ] }, { diff --git a/proto/kaskada/kaskada/v1alpha/destinations.proto b/proto/kaskada/kaskada/v1alpha/destinations.proto index b1d4f80d5..cdcc21960 100644 --- a/proto/kaskada/kaskada/v1alpha/destinations.proto +++ b/proto/kaskada/kaskada/v1alpha/destinations.proto @@ -8,9 +8,10 @@ import "kaskada/kaskada/v1alpha/pulsar.proto"; // Describes the destination results are materialized to. 
message Destination { + reserved 2; // old redis oneof destination + oneof destination { ObjectStoreDestination object_store = 1; - RedisDestination redis = 2; PulsarDestination pulsar = 3; } } @@ -39,54 +40,6 @@ message ObjectStoreDestination { } } -// Writes the results directly to a RedisAI instance. -// -// Uses a series of AI.TENSORSET operations -// -// The query expression's type must be a record. -// The record type must include a field named 'key'. -// The value of the 'key' field is used as the AI.TENSORSET key. -// All other fields must be numeric. -// -// See https://redis.io/topics/protocol -message RedisDestination { - // The hostname of the Redis instance. - string host_name = 1; - - // The port of the Redis instance. - int32 port = 2; - - // When `true`, TLS will be used to connect to Redis. - bool use_tls = 3; - - // The Redis database number 0 to 15. - int32 database_number = 4; - - // The password to connect to the Redis instance - string password = 5; - - // An X.509 certificate to use for authenticating the server - // to connected clients, masters or cluster peers. - // The string should be PEM formatted. - string tls_cert = 6; - - // An X.509 private key to use for authenticating the server - // to connected clients, masters or cluster peers. - // The string should be PEM formatted. - string tls_key = 7; - - // A PEM encoded CA's certificate. - string tls_ca_cert = 8; - - // InsecureSkipVerify controls whether a client verifies the - // server's certificate chain and host name. - // If this field is true, TLS accepts any certificate - // presented by the server and any host name in that certificate. - // In this mode, TLS is susceptible to man-in-the-middle attacks. - // This should be used only for testing. - bool insecure_skip_verify = 9; -} - message PulsarDestination { PulsarConfig config = 1; } diff --git a/proto/kaskada/kaskada/v1alpha/query_service.proto b/proto/kaskada/kaskada/v1alpha/query_service.proto index c8e70481e..f8d581550 100644 --- a/proto/kaskada/kaskada/v1alpha/query_service.proto +++ b/proto/kaskada/kaskada/v1alpha/query_service.proto @@ -69,22 +69,6 @@ message Query { int64 preview_rows = 1; } - message RedisBulkResponse { - // The tensor shape to output values. - // - // Exactly one dimension's value must be zero - this dimension's - // cardinality is determined by the number of output values. The - // number of output values must be a multiple of the product of - // the nonzero dimensions. - // - // Example: - // [0] - Column vector: [1,2,3,4,5,6] - // [1, 0] - Single row vector: [[1,2,3,4,5,6]] - // [2, 0] - tuple vectors: [[1,2], [3,4], [5,6]] - // [0, 2] - two row vectors: [[1,2,3], [4,5,6]] - repeated int32 shape = 1; - } - enum ResultBehavior { // Unspecified - Invalid Value RESULT_BEHAVIOR_UNSPECIFIED = 0; diff --git a/proto/kaskada/kaskada/v2alpha/query_service.proto b/proto/kaskada/kaskada/v2alpha/query_service.proto index 4b235611d..efc8ba6e4 100644 --- a/proto/kaskada/kaskada/v2alpha/query_service.proto +++ b/proto/kaskada/kaskada/v2alpha/query_service.proto @@ -107,9 +107,6 @@ message QueryConfig { kaskada.v1alpha.Destination destination = 3; // Determines how results are returned. - // - // Note that for Destination -> RedisBulkResponse or RedisAI, the only - // valid option is `FinalResults` ResultBehavior result_behavior = 4; // Configure limits on the output set. 
@@ -140,12 +137,6 @@ message ParquetResults { repeated string paths = 1; } -message RedisBulkResults { - // URIs identifying the Redis Bulk files containing the query - // results. - repeated string paths = 1; -} - message QueryOutput { kaskada.v1alpha.FileResults file_results = 1; } diff --git a/tests/integration/api/api_suite_test.go b/tests/integration/api/api_suite_test.go index c25b987b7..ba422211a 100644 --- a/tests/integration/api/api_suite_test.go +++ b/tests/integration/api/api_suite_test.go @@ -7,15 +7,12 @@ import ( "io" "net/http" "net/url" - "os" "runtime" "strings" "testing" "time" - "github.com/RedisAI/redisai-go/redisai" "github.com/apache/pulsar-client-go/pulsar" - "github.com/gomodule/redigo/redis" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" @@ -42,8 +39,6 @@ var ( minioRootUser = flag.String("minio-root-user", "minio", "root username for connecting to minio") minioRootPassword = flag.String("minio-root-password", "minio123", "root password for connecting to minio") minioEndpoint = flag.String("minio-endpoint", "127.0.0.1:9000", "endpoint for connecting to minio") - redisAIPort = flag.Int("redis-ai-port", 6379, "Port to connect to the redis-ai integration instance. Note that this should be a specific instance for integration tests only, as the test cleanup will wipe any existing data from the redis instance.") - redisAIHost = flag.String("redis-ai-host", "127.0.0.1", "Host to connect to the redis-ai integration instance. Note that this should be a specific instance for integration tests only, as the test cleanup will wipe any existing data from the redis instance.") kaskadaHostname = flag.String("hostname", "127.0.0.1", "hostname of Kaskada to connect") kaskadaGrpcPort = flag.Int("grpc-port", 50051, "Kaskada's gRPC port to connect") kaskadaRestPort = flag.Int("rest-port", 3365, "Kaskada's REST port to connect") @@ -58,7 +53,7 @@ var ( grpcConfig helpers.HostConfig ) -// Before starting tests, delete all tables associated with the Integration clientID. Also completely wipes connected RedisAI instance. +// Before starting tests, delete all tables associated with the Integration clientID. 
var _ = BeforeSuite(func() { flag.Parse() @@ -112,23 +107,6 @@ func isARM() bool { return strings.Contains(runtime.GOARCH, "arm") } -func getRedisAIClient(db int) *redisai.Client { - pool := &redis.Pool{Dial: func() (redis.Conn, error) { - return redis.Dial("tcp", fmt.Sprintf("%s:%d", *redisAIHost, *redisAIPort), redis.DialDatabase(db)) - }} - - return redisai.Connect("", pool) -} - -func wipeRedisDatabase(db int) { - //Cleanup all existing data in RedisAI - redisAIClient := getRedisAIClient(db) - defer redisAIClient.Close() - redisAIClient.ActiveConnNX() - err := redisAIClient.ActiveConn.Send("FLUSHALL", "SYNC") - Expect(err).ShouldNot(HaveOccurred()) -} - func getRestRequest(ctx context.Context, method, endpoint string, jsonBody []byte) *http.Request { var ( req *http.Request @@ -205,7 +183,7 @@ func primitiveSchemaField(name string, primitiveType v1alpha.DataType_PrimitiveT } func getRemotePulsarHostname() string { - if os.Getenv("ENV") == "local-local" { + if helpers.TestsAreRunningLocally() { return "localhost" } else { return "pulsar" diff --git a/tests/integration/api/go.mod b/tests/integration/api/go.mod index a85c5257a..3ee201394 100644 --- a/tests/integration/api/go.mod +++ b/tests/integration/api/go.mod @@ -10,7 +10,6 @@ replace github.com/kaskada-ai/kaskada/tests/integration/shared => ../shared require ( github.com/apache/pulsar-client-go v0.9.0 - github.com/gomodule/redigo v1.8.9 github.com/google/uuid v1.3.0 github.com/jt-nti/gproto v0.0.0-20210304092907-23e645af1351 github.com/mattn/go-sqlite3 v1.14.16 @@ -110,7 +109,6 @@ require ( require ( ariga.io/atlas v0.5.0 // indirect entgo.io/ent v0.11.1 // indirect - github.com/RedisAI/redisai-go v1.0.1 github.com/agext/levenshtein v1.2.1 // indirect github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect github.com/c2fo/vfs v2.1.4+incompatible // indirect diff --git a/tests/integration/api/go.sum b/tests/integration/api/go.sum index 433c20bc2..abc9d8d78 100644 --- a/tests/integration/api/go.sum +++ b/tests/integration/api/go.sum @@ -147,8 +147,6 @@ github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/RedisAI/redisai-go v1.0.1 h1:SUsds3o10WdI9e9g/vVRIGqh3i/FF5DhtomzEqLp/d8= -github.com/RedisAI/redisai-go v1.0.1/go.mod h1:FRf3yzUzpFsX6c+v5gBdRmTpR2+O/3lH0X50HT7qsbM= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -372,8 +370,6 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= -github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= 
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/flatbuffers v1.11.0 h1:O7CEyB8Cb3/DmtxODGtLHcEvpr81Jm5qLg/hsHnxA2A=
diff --git a/tests/integration/api/graceful_shutdown_test.go b/tests/integration/api/graceful_shutdown_test.go
index ad00addd1..e2fdf6e9a 100644
--- a/tests/integration/api/graceful_shutdown_test.go
+++ b/tests/integration/api/graceful_shutdown_test.go
@@ -2,49 +2,51 @@ package api_test
 
 import (
 	"context"
+	"fmt"
 	"os/exec"
 	"time"
 
-	"github.com/RedisAI/redisai-go/redisai"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
-	. "github.com/onsi/gomega/gstruct"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
-	"google.golang.org/protobuf/types/known/wrapperspb"
 
 	v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha"
 	helpers "github.com/kaskada-ai/kaskada/tests/integration/shared/helpers"
 	. "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers"
 )
 
-var _ = PDescribe("Graceful Shutdown test", Ordered, Label("redis"), Label("redis-ai"), func() {
+var _ = Describe("Graceful Shutdown test", Ordered, Label("docker"), func() {
 	var (
 		ctx                   context.Context
 		cancel                context.CancelFunc
 		conn                  *grpc.ClientConn
-		key1                  string
-		key2                  string
-		redisAIClient         *redisai.Client
 		materializationClient v1alpha.MaterializationServiceClient
 		tableClient           v1alpha.TableServiceClient
 		queryClient           v1alpha.QueryServiceClient
+		outputURI             string
 		table                 *v1alpha.Table
+		tableName             string
+		materializationName   string
 	)
 
+	tableName = "graceful_shutdown_table"
+	materializationName = "graceful_shutdown_mat"
+
 	query := `
{
time: graceful_shutdown_table.transaction_time,
key: graceful_shutdown_table.id,
max_price: graceful_shutdown_table.price | max(),
min_spent_in_single_transaction: min(graceful_shutdown_table.price * graceful_shutdown_table.quantity),
max_spent_in_single_transaction: max(graceful_shutdown_table.price * graceful_shutdown_table.quantity)
}`
 
-	redisDb := 4
 	kaskadaIsDown := false
 
 	terminateKaskada := func() {
+		defer GinkgoRecover()
+
 		cmd := exec.Command("docker", "kill", "-s", "SIGTERM", "kaskada")
 		err := cmd.Run()
 		Expect(err).ShouldNot(HaveOccurred(), "Unable to terminate kaskada")
@@ -77,37 +79,39 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
 
 			_, err := tableClient.ListTables(ctx, &v1alpha.ListTablesRequest{})
 			g.Expect(err).ShouldNot(HaveOccurred())
+
+			materializationClient = v1alpha.NewMaterializationServiceClient(conn)
+			queryClient = v1alpha.NewQueryServiceClient(conn)
 		}, "30s", "1s").Should(Succeed())
 
 		kaskadaIsDown = false
-
-		// get a grpc client for the materialization & compute services
-		materializationClient = v1alpha.NewMaterializationServiceClient(conn)
-		queryClient = v1alpha.NewQueryServiceClient(conn)
 	}
 
 	BeforeAll(func() {
+		if helpers.TestsAreRunningLocally() {
+			Skip("tests running locally, skipping graceful shutdown test")
+		}
 
-		// get a redis connections for verifying results
-		redisAIClient = getRedisAIClient(redisDb)
+		// define the output path and make sure it is empty
+		outputURI = fmt.Sprintf("file:///data/output/%s", materializationName)
 
-		wipeRedisDatabase(redisDb)
+		// get connection to wren
+		ctx, cancel, conn = grpcConfig.GetContextCancelConnection(20)
+		ctx = metadata.AppendToOutgoingContext(ctx, "client-id", *integrationClientID)
 
-		// declare the keys we are testing for
-		key1 = "Symdt3HKIYEFyzRCgdQl2/OKVBzjl7aO1XcKd7o70wM="
-		key2 = "c5obkiyX5gof2EdzWlYbXZ98xfu+cpjxxvANgTfRNzM="
+		tableClient = v1alpha.NewTableServiceClient(conn)
+		materializationClient = v1alpha.NewMaterializationServiceClient(conn)
+		queryClient = v1alpha.NewQueryServiceClient(conn)
 
 		// delete the table and materialization if not cleaned up in the previous run
 		tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})
 		materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: materializationName})
 
 		// create a table
 		table = &v1alpha.Table{
 			TableName:           tableName,
 			TimeColumnName:      "transaction_time",
 			EntityKeyColumnName: "id",
-			SubsortColumnName: &wrapperspb.StringValue{
-				Value: "idx",
-			},
 		}
 		_, err := tableClient.CreateTable(ctx, &v1alpha.CreateTableRequest{Table: table})
 		Expect(err).ShouldNot(HaveOccurredGrpc())
@@ -117,12 +121,15 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
 	})
 
 	AfterAll(func() {
+		if helpers.TestsAreRunningLocally() {
+			Skip("tests running locally, skipping graceful shutdown test")
+		}
+
 		// clean up items created
-		materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: "transaction_details"})
-		// this materialization might not have been created if test had an issue, so we don't check error here
 		_, err := tableClient.DeleteTable(ctx, &v1alpha.DeleteTableRequest{TableName: tableName})
 		Expect(err).ShouldNot(HaveOccurred())
+
 		cancel()
 		conn.Close()
 	})
 
	Context("When the table schema is created correctly", func() {
		Describe("Start a query, and then send a termination signal to Kaskada", func() {
			It("should return query results before exiting", func() {
-				go terminateKaskada()
-				destination := &v1alpha.Destination_ObjectStore{
-					ObjectStore: &v1alpha.ObjectStoreDestination{
-						FileType: v1alpha.FileType_FILE_TYPE_PARQUET,
+				destination := &v1alpha.Destination{
+					Destination: &v1alpha.Destination_ObjectStore{
+						ObjectStore: &v1alpha.ObjectStoreDestination{
+							FileType: v1alpha.FileType_FILE_TYPE_PARQUET,
+						},
 					},
 				}
+				go terminateKaskada()
 				stream, err := queryClient.CreateQuery(ctx, &v1alpha.CreateQueryRequest{
 					Query: &v1alpha.Query{
 						Expression:     query,
-						Destination:    &v1alpha.Destination{Destination: destination},
+						Destination:    destination,
 						ResultBehavior: v1alpha.Query_RESULT_BEHAVIOR_ALL_RESULTS,
 					},
 					QueryOptions: &v1alpha.QueryOptions{
@@ -164,20 +173,7 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
 				resultsUrl := res.GetDestination().GetObjectStore().GetOutputPaths().Paths[0]
 				results := helpers.DownloadParquet(resultsUrl)
 
-				Expect(len(results)).Should(Equal(100000))
-				Expect(results).Should(ContainElement(MatchFields(IgnoreExtras, Fields{
-					"Time":           PointTo(BeEquivalentTo(20150106)),
-					"Key":            PointTo(Equal(key1)),
-					"Max_list_price": PointTo(BeEquivalentTo(149)),
-					"Min_paid":       PointTo(BeEquivalentTo(149)),
-				})))
-
-				Expect(results).Should(ContainElement(MatchFields(IgnoreExtras, Fields{
-					"Time":           PointTo(BeEquivalentTo(20150104)),
-					"Key":            PointTo(Equal(key2)),
-					"Max_list_price": PointTo(BeEquivalentTo(149)),
-					"Min_paid":       PointTo(BeEquivalentTo(149)),
-				})))
+				Expect(len(results)).Should(Equal(50000))
 			})
 		})
 
@@ -185,40 +181,32 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
 			It("create the materialization without error", func() {
 				go terminateKaskada()
 
+				destination := &v1alpha.Destination{
+					Destination: &v1alpha.Destination_ObjectStore{
+						ObjectStore: &v1alpha.ObjectStoreDestination{
+							FileType:        v1alpha.FileType_FILE_TYPE_PARQUET,
+							OutputPrefixUri: outputURI,
+						},
+					},
+				}
+
 				res, err := materializationClient.CreateMaterialization(ctx, &v1alpha.CreateMaterializationRequest{
 					Materialization: &v1alpha.Materialization{
-						MaterializationName: "transaction_details",
+						MaterializationName: materializationName,
 						Expression:          query,
-						Destination: &v1alpha.Destination{
-							Destination: &v1alpha.Destination_Redis{
-								Redis: &v1alpha.RedisDestination{
-									HostName:       "redis",
-									Port:           6379,
-									DatabaseNumber: int32(redisDb),
-								},
-							},
-						},
+						Destination:         destination,
 					},
 				})
 				Expect(err).ShouldNot(HaveOccurredGrpc())
 				Expect(res).ShouldNot(BeNil())
 			})
 
-			It("Should upload results to redis before terminating", func() {
+			It("Should output results to a file before terminating", func() {
 				Eventually(func(g Gomega) {
-					dataType, shape, values, err := redisAIClient.TensorGetValues(key1)
-					g.Expect(err).ShouldNot(HaveOccurred())
-					g.Expect(dataType).Should(Equal("INT64"))
-					g.Expect(shape).Should(Equal([]int64{1, 3}))
-					g.Expect(values).Should(Equal([]int64{20150106, 149, 149}))
-				}, "30s", "1s").Should(Succeed())
+					filePaths := helpers.EventuallyListOutputFiles(materializationName + "/0", g)
 
-				Eventually(func(g Gomega) {
-					dataType, shape, values, err := redisAIClient.TensorGetValues(key2)
-					g.Expect(err).ShouldNot(HaveOccurred())
-					g.Expect(dataType).Should(Equal("INT64"))
-					g.Expect(shape).Should(Equal([]int64{1, 3}))
-					g.Expect(values).Should(Equal([]int64{20150104, 149, 149}))
+					results := helpers.DownloadParquet(filePaths[0])
+					g.Expect(len(results)).Should(Equal(50000))
 				}, "30s", "1s").Should(Succeed())
 			})
 		})
@@ -230,23 +218,21 @@ max_spent_in_single_transaction: max(transactions.price * transactions.quantity)
 				helpers.LoadTestFileIntoTable(ctx, conn, table, "transactions/transactions_part2.parquet")
 			})
 
-			It("Should upload new results to redis before terminating", func() {
+			It("Should output results to a file before terminating", func() {
 				Eventually(func(g Gomega) {
-					dataType, shape, values, err := redisAIClient.TensorGetValues(key1)
-					g.Expect(err).ShouldNot(HaveOccurred())
-					g.Expect(dataType).Should(Equal("INT64"))
-					g.Expect(shape).Should(Equal([]int64{1, 3}))
-					g.Expect(values).Should(Equal([]int64{20150109, 149, 100}))
-				}, "30s", "1s").Should(Succeed())
+					filePaths := helpers.EventuallyListOutputFiles(materializationName + "/1", g)
 
-				Eventually(func(g Gomega) {
-					dataType, shape, values, err := redisAIClient.TensorGetValues(key2)
-					g.Expect(err).ShouldNot(HaveOccurred())
-					g.Expect(dataType).Should(Equal("INT64"))
-					g.Expect(shape).Should(Equal([]int64{1, 3}))
-					g.Expect(values).Should(Equal([]int64{20150111, 149, 149}))
+					results := helpers.DownloadParquet(filePaths[0])
+					g.Expect(len(results)).Should(Equal(10000))
 				}, "30s", "1s").Should(Succeed())
 			})
 		})
+
+		Describe("Cleanup the materialization used in the test", func() {
+			It("Should work without error", func() {
+				_, err := 
materializationClient.DeleteMaterialization(ctx, &v1alpha.DeleteMaterializationRequest{MaterializationName: materializationName}) + Expect(err).ShouldNot(HaveOccurred()) + }) + }) }) }) diff --git a/tests/integration/api/materializations_test.go b/tests/integration/api/materializations_test.go index 57b002da0..bb16576dd 100644 --- a/tests/integration/api/materializations_test.go +++ b/tests/integration/api/materializations_test.go @@ -16,7 +16,7 @@ import ( . "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers" ) -var _ = Describe("Materializations", Ordered, Label("redis"), Label("redis-ai"), func() { +var _ = Describe("Materializations", Ordered, func() { var ctx context.Context var cancel context.CancelFunc var conn *grpc.ClientConn diff --git a/tests/integration/api/queries_v2_test.go b/tests/integration/api/queries_v2_test.go index 13d59b046..40be09df9 100644 --- a/tests/integration/api/queries_v2_test.go +++ b/tests/integration/api/queries_v2_test.go @@ -112,7 +112,6 @@ var _ = Describe("Queries V2", Ordered, func() { Expect(config.ExperimentalFeatures).Should(BeNil()) Expect(config.Limits).Should(BeNil()) Expect(config.Destination.GetObjectStore()).ShouldNot(BeNil()) - Expect(config.Destination.GetRedis()).Should(BeNil()) Expect(config.ResultBehavior.GetAllResults()).ShouldNot(BeNil()) Expect(config.ResultBehavior.GetFinalResults()).Should(BeNil()) Expect(config.Slice).Should(BeNil()) diff --git a/tests/integration/api/query_v1_rest_test.go b/tests/integration/api/query_v1_rest_test.go index ae863b299..3787ab5ce 100644 --- a/tests/integration/api/query_v1_rest_test.go +++ b/tests/integration/api/query_v1_rest_test.go @@ -61,7 +61,7 @@ var _ = PDescribe("Query V1 REST", Ordered, func() { loadReq := loadRequestJson{ TableName: table.TableName, - ParquetFileUri: helpers.GetFileURI("purchases_part1.parquet"), + ParquetFileUri: helpers.GetTestFileURI("purchases_part1.parquet"), } jsonBody, err = json.Marshal(loadReq) diff --git a/tests/integration/api/query_v1_test.go b/tests/integration/api/query_v1_test.go index e90e3ab22..0ee7177ea 100644 --- a/tests/integration/api/query_v1_test.go +++ b/tests/integration/api/query_v1_test.go @@ -193,7 +193,7 @@ min_amount: query_v1_test.amount | min(), SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI("purchases/purchases_part2.parquet"), + Uri: helpers.GetTestFileURI("purchases/purchases_part2.parquet"), }, }, CopyToFilesystem: true, diff --git a/tests/integration/api/tables_test.go b/tests/integration/api/tables_test.go index 667999c6b..b9cf4013e 100644 --- a/tests/integration/api/tables_test.go +++ b/tests/integration/api/tables_test.go @@ -236,7 +236,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: helpers.GetTestFileURI(fileName), }, }, }) @@ -298,7 +298,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: helpers.GetTestFileURI(fileName), }, }, }) @@ -320,7 +320,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: 
helpers.GetTestFileURI(fileName), }, }, }) @@ -366,7 +366,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: helpers.GetTestFileURI(fileName), }, }, CopyToFilesystem: true, @@ -387,7 +387,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: helpers.GetTestFileURI(fileName), }, }, CopyToFilesystem: true, @@ -538,7 +538,7 @@ var _ = Describe("Tables", Ordered, func() { SourceData: &v1alpha.LoadDataRequest_FileInput{ FileInput: &v1alpha.FileInput{ FileType: v1alpha.FileType_FILE_TYPE_PARQUET, - Uri: helpers.GetFileURI(fileName), + Uri: helpers.GetTestFileURI(fileName), }, }, }) diff --git a/tests/integration/cli/cli_suite_test.go b/tests/integration/cli/cli_suite_test.go index d1bfd3f1e..2b1d49f5b 100644 --- a/tests/integration/cli/cli_suite_test.go +++ b/tests/integration/cli/cli_suite_test.go @@ -40,7 +40,7 @@ var ( grpcConfig helpers.HostConfig ) -// Before starting tests, delete all tables associated with the Integration clientID. Also completely wipes connected RedisAI instance. +// Before starting tests, delete all tables associated with the Integration clientID. var _ = BeforeSuite(func() { flag.Parse() diff --git a/tests/integration/docker-compose.yml b/tests/integration/docker-compose.yml index 23e2941fc..f019de962 100644 --- a/tests/integration/docker-compose.yml +++ b/tests/integration/docker-compose.yml @@ -4,7 +4,7 @@ networks: integration: services: - + pulsar: image: apachepulsar/pulsar:3.0.0 container_name: pulsar @@ -24,7 +24,7 @@ services: retries: 5 start_period: 20s restart: unless-stopped - volumes: + volumes: - pulsardata:/pulsar/data - pulsarconf:/pulsar/conf @@ -35,7 +35,7 @@ services: container_name: kaskada depends_on: - pulsar - environment: + environment: # for sparrow SPARROW_LOG_FILTER: "egg::=warn,sparrow_=info,info" # for wren @@ -47,7 +47,7 @@ services: TMPDIR: "/data/tmp" logging: driver: "json-file" - options: + options: tag: "{{.ImageName}}|{{.Name}}|{{.ImageFullID}}|{{.FullID}}" networks: - integration @@ -59,7 +59,7 @@ services: - ../../testdata:/testdata restart: unless-stopped -volumes: +volumes: pulsardata: pulsarconf: diff --git a/tests/integration/shared/go.mod b/tests/integration/shared/go.mod index bccf971de..00b630d20 100644 --- a/tests/integration/shared/go.mod +++ b/tests/integration/shared/go.mod @@ -72,7 +72,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.9 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/stretchr/objx v0.5.0 // indirect - github.com/stretchr/testify v1.8.1 // indirect + github.com/stretchr/testify v1.8.4 // indirect github.com/tinylib/msgp v1.1.6 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.5.0 // indirect diff --git a/tests/integration/shared/go.sum b/tests/integration/shared/go.sum index 9b9c49b47..95108dac4 100644 --- a/tests/integration/shared/go.sum +++ b/tests/integration/shared/go.sum @@ -564,8 +564,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= diff --git a/tests/integration/shared/helpers/helpers.go b/tests/integration/shared/helpers/helpers.go index 5c2e4e9e5..31aad3c01 100644 --- a/tests/integration/shared/helpers/helpers.go +++ b/tests/integration/shared/helpers/helpers.go @@ -26,6 +26,7 @@ import ( . "github.com/kaskada-ai/kaskada/tests/integration/shared/matchers" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/onsi/gomega/types" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/reader" "google.golang.org/grpc" @@ -35,6 +36,11 @@ import ( v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" ) +const ( + localOutputPath string = "../data/output" + localTestdataPath string = "../../../testdata" +) + // HostConfig holds the data needed to connect to a particular grpc server type HostConfig struct { Hostname string @@ -104,6 +110,14 @@ func DownloadParquet(url string) []interface{} { return rows } +func TestsAreRunningLocally() bool { + return os.Getenv("ENV") == "local-local" +} + +func TestsAreRunningLocallyInDocker() bool { + return os.Getenv("ENV") == "local-docker" +} + func downloadFile(url string) (localPath string, cleanup func()) { if strings.HasPrefix(url, "http://") { // download to temp file @@ -125,17 +139,79 @@ func downloadFile(url string) (localPath string, cleanup func()) { } localPath = strings.TrimPrefix(url, "file://") cleanup = func() {} - if os.Getenv("ENV") != "local-local" { - localPath = fmt.Sprintf("../%s", localPath) + if TestsAreRunningLocallyInDocker() && !strings.HasPrefix(localPath, "..") { + localPath = fmt.Sprintf("..%s", localPath) } return } -func GetFileURI(fileName string) string { - if os.Getenv("ENV") == "local-local" { + + +// Gets an output path URI +func GetOutputPathURI(subPath string) string { + subPath = vfs_utils.EnsureTrailingSlash(subPath) + if TestsAreRunningLocally() { workDir, err := os.Getwd() Expect(err).ShouldNot(HaveOccurred()) - path := filepath.Join(workDir, "../../../testdata", fileName) + path := filepath.Join(workDir, localOutputPath, subPath) + return fmt.Sprintf("file://%s", path) + } + return fmt.Sprintf("file:///data/output/%s", subPath) +} + +// Deletes all files at the output path +func EmptyOutputPath(subPath string) { + subPath = vfs_utils.EnsureTrailingSlash(subPath) + workDir, err := os.Getwd() + Expect(err).ShouldNot(HaveOccurred()) + path := filepath.Join(workDir, localOutputPath, subPath) + err = os.RemoveAll(path) + Expect(err).ShouldNot(HaveOccurred()) +} + +// Lists all files at the output path +func ListOutputFiles(subPath string) []string { + subPath = vfs_utils.EnsureTrailingSlash(subPath) + workDir, err := os.Getwd() + Expect(err).ShouldNot(HaveOccurred()) + path := filepath.Join(workDir, localOutputPath, subPath) + dirEntries, err := os.ReadDir(path) + Expect(err).ShouldNot(HaveOccurred()) + + paths := []string{} + for _, dirEntry := 
range dirEntries {
+		if !dirEntry.IsDir() {
+			paths = append(paths, filepath.Join(path, dirEntry.Name()))
+		}
+	}
+	return paths
+}
+
+// Lists all files at the output path, asserting via the passed Gomega for use inside Eventually blocks
+func EventuallyListOutputFiles(subPath string, g types.Gomega) []string {
+	subPath = vfs_utils.EnsureTrailingSlash(subPath)
+	workDir, err := os.Getwd()
+	g.Expect(err).ShouldNot(HaveOccurred())
+	path := filepath.Join(workDir, localOutputPath, subPath)
+	dirEntries, err := os.ReadDir(path)
+	g.Expect(err).ShouldNot(HaveOccurred())
+
+	paths := []string{}
+	for _, dirEntry := range dirEntries {
+		if !dirEntry.IsDir() {
+			paths = append(paths, filepath.Join(localOutputPath, subPath, dirEntry.Name()))
+		}
+	}
+	g.Expect(len(paths)).Should(BeNumerically(">", 0))
+	return paths
+}
+
+// Gets a file URI for a file in the testdata path
+func GetTestFileURI(fileName string) string {
+	if TestsAreRunningLocally() {
 		workDir, err := os.Getwd()
 		Expect(err).ShouldNot(HaveOccurred())
-		path := filepath.Join(workDir, "../../../testdata", fileName)
+		path := filepath.Join(workDir, localTestdataPath, fileName)
 		return fmt.Sprintf("file://%s", path)
 	}
 	return fmt.Sprintf("file:///testdata/%s", fileName)
@@ -143,7 +219,7 @@
 
 // Reads a file from the testdata path
 func ReadTestFile(fileName string) []byte {
-	filePath := fmt.Sprintf("../../../testdata/%s", fileName)
+	filePath := fmt.Sprintf("%s/%s", localTestdataPath, fileName)
 	fileData, err := os.ReadFile(filePath)
 	Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("issue reading testdata file: %s", fileName))
 	return fileData
@@ -151,14 +227,14 @@
 
 // Writes a file to the testdata path
 func WriteTestFile(fileName string, data []byte) {
-	filePath := fmt.Sprintf("../../../testdata/%s", fileName)
+	filePath := fmt.Sprintf("%s/%s", localTestdataPath, fileName)
 	err := os.WriteFile(filePath, data, 0666)
 	Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("issue writing testdata file: %s", fileName))
 }
 
 // Deletes a file from the testdata path
 func DeleteTestFile(fileName string) {
-	filePath := fmt.Sprintf("../../../testdata/%s", fileName)
+	filePath := fmt.Sprintf("%s/%s", localTestdataPath, fileName)
 	if fileExists(filePath) {
 		err := os.Remove(filePath)
 		Expect(err).ShouldNot(HaveOccurred(), fmt.Sprintf("issue deleting testdata file: %s", fileName))
@@ -194,7 +270,7 @@ func LoadTestFilesIntoTable(ctx context.Context, conn *grpc.ClientConn, table *v
 		SourceData: &v1alpha.LoadDataRequest_FileInput{
 			FileInput: &v1alpha.FileInput{
 				FileType: fileType,
-				Uri:      GetFileURI(fileName),
+				Uri:      GetTestFileURI(fileName),
 			},
 		},
 	}
@@ -364,11 +440,6 @@ func GetMergedCreateQueryResponse(stream v1alpha.QueryService_CreateQueryClient)
 		if queryResponse.RequestDetails != nil {
 			mergedResponse.RequestDetails = queryResponse.RequestDetails
 		}
-		if queryResponse.GetDestination().GetRedis() != nil {
-			mergedResponse.Destination = &v1alpha.Destination{
-				Destination: &v1alpha.Destination_Redis{Redis: queryResponse.GetDestination().GetRedis()},
-			}
-		}
 		if queryResponse.GetDestination().GetObjectStore().GetOutputPaths() != nil {
 			newPaths := queryResponse.GetDestination().GetObjectStore().GetOutputPaths().Paths
 			existingPaths := []string{}
diff --git a/wren/client/object_store_client.go b/wren/client/object_store_client.go
index befa3733d..9398b2873 100644
--- a/wren/client/object_store_client.go
+++ b/wren/client/object_store_client.go
@@ -242,7 +242,7 @@ func (c objectStoreClient) GetPresignedDownloadURL(ctx context.Context, URI stri
 
 	switch c.objectStoreType {
 	case object_store_type_local:
-		presignedURL 
= file.Path() + presignedURL = fmt.Sprintf("file://%s", file.Path()) return case object_store_type_s3: diff --git a/wren/compute/compute_manager.go b/wren/compute/compute_manager.go index c632ff79c..b7e45ee5b 100644 --- a/wren/compute/compute_manager.go +++ b/wren/compute/compute_manager.go @@ -284,9 +284,6 @@ func (m *computeManager) processMaterializations(requestCtx context.Context, own case *v1alpha.Destination_Pulsar: matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializating to pulsar") destination.Destination = kind - case *v1alpha.Destination_Redis: - matLogger.Info().Interface("type", kind).Str("when", "pre-compute").Msg("materializing to redis") - destination.Destination = kind default: matLogger.Error().Interface("type", kind).Str("when", "pre-compute").Msg("materialization output type not implemented") return fmt.Errorf("materialization output type %s is not implemented", kind) diff --git a/wren/go.sum b/wren/go.sum index cfafcdcb0..d25a2aa64 100644 --- a/wren/go.sum +++ b/wren/go.sum @@ -283,6 +283,8 @@ github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-ieproxy v0.0.9 h1:RvVbLiMv/Hbjf1gRaC2AQyzwbdVhdId7D2vPnXIml4k= github.com/mattn/go-ieproxy v0.0.9/go.mod h1:eF30/rfdQUO9EnzNIZQr0r9HiLMlZNCpJkHbmMuOAE0= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.14.14 h1:qZgc/Rwetq+MtyE18WhzjokPD93dNqLGNT3QJuLvBGw= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -296,6 +298,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo/v2 v2.1.4 h1:GNapqRSid3zijZ9H77KrgVG4/8KqiyRsxcSxe+7ApXY= github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU= github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= @@ -349,7 +353,11 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU= +github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM= github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/wren/property/query_result_type.go b/wren/property/query_result_type.go index 526965349..60e735ece 100644 --- a/wren/property/query_result_type.go +++ b/wren/property/query_result_type.go @@ -5,7 +5,6 @@ type QueryResultType string const ( QueryResultTypeUnspecified QueryResultType = "UNSPECIFIED" QueryResultTypeParquet QueryResultType = "PARQUET" - QueryResultTypeRedisBulk QueryResultType = "REDIS_BULK" ) // Values provides list valid values for Enum. @@ -13,7 +12,6 @@ func (QueryResultType) Values() (kinds []string) { resulttypes := []QueryResultType{ QueryResultTypeUnspecified, QueryResultTypeParquet, - QueryResultTypeRedisBulk, } for _, s := range resulttypes { diff --git a/wren/service/query_v1.go b/wren/service/query_v1.go index a2c964d1d..b8e9d8860 100644 --- a/wren/service/query_v1.go +++ b/wren/service/query_v1.go @@ -426,7 +426,7 @@ func (q *queryV1Service) validateOutputTo(ctx context.Context, query *v1alpha.Qu default: subLogger.Warn().Interface("kind", kind).Interface("type", kind.ObjectStore.FileType).Msg("unknown output_to file_type, defaulting to 'ObjectStore->Parquet'") } - case *v1alpha.Destination_Pulsar, *v1alpha.Destination_Redis: + case *v1alpha.Destination_Pulsar: return fmt.Errorf("query output type: %s is only valid for materializations", kind) default: subLogger.Warn().Interface("kind", kind).Msg("unknown output_to, defaulting to 'ObjectStore->Parquet'")