Skip to content

Commit

Permalink
Merge pull request #9587 from rouault/geoparquet_fixes
Browse files Browse the repository at this point in the history
[Backport 3.8] Geoparquet related fixes
  • Loading branch information
rouault authored Mar 31, 2024
2 parents 634264c + a554f27 commit 6ab1e61
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 24 deletions.
5 changes: 5 additions & 0 deletions ogr/ogrsf_frmts/arrow_common/ogr_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ class OGRArrowLayer CPL_NON_FINAL
int GetNextArrowArray(struct ArrowArrayStream *,
struct ArrowArray *out) override;

/** Advance the running feature index (m_nFeatureIdx) by one.
 *
 * Declared virtual so that a derived layer can supply its own way of
 * advancing the index.
 */
virtual void IncrFeatureIdx()
{
    m_nFeatureIdx += 1;
}

public:
virtual ~OGRArrowLayer() override;

Expand Down
20 changes: 11 additions & 9 deletions ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3497,7 +3497,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
break;
}

m_nFeatureIdx++;
IncrFeatureIdx();
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand Down Expand Up @@ -3574,7 +3574,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
}
}

if (nParts != 0 && m_sFilterEnvelope.Intersects(sEnvelope))
if (nParts != 0 && !m_sFilterEnvelope.Intersects(sEnvelope))
{
break;
}
Expand All @@ -3585,7 +3585,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
break;
}

m_nFeatureIdx++;
IncrFeatureIdx();
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand Down Expand Up @@ -3627,7 +3627,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
break;
}

m_nFeatureIdx++;
IncrFeatureIdx();
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand All @@ -3649,7 +3649,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
break;
}

m_nFeatureIdx++;
IncrFeatureIdx();
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand All @@ -3665,7 +3665,7 @@ inline OGRFeature *OGRArrowLayer::GetNextRawFeature()
if (m_iFIDArrowColumn < 0)
poFeature->SetFID(m_nFeatureIdx);

m_nFeatureIdx++;
IncrFeatureIdx();
m_nIdxInBatch++;

return poFeature;
Expand Down Expand Up @@ -3877,7 +3877,6 @@ inline OGRErr OGRArrowLayer::GetExtent(int iGeomField, OGREnvelope *psExtent,
}
}

m_nFeatureIdx++;
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand Down Expand Up @@ -3977,7 +3976,6 @@ inline OGRErr OGRArrowLayer::GetExtent(int iGeomField, OGREnvelope *psExtent,
}
}

m_nFeatureIdx++;
m_nIdxInBatch++;
if (m_nIdxInBatch == m_poBatch->num_rows())
{
Expand Down Expand Up @@ -4407,7 +4405,11 @@ inline int OGRArrowLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
OverrideArrowRelease(m_poArrowDS, out_array);

const auto nFeatureIdxCur = m_nFeatureIdx;
m_nFeatureIdx += m_nIdxInBatch;
// TODO: We likely have an issue regarding FIDs based on m_nFeatureIdx
// when m_iFIDArrowColumn < 0, only a subset of row groups is
// selected, and this batch goes across non-consecutive row groups.
for (int64_t i = 0; i < m_nIdxInBatch; ++i)
IncrFeatureIdx();

if (m_poAttrQuery || m_poFilterGeom)
{
Expand Down
10 changes: 9 additions & 1 deletion ogr/ogrsf_frmts/parquet/ogr_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,13 @@ class OGRParquetLayer final : public OGRParquetLayerBase
std::vector<int> m_anMapGeomFieldIndexToParquetColumn{};
bool m_bHasMissingMappingToParquet = false;

std::vector<int64_t> m_anSelectedGroupsStartFeatureIdx{};
//! Contains pairs of (selected feature idx, total feature idx) break points.
std::vector<std::pair<int64_t, int64_t>> m_asFeatureIdxRemapping{};
//! Iterator over m_asFeatureIdxRemapping
std::vector<std::pair<int64_t, int64_t>>::iterator
m_oFeatureIdxRemappingIter{};
//! Feature index among the potentially restricted set of selected row groups
int64_t m_nFeatureIdxSelected = 0;
std::vector<int> m_anRequestedParquetColumns{}; // only valid when
// m_bIgnoredFields is set
#ifdef DEBUG
Expand Down Expand Up @@ -120,6 +126,8 @@ class OGRParquetLayer final : public OGRParquetLayerBase

bool FastGetExtent(int iGeomField, OGREnvelope *psExtent) const override;

void IncrFeatureIdx() override;

public:
OGRParquetLayer(OGRParquetDataset *poDS, const char *pszLayerName,
std::unique_ptr<parquet::arrow::FileReader> &&arrow_reader,
Expand Down
56 changes: 42 additions & 14 deletions ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ OGRParquetLayer::OGRParquetLayer(
EstablishFeatureDefn();
CPLAssert(static_cast<int>(m_aeGeomEncoding.size()) ==
m_poFeatureDefn->GetGeomFieldCount());

m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
}

/************************************************************************/
Expand Down Expand Up @@ -922,6 +924,13 @@ void OGRParquetLayer::ResetReading()
m_poRecordBatchReader.reset();
}
OGRParquetLayerBase::ResetReading();
m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
m_nFeatureIdxSelected = 0;
if (!m_asFeatureIdxRemapping.empty())
{
m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
++m_oFeatureIdxRemappingIter;
}
}

/************************************************************************/
Expand Down Expand Up @@ -1028,6 +1037,25 @@ static IsConstraintPossibleRes IsConstraintPossible(int nOperation, T v, T min,
return IsConstraintPossibleRes::YES;
}

/************************************************************************/
/* IncrFeatureIdx() */
/************************************************************************/

/** Advance the feature counters by one feature.
 *
 * Both m_nFeatureIdxSelected and m_nFeatureIdx are incremented. Then, when
 * there is no Arrow FID column (m_iFIDArrowColumn < 0) and a remapping break
 * point is still pending, m_nFeatureIdx is overwritten with the second member
 * of that break point as soon as m_nFeatureIdxSelected reaches its first
 * member, and the break point iterator moves on to the next entry.
 */
void OGRParquetLayer::IncrFeatureIdx()
{
    ++m_nFeatureIdxSelected;
    ++m_nFeatureIdx;

    // Nothing to remap when a FID column exists or no break points are
    // recorded. The emptiness check comes before any use of the iterator.
    if (m_iFIDArrowColumn >= 0 || m_asFeatureIdxRemapping.empty())
        return;

    // All break points have already been consumed.
    if (m_oFeatureIdxRemappingIter == m_asFeatureIdxRemapping.end())
        return;

    // The next break point has not been reached yet.
    if (m_nFeatureIdxSelected != m_oFeatureIdxRemappingIter->first)
        return;

    // Break point reached: jump to the index stored in the pair and
    // advance to the following break point.
    m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
    ++m_oFeatureIdxRemappingIter;
}

/************************************************************************/
/* ReadNextBatch() */
/************************************************************************/
Expand All @@ -1048,7 +1076,7 @@ bool OGRParquetLayer::ReadNextBatch()

if (m_poRecordBatchReader == nullptr)
{
m_anSelectedGroupsStartFeatureIdx.clear();
m_asFeatureIdxRemapping.clear();

bool bIterateEverything = false;
std::vector<int> anSelectedGroups;
Expand All @@ -1074,7 +1102,8 @@ bool OGRParquetLayer::ReadNextBatch()
OGRFieldType eType = OFTMaxType;
OGRFieldSubType eSubType = OFSTNone;
std::string osMinTmp, osMaxTmp;
GIntBig nFeatureIdx = 0;
int64_t nFeatureIdxSelected = 0;
int64_t nFeatureIdxTotal = 0;

for (int iRowGroup = 0;
iRowGroup < nNumGroups && !bIterateEverything; ++iRowGroup)
Expand Down Expand Up @@ -1152,9 +1181,9 @@ bool OGRParquetLayer::ReadNextBatch()
if (iOGRField == OGR_FID_INDEX &&
m_iFIDParquetColumn < 0)
{
sMin.Integer64 = nFeatureIdx;
sMin.Integer64 = nFeatureIdxTotal;
sMax.Integer64 =
nFeatureIdx +
nFeatureIdxTotal +
poRowGroup->metadata()->num_rows() - 1;
eType = OFTInteger64;
}
Expand Down Expand Up @@ -1311,26 +1340,32 @@ bool OGRParquetLayer::ReadNextBatch()
if (bSelectGroup)
{
// CPLDebug("PARQUET", "Selecting row group %d", iRowGroup);
m_anSelectedGroupsStartFeatureIdx.push_back(nFeatureIdx);
m_asFeatureIdxRemapping.emplace_back(
std::make_pair(nFeatureIdxSelected, nFeatureIdxTotal));
anSelectedGroups.push_back(iRowGroup);
nFeatureIdxSelected += poRowGroup->metadata()->num_rows();
}

nFeatureIdx += poRowGroup->metadata()->num_rows();
nFeatureIdxTotal += poRowGroup->metadata()->num_rows();
}
}

if (bIterateEverything)
{
m_anSelectedGroupsStartFeatureIdx.clear();
m_asFeatureIdxRemapping.clear();
m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
if (!CreateRecordBatchReader(0))
return false;
}
else
{
m_oFeatureIdxRemappingIter = m_asFeatureIdxRemapping.begin();
if (anSelectedGroups.empty())
{
return false;
}
m_nFeatureIdx = m_oFeatureIdxRemappingIter->second;
++m_oFeatureIdxRemappingIter;
if (!CreateRecordBatchReader(anSelectedGroups))
{
return false;
Expand Down Expand Up @@ -1362,13 +1397,6 @@ bool OGRParquetLayer::ReadNextBatch()
m_poBatch.reset();
return false;
}
if (!m_anSelectedGroupsStartFeatureIdx.empty())
{
CPLAssert(
m_iRecordBatch <
static_cast<int>(m_anSelectedGroupsStartFeatureIdx.size()));
m_nFeatureIdx = m_anSelectedGroupsStartFeatureIdx[m_iRecordBatch];
}
} while (poNextBatch->num_rows() == 0);

SetBatch(poNextBatch);
Expand Down

0 comments on commit 6ab1e61

Please sign in to comment.