Skip to content

Commit

Permalink
attempt to refactor cython code with separate files for with/without …
Browse files Browse the repository at this point in the history
…encryption
  • Loading branch information
jorisvandenbossche committed Oct 9, 2023
1 parent 2610fbb commit f78a162
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 378 deletions.
21 changes: 13 additions & 8 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,9 @@ if(PYARROW_BUILD_PARQUET)
endif()
if(PYARROW_BUILD_DATASET)
list(APPEND CYTHON_EXTENSIONS _dataset_parquet)
# include this always, even if PYARROW_BUILD_PARQUET_ENCRYPTION is not
# enabled (source file gets replaced with dummies below)
list(APPEND CYTHON_EXTENSIONS _dataset_parquet_encryption)
endif()
endif()

Expand Down Expand Up @@ -713,14 +716,6 @@ endif()
# Error on any warnings not already explicitly ignored.
set(CYTHON_FLAGS "${CYTHON_FLAGS}" "--warning-errors")

if(PYARROW_BUILD_PARQUET_ENCRYPTION)
message(STATUS "Parquet Encryption Enabled")
list(APPEND CYTHON_FLAGS "-E" "PARQUET_ENCRYPTION_ENABLED=1")
else()
message(STATUS "Parquet Encryption is NOT Enabled")
list(APPEND CYTHON_FLAGS "-E" "PARQUET_ENCRYPTION_ENABLED=0")
endif()

foreach(module ${CYTHON_EXTENSIONS})
string(REPLACE "." ";" directories ${module})
list(GET directories -1 module_name)
Expand All @@ -730,6 +725,16 @@ foreach(module ${CYTHON_EXTENSIONS})
set(module_SRC pyarrow/${module_root}.pyx)
set_source_files_properties(${module_SRC} PROPERTIES CYTHON_IS_CXX TRUE)

if(${module_name} STREQUAL "_dataset_parquet_encryption")
if(PYARROW_BUILD_PARQUET_ENCRYPTION)
message(STATUS "Parquet Encryption Enabled")
set(module_SRC pyarrow/_dataset_parquet_encryption.pyx)
else()
message(STATUS "Parquet Encryption is NOT Enabled")
set(module_SRC pyarrow/_dataset_parquet_no_encryption.pyx)
endif()
endif()

cython_add_module(${module_name} ${module_name}_pyx ${module_name}_output ${module_SRC})

if(directories)
Expand Down
181 changes: 17 additions & 164 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.includes.libarrow_dataset_parquet cimport *
from pyarrow._fs cimport FileSystem

IF PARQUET_ENCRYPTION_ENABLED:
from pyarrow.includes.libarrow_parquet_readwrite_encryption cimport *
from pyarrow._parquet_encryption cimport *
ELSE:
from pyarrow.includes.libarrow_parquet_readwrite cimport *


from pyarrow._compute cimport Expression, _bind
from pyarrow._dataset cimport (
_make_file_source,
Expand All @@ -53,7 +46,7 @@ from pyarrow._dataset cimport (
PartitioningFactory,
WrittenFile
)

from pyarrow._dataset_parquet_encryption cimport *

from pyarrow._parquet cimport (
_create_writer_properties, _create_arrow_writer_properties,
Expand All @@ -65,134 +58,6 @@ cdef Expression _true = Expression._scalar(True)

ctypedef CParquetFileWriter* _CParquetFileWriterPtr

IF PARQUET_ENCRYPTION_ENABLED:
cdef class ParquetEncryptionConfig(_Weakrefable):
"""
Core configuration class encapsulating parameters for high-level encryption
within the Parquet framework.
The ParquetEncryptionConfig class serves as a bridge for passing encryption-related
parameters to the appropriate components within the Parquet library. It maintains references
to objects that define the encryption strategy, Key Management Service (KMS) configuration,
and specific encryption configurations for Parquet data.
Parameters
----------
crypto_factory : pyarrow.parquet.encryption.CryptoFactory
Shared pointer to a `CryptoFactory` object. The `CryptoFactory` is responsible for
creating cryptographic components, such as encryptors and decryptors.
kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig
Shared pointer to a `KmsConnectionConfig` object. This object holds the configuration
parameters necessary for connecting to a Key Management Service (KMS).
encryption_config : pyarrow.parquet.encryption.EncryptionConfiguration
Shared pointer to an `EncryptionConfiguration` object. This object defines specific
encryption settings for Parquet data, including the keys assigned to different columns.
Raises
------
ValueError
Raised if `encryption_config` is None.
"""
cdef:
shared_ptr[CParquetEncryptionConfig] c_config

# Avoid mistakenly creating attributes
__slots__ = ()

def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig kms_connection_config,
EncryptionConfiguration encryption_config):

cdef shared_ptr[CEncryptionConfiguration] c_encryption_config

if crypto_factory is None:
raise ValueError("crypto_factory cannot be None")

if kms_connection_config is None:
raise ValueError("kms_connection_config cannot be None")

if encryption_config is None:
raise ValueError("encryption_config cannot be None")

self.c_config.reset(new CParquetEncryptionConfig())

c_encryption_config = pyarrow_unwrap_encryptionconfig(
encryption_config)

self.c_config.get().crypto_factory = pyarrow_unwrap_cryptofactory(crypto_factory)
self.c_config.get().kms_connection_config = pyarrow_unwrap_kmsconnectionconfig(
kms_connection_config)
self.c_config.get().encryption_config = c_encryption_config

@staticmethod
cdef wrap(shared_ptr[CParquetEncryptionConfig] c_config):
cdef ParquetEncryptionConfig python_config = ParquetEncryptionConfig.__new__(ParquetEncryptionConfig)
python_config.c_config = c_config
return python_config

cdef shared_ptr[CParquetEncryptionConfig] unwrap(self):
return self.c_config

cdef class ParquetDecryptionConfig(_Weakrefable):
"""
Core configuration class encapsulating parameters for high-level decryption
within the Parquet framework.
ParquetDecryptionConfig is designed to pass decryption-related parameters to
the appropriate decryption components within the Parquet library. It holds references to
objects that define the decryption strategy, Key Management Service (KMS) configuration,
and specific decryption configurations for reading encrypted Parquet data.
Parameters
----------
crypto_factory : pyarrow.parquet.encryption.CryptoFactory
Shared pointer to a `CryptoFactory` object, pivotal in creating cryptographic
components for the decryption process.
kms_connection_config : pyarrow.parquet.encryption.KmsConnectionConfig
Shared pointer to a `KmsConnectionConfig` object, containing parameters necessary
for connecting to a Key Management Service (KMS) during decryption.
decryption_config : pyarrow.parquet.encryption.DecryptionConfiguration
Shared pointer to a `DecryptionConfiguration` object, specifying decryption settings
for reading encrypted Parquet data.
Raises
------
ValueError
Raised if `decryption_config` is None.
"""

cdef:
shared_ptr[CParquetDecryptionConfig] c_config

# Avoid mistakingly creating attributes
__slots__ = ()

def __cinit__(self, CryptoFactory crypto_factory, KmsConnectionConfig kms_connection_config,
DecryptionConfiguration decryption_config):

cdef shared_ptr[CDecryptionConfiguration] c_decryption_config

if decryption_config is None:
raise ValueError(
"decryption_config cannot be None")

self.c_config.reset(new CParquetDecryptionConfig())

c_decryption_config = pyarrow_unwrap_decryptionconfig(
decryption_config)

self.c_config.get().crypto_factory = pyarrow_unwrap_cryptofactory(crypto_factory)
self.c_config.get().kms_connection_config = pyarrow_unwrap_kmsconnectionconfig(
kms_connection_config)
self.c_config.get().decryption_config = c_decryption_config

@staticmethod
cdef wrap(shared_ptr[CParquetDecryptionConfig] c_config):
cdef ParquetDecryptionConfig python_config = ParquetDecryptionConfig.__new__(ParquetDecryptionConfig)
python_config.c_config = c_config
return python_config

cdef shared_ptr[CParquetDecryptionConfig] unwrap(self):
return self.c_config

cdef class ParquetFileFormat(FileFormat):
"""
Expand Down Expand Up @@ -707,6 +572,8 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
for name, value in kwargs.items():
if name not in self._properties:
raise TypeError("unexpected parquet write option: " + name)
if name == "encryption_properties" and not is_encryption_enabled():
raise NotImplementedError("...")
self._properties[name] = value
if name in arrow_fields:
setters.add(self._set_arrow_properties)
Expand Down Expand Up @@ -825,8 +692,7 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
"""

cdef CParquetFragmentScanOptions* parquet_options
IF PARQUET_ENCRYPTION_ENABLED:
cdef ParquetDecryptionConfig _parquet_decryption_config
cdef object _parquet_decryption_config

# Avoid mistakingly creating attributes
__slots__ = ()
Expand All @@ -846,14 +712,8 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.thrift_string_size_limit = thrift_string_size_limit
if thrift_container_size_limit is not None:
self.thrift_container_size_limit = thrift_container_size_limit

IF PARQUET_ENCRYPTION_ENABLED:
if decryption_config:
self.parquet_decryption_config = decryption_config
ELSE:
if decryption_config is not None:
raise NotImplementedError(
"Encryption is not enabled, but a decryption_config was provided.")
if decryption_config is not None:
self.parquet_decryption_config = decryption_config

cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
Expand All @@ -865,29 +725,22 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
cdef ArrowReaderProperties* arrow_reader_properties(self):
return self.parquet_options.arrow_reader_properties.get()

IF PARQUET_ENCRYPTION_ENABLED:
@property
def parquet_decryption_config(self):
return self._parquet_decryption_config

@parquet_decryption_config.setter
def parquet_decryption_config(self, ParquetDecryptionConfig config):
cdef shared_ptr[CParquetDecryptionConfig] c_config
if not isinstance(config, ParquetDecryptionConfig):
raise ValueError("config must be a ParquetDecryptionConfig")
self._parquet_decryption_config = config
c_config = config.unwrap()
self.parquet_options.parquet_decryption_config = c_config
ELSE:
@property
def parquet_decryption_config(self):
@property
def parquet_decryption_config(self):
if not is_encryption_enabled():
raise NotImplementedError(
"Unable to access encryption features; the code was compiled without the necessary encryption support.")
return self._parquet_decryption_config

@parquet_decryption_config.setter
def parquet_decryption_config(self, ParquetDecryptionConfig config):
@parquet_decryption_config.setter
def parquet_decryption_config(self, config):
if not is_encryption_enabled():
raise NotImplementedError(
"Unable to access encryption features; the code was compiled without the necessary encryption support.")
# raise NotImplementedError(
# "Encryption is not enabled, but a decryption_config was provided.")
set_decryption_config(self.parquet_options, config)
self._parquet_decryption_config = config

@property
def use_buffered_stream(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@
# specific language governing permissions and limitations
# under the License.

# distutils: language = c++
# cython: language_level = 3

from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._parquet cimport *
"""Dataset support for Parquet encryption."""

cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CParquetFileWriteOptions \
"arrow::dataset::ParquetFileWriteOptions"(CFileWriteOptions):
shared_ptr[WriterProperties] writer_properties
shared_ptr[ArrowWriterProperties] arrow_writer_properties
from pyarrow.includes.libarrow_dataset_parquet cimport *

cdef cppclass CParquetFragmentScanOptions \
"arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions):
shared_ptr[CReaderProperties] reader_properties
shared_ptr[ArrowReaderProperties] arrow_reader_properties

cdef bint is_encryption_enabled()
cdef set_decryption_config(CParquetFragmentScanOptions * parquet_options, config)
Loading

0 comments on commit f78a162

Please sign in to comment.