Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into apachegh-37437-makearrayofnulls
Browse files Browse the repository at this point in the history
  • Loading branch information
jorisvandenbossche committed Oct 5, 2023
2 parents 534e279 + 02de3c1 commit 409d8c0
Show file tree
Hide file tree
Showing 70 changed files with 2,481 additions and 1,307 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ jobs:
restore-keys: ${{ matrix.image }}-
- name: Setup Python
run: |
sudo apt install -y --no-install-recommends python3 python3-pip
sudo apt update
sudo apt install -y --no-install-recommends python3 python3-dev python3-pip
- name: Setup Archery
run: python3 -m pip install -e dev/archery[docker]
- name: Execute Docker Build
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ jobs:
submodules: recursive
- name: Setup Python
run: |
sudo apt install -y --no-install-recommends python3 python3-pip
sudo apt update
sudo apt install -y --no-install-recommends python3 python3-dev python3-pip
- name: Setup Archery
run: python3 -m pip install -e dev/archery[docker]
- name: Execute Docker Build
Expand Down
6 changes: 0 additions & 6 deletions .github/workflows/java_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ jobs:
fi
echo $PREFIX
archery crossbow download-artifacts -f java-jars -t binaries $PREFIX
- name: Cache Repo
uses: actions/cache@v3
with:
path: repo
key: java-nightly-${{ github.run_id }}
restore-keys: java-nightly
- name: Sync from Remote
uses: ./arrow/.github/actions/sync-nightlies
with:
Expand Down
4 changes: 3 additions & 1 deletion ci/docker/conda-integration.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ ARG go=1.19.13
# Install Archery and integration dependencies
COPY ci/conda_env_archery.txt /arrow/ci/

# Pin Python until pythonnet is made compatible with 3.12
# (https://github.com/pythonnet/pythonnet/pull/2249)
RUN mamba install -q -y \
--file arrow/ci/conda_env_archery.txt \
"python>=3.7" \
"python < 3.12" \
numpy \
compilers \
maven=${maven} \
Expand Down
5 changes: 5 additions & 0 deletions ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]
# For C# C Data Interface testing
pip install pythonnet

# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1

# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ set(ARROW_DOC_DIR "share/doc/${PROJECT_NAME}")
set(BUILD_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/build-support")

set(ARROW_LLVM_VERSIONS
"17.0"
"16.0"
"15.0"
"14.0"
Expand Down
24 changes: 14 additions & 10 deletions cpp/cmake_modules/FindLLVMAlt.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,20 @@ if(LLVM_FOUND)
target_link_libraries(LLVM::LLVM_LIBS INTERFACE LLVM)
else()
# Find the libraries that correspond to the LLVM components
llvm_map_components_to_libnames(LLVM_LIBS
core
mcjit
native
ipo
bitreader
target
linker
analysis
debuginfodwarf)
set(LLVM_TARGET_COMPONENTS
analysis
bitreader
core
debuginfodwarf
ipo
linker
mcjit
native
target)
if(LLVM_VERSION_MAJOR GREATER_EQUAL 14)
list(APPEND LLVM_TARGET_COMPONENTS passes)
endif()
llvm_map_components_to_libnames(LLVM_LIBS ${LLVM_TARGET_COMPONENTS})
target_link_libraries(LLVM::LLVM_LIBS INTERFACE ${LLVM_LIBS})

if(TARGET LLVMSupport AND NOT ARROW_ZSTD_USE_SHARED)
Expand Down
49 changes: 40 additions & 9 deletions cpp/src/arrow/compute/kernels/codegen_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
bool saw_date32 = false;
bool saw_date64 = false;
bool saw_duration = false;
bool saw_time_since_midnight = false;
const TypeHolder* end = begin + count;
for (auto it = begin; it != end; it++) {
auto id = it->type->id();
Expand All @@ -271,6 +272,18 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
finest_unit = std::max(finest_unit, ty.unit());
continue;
}
case Type::TIME32: {
const auto& type = checked_cast<const Time32Type&>(*it->type);
finest_unit = std::max(finest_unit, type.unit());
saw_time_since_midnight = true;
continue;
}
case Type::TIME64: {
const auto& type = checked_cast<const Time64Type&>(*it->type);
finest_unit = std::max(finest_unit, type.unit());
saw_time_since_midnight = true;
continue;
}
case Type::DURATION: {
const auto& ty = checked_cast<const DurationType&>(*it->type);
finest_unit = std::max(finest_unit, ty.unit());
Expand All @@ -282,15 +295,33 @@ TypeHolder CommonTemporal(const TypeHolder* begin, size_t count) {
}
}

if (timezone) {
// At least one timestamp seen
return timestamp(finest_unit, *timezone);
} else if (saw_date64) {
return date64();
} else if (saw_date32) {
return date32();
} else if (saw_duration) {
return duration(finest_unit);
bool saw_timestamp_or_date = timezone || saw_date64 || saw_date32 || saw_duration;

if (saw_time_since_midnight && saw_timestamp_or_date) {
// Cannot find common type
return TypeHolder(nullptr);
}
if (saw_timestamp_or_date) {
if (timezone) {
// At least one timestamp seen
return timestamp(finest_unit, *timezone);
} else if (saw_date64) {
return date64();
} else if (saw_date32) {
return date32();
} else if (saw_duration) {
return duration(finest_unit);
}
}
if (saw_time_since_midnight) {
switch (finest_unit) {
case TimeUnit::SECOND:
case TimeUnit::MILLI:
return time32(finest_unit);
case TimeUnit::MICRO:
case TimeUnit::NANO:
return time64(finest_unit);
}
}
return TypeHolder(nullptr);
}
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/compute/kernels/codegen_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ TEST(TestDispatchBest, CommonTemporal) {
args = {timestamp(TimeUnit::SECOND, "America/Phoenix"),
timestamp(TimeUnit::SECOND, "UTC")};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);

args = {time32(TimeUnit::SECOND), time32(TimeUnit::MILLI)};
AssertTypeEqual(*time32(TimeUnit::MILLI), *CommonTemporal(args.data(), args.size()));

args = {time32(TimeUnit::SECOND), time64(TimeUnit::NANO)};
AssertTypeEqual(*time64(TimeUnit::NANO), *CommonTemporal(args.data(), args.size()));

args = {date32(), time32(TimeUnit::SECOND)};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);

args = {timestamp(TimeUnit::SECOND), time32(TimeUnit::SECOND)};
ASSERT_EQ(CommonTemporal(args.data(), args.size()), nullptr);
}

TEST(TestDispatchBest, CommonTemporalResolution) {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
return custom_open_();
}

// Asynchronously open this source as a RandomAccessFile.
// Mirrors FileSource::Open(), trying the three possible backings in order:
// a filesystem path, an in-memory buffer, or a custom open callback.
Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
// Filesystem-backed source: delegate to the filesystem's native async open.
if (filesystem_) {
return filesystem_->OpenInputFileAsync(file_info_);
}

// Buffer-backed source: wrapping a buffer is cheap and non-blocking, so
// return an already-finished future holding a BufferReader view.
if (buffer_) {
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(
std::make_shared<io::BufferReader>(buffer_));
}

// Custom-open source: custom_open_() runs synchronously here, so this call
// may block the caller despite the async signature.
// TODO(GH-37962): custom_open_ should not block
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
}

int64_t FileSource::Size() const {
if (filesystem_) {
return file_info_.size();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;

/// \brief Get the size (in bytes) of the file or buffer
/// If the file is compressed this should be the compressed (on-disk) size.
Expand Down
50 changes: 28 additions & 22 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,29 +479,35 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();

auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);

return source.OpenAsync().Then(
[=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties),
metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only
// type)> It *wouldn't* be safe to const_cast reader except that
// here we know there are no other waiters on the reader.
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
reader)),
std::move(arrow_properties), &arrow_reader));

return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}

Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,31 @@ TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingDuratio
CountRowGroupsInFragment(fragment, {0}, expr);
}

// Verifies that a predicate on a time32 column is pushed down to row-group
// selection, even though Parquet storage may not preserve the unit exactly.
// NOTE(review): the test name says "TimestampColumn" but the schema uses
// time32 — presumably intentional for GH-37799; confirm against the issue.
TEST_P(TestParquetFileFormatScan,
PredicatePushdownRowGroupFragmentsUsingTimestampColumn) {
// GH-37799: Parquet arrow will change TimeUnit::SECOND to TimeUnit::MILLI
// because parquet LogicalType doesn't support SECOND.
for (auto time_unit : {TimeUnit::MILLI, TimeUnit::SECOND}) {
// Single-column table split across two record batches so the writer
// produces multiple row groups to filter between.
auto table = TableFromJSON(schema({field("t", time32(time_unit))}),
{
R"([{"t": 1}])",
R"([{"t": 2}, {"t": 3}])",
});
TableBatchReader table_reader(*table);
ARROW_SCOPED_TRACE("time_unit=", time_unit);
// store_schema() embeds the original Arrow schema so the logical time32
// type can be recovered on read.
ASSERT_OK_AND_ASSIGN(
auto source,
ParquetFormatHelper::Write(
&table_reader, ArrowWriterProperties::Builder().store_schema()->build())
.As<FileSource>());
SetSchema({field("t", time32(time_unit))});
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(source));

// Only the first row group contains t == 1; pushdown should select it alone.
auto expr = equal(field_ref("t"), literal(::arrow::Time32Scalar(1, time_unit)));
CountRowGroupsInFragment(fragment, {0}, expr);
}
}

// Tests projection with nested/indexed FieldRefs.
// https://github.com/apache/arrow/issues/35579
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {
Expand Down
Loading

0 comments on commit 409d8c0

Please sign in to comment.