Skip to content

Commit

Permalink
Merge pull request redpanda-data#23787 from mmaslankaprv/iceberg_rest…
Browse files Browse the repository at this point in the history
…_catalog_cleanup

Iceberg rest catalog cleanup
  • Loading branch information
mmaslankaprv authored Oct 16, 2024
2 parents 1754e47 + 1b6dbdb commit 41595d8
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 0 deletions.
6 changes: 6 additions & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ RUN /ocsf-server && rm /ocsf-server
FROM base AS polaris
COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/polaris /
RUN /polaris && rm /polaris
#################################

FROM base as iceberg-rest
COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/iceberg-rest-catalog /
RUN /iceberg-rest-catalog && rm /iceberg-rest-catalog

#################################

Expand Down Expand Up @@ -333,6 +338,7 @@ COPY --from=ocsf /opt/ocsf-schema/ /opt/ocsf-schema/
COPY --from=ocsf /opt/ocsf-server/ /opt/ocsf-server/
COPY --from=polaris /opt/polaris/ /opt/polaris/
COPY --from=flink /opt/flink/ /opt/flink/
COPY --from=iceberg-rest /opt/iceberg-rest-image /opt/iceberg-rest-image

RUN ldconfig

Expand Down
8 changes: 8 additions & 0 deletions tests/docker/ducktape-deps/iceberg-rest-catalog
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -e

git -C /opt clone https://github.com/tabular-io/iceberg-rest-image.git
cd /opt/iceberg-rest-image
git reset --hard 62a5078032e0675d34b377d2ead2eea814c1da48
ARCH=$(dpkg-architecture -q DEB_BUILD_ARCH)
JAVA_HOME="/usr/lib/jvm/java-17-openjdk-${ARCH}" ./gradlew --no-daemon --info shadowJar
132 changes: 132 additions & 0 deletions tests/rptest/services/apache_iceberg_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright 2024 Vectorized, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import os

from ducktape.services.service import Service
from ducktape.cluster.cluster import ClusterNode
from ducktape.utils.util import wait_until
from pyiceberg.catalog import load_catalog

import uuid


class IcebergRESTCatalog(Service):
"""A Iceberg REST service compatible with minio. This service is a thin REST wrapper over
a catalog IO implementation controlled via CATALOG_IO__IMPL. Out of the box, it defaults
to org.apache.iceberg.jdbc.JdbcCatalog over a temporary sqlite db. It can also wrap over a
S3 based (minio) implementaion by setting org.apache.iceberg.aws.s3.S3FileIO.
"""

PERSISTENT_ROOT = "/var/lib/iceberg_rest/"
LOG_FILE = os.path.join(PERSISTENT_ROOT, "iceberg_rest_server.log")
JAR = "iceberg-rest-image-all.jar"
JAR_PATH = f"/opt/iceberg-rest-image/build/libs/{JAR}"
logs = {"iceberg_rest_logs": {"path": LOG_FILE, "collect_default": True}}
DEFAULT_CATALOG_IMPL = "org.apache.iceberg.jdbc.JdbcCatalog"
DEFAULT_CATALOG_JDBC_URI = f"jdbc:sqlite:file:/tmp/{uuid.uuid1()}_iceberg_rest_mode=memory"
DEFAULT_CATALOG_DB_USER = "user"
DEFAULT_CATALOG_DB_PASS = "password"

def __init__(self,
ctx,
cloud_storage_warehouse: str,
cloud_storage_access_key: str = 'panda-user',
cloud_storage_secret_key: str = 'panda-secret',
cloud_storage_region: str = 'panda-region',
cloud_storage_api_endpoint: str = "http://minio-s3:9000",
node: ClusterNode | None = None):
super(IcebergRESTCatalog, self).__init__(ctx,
num_nodes=0 if node else 1)
self.cloud_storage_access_key = cloud_storage_access_key
self.cloud_storage_secret_key = cloud_storage_secret_key
self.cloud_storage_region = cloud_storage_region
self.cloud_storage_api_endpoint = cloud_storage_api_endpoint
self.cloud_storage_warehouse = cloud_storage_warehouse

self.catalog_url = None

def _make_env(self):
env = dict()
env["AWS_ACCESS_KEY_ID"] = self.cloud_storage_access_key
env["AWS_SECRET_ACCESS_KEY"] = self.cloud_storage_secret_key
env["AWS_REGION"] = self.cloud_storage_region
env["CATALOG_WAREHOUSE"] = f"s3a://{self.cloud_storage_warehouse}"
# This also takes org.apache.iceberg.aws.s3.S3FileIO
# to use a wrapper around S3 based catalog.
env["CATALOG_CATALOG__IMPL"] = IcebergRESTCatalog.DEFAULT_CATALOG_IMPL
env["CATALOG_URI"] = IcebergRESTCatalog.DEFAULT_CATALOG_JDBC_URI
env["CATALOG_JDBC_USER"] = IcebergRESTCatalog.DEFAULT_CATALOG_DB_USER
env["CATALOG_JDBC_PASSWORD"] = IcebergRESTCatalog.DEFAULT_CATALOG_DB_PASS
env["CATALOG_S3_ENDPOINT"] = self.cloud_storage_api_endpoint
env["CATALOG_IO__IMPL"] = "org.apache.iceberg.aws.s3.S3FileIO"
return env

def _cmd(self):
java = "/opt/java/java-17"
envs = self._make_env()
env = " ".join(f"{k}={v}" for k, v in envs.items())
return f"{env} {java} -jar {IcebergRESTCatalog.JAR_PATH} \
1>> {IcebergRESTCatalog.LOG_FILE} 2>> {IcebergRESTCatalog.LOG_FILE} &"

def client(self, catalog_name="default"):
assert self.catalog_url
return load_catalog(
catalog_name, **{
"uri": self.catalog_url,
"s3.endpoint": self.cloud_storage_api_endpoint,
"s3.access-key-id": self.cloud_storage_access_key,
"s3.secret-access-key": self.cloud_storage_secret_key,
"s3.region": self.cloud_storage_region,
})

def start_node(self, node, timeout_sec=60, **kwargs):
node.account.ssh("mkdir -p %s" % IcebergRESTCatalog.PERSISTENT_ROOT,
allow_fail=False)
cmd = self._cmd()
self.logger.info(
f"Starting Iceberg REST catalog service on {node.name} with command {cmd}"
)
node.account.ssh(cmd, allow_fail=False)
self.catalog_url = f"http://{node.account.hostname}:8181"

def wait_node(self, node, timeout_sec=None):
check_cmd = f"pyiceberg --uri http://localhost:8181 list"

def _ready():
out = node.account.ssh_output(check_cmd)
status_code = int(out.decode('utf-8'))
self.logger.info(f"health check result status code: {status_code}")
return status_code == 200

wait_until(_ready,
timeout_sec=timeout_sec,
backoff_sec=0.4,
err_msg="Error waiting for Iceberg REST catalog to start",
retry_on_exc=True)
return True

def stop_node(self, node, allow_fail=False, **_):

node.account.kill_java_processes(IcebergRESTCatalog.JAR,
allow_fail=allow_fail)

def _stopped():
out = node.account.ssh_output("jcmd").decode('utf-8')
return not (IcebergRESTCatalog.JAR in out)

wait_until(_stopped,
timeout_sec=10,
backoff_sec=1,
err_msg="Error stopping Iceberg REST catalog")

def clean_node(self, node, **_):
self.stop_node(node, allow_fail=True)
node.account.remove(IcebergRESTCatalog.PERSISTENT_ROOT,
allow_fail=True)
8 changes: 8 additions & 0 deletions tests/rptest/tests/datalake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
36 changes: 36 additions & 0 deletions tests/rptest/tests/datalake/iceberg_rest_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import uuid
from rptest.archival.s3_client import S3Client
from rptest.services.apache_iceberg_catalog import IcebergRESTCatalog
from rptest.tests.redpanda_test import RedpandaTest


class IcebergRESTCatalogTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super(IcebergRESTCatalogTest, self).__init__(test_ctx, *args, **kwargs)
self.s3_client = S3Client(region="panda-region",
access_key="panda-user",
secret_key="panda-secret",
endpoint="http://minio-s3:9000",
logger=self.logger)
self.warehouse = f"warehouse-{uuid.uuid1()}"
self.catalog_service = IcebergRESTCatalog(
test_ctx, cloud_storage_warehouse=self.warehouse)

def setUp(self):
self.s3_client.create_bucket(self.warehouse)
self.catalog_service.start()
return super().setUp()

def tearDown(self):
self.catalog_service.stop()
self.s3_client.empty_and_delete_bucket(self.warehouse)
return super().tearDown()
66 changes: 66 additions & 0 deletions tests/rptest/tests/datalake/iceberg_rest_catalog_smoke_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from rptest.services.cluster import cluster

from rptest.tests.datalake.iceberg_rest_catalog import IcebergRESTCatalogTest
from pyiceberg.schema import Schema
from pyiceberg.types import (
TimestampType,
FloatType,
DoubleType,
StringType,
NestedField,
StructType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform


class IcebergRESTCatalogSmokeTest(IcebergRESTCatalogTest):
def __init__(self, test_ctx, *args, **kwargs):
super(IcebergRESTCatalogSmokeTest, self).__init__(test_ctx,
num_brokers=1,
*args,
extra_rp_conf={},
**kwargs)

@cluster(num_nodes=2)
def test_basic(self):

catalog = self.catalog_service.client()
namespace = "test_ns"
catalog.create_namespace(namespace)
catalog.list_tables(namespace)

schema = Schema(
NestedField(field_id=1,
name="datetime",
field_type=TimestampType(),
required=True),
NestedField(field_id=2,
name="symbol",
field_type=StringType(),
required=True),
NestedField(field_id=3,
name="bid",
field_type=FloatType(),
required=False),
NestedField(field_id=4,
name="ask",
field_type=DoubleType(),
required=False),
NestedField(
field_id=5,
name="details",
field_type=StructType(
NestedField(field_id=4,
name="created_by",
field_type=StringType(),
required=False), ),
required=False,
),
)
table = catalog.create_table(identifier=f"{namespace}.bids",
schema=schema,
location=f"s3a://{self.warehouse}/bids")
self.logger.info(f">>> {table}")

assert "bids" in [t[1] for t in catalog.list_tables(namespace)]

0 comments on commit 41595d8

Please sign in to comment.