Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
huaxingao committed Dec 4, 2024
1 parent dbe1922 commit 41ec616
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,12 @@ public boolean reportColumnStats() {
.defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
.parse();
}

public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::valueOf)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ public void reset() {
public CometVector read(CometVector reuse, int numRows) {
delegate.readBatch(numRows);
org.apache.comet.vector.CometVector bv = delegate.currentBatch();
if (reuse == null) {
reuse = vector;
}
reuse.setDelegate(bv);
return reuse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,15 @@ public void setRowGroupInfo(
&& !(readers[i] instanceof CometPositionColumnReader)
&& !(readers[i] instanceof CometDeleteColumnReader)) {
readers[i].reset();
readers[i].setPageReader(
pageStore.getPageReader(((CometColumnReader) readers[i]).getDescriptor()));
readers[i].setPageReader(pageStore.getPageReader(readers[i].getDescriptor()));
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
}
}

for (int i = 0; i < readers.length; i++) {
delegate.getColumnReaders()[i] = ((CometColumnReader) this.readers[i]).getDelegate();
delegate.getColumnReaders()[i] = this.readers[i].getDelegate();
}

this.rowStartPosInBatch = rowPosition;
Expand Down Expand Up @@ -154,7 +153,7 @@ void readDeletedColumnIfNecessary(ColumnVector[] columnVectors) {
if (readers[i] instanceof CometDeleteColumnReader) {
CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
deleteColumnReader.setBatchSize(numRowsToRead);
deleteColumnReader.read(null, numRowsToRead);
deleteColumnReader.read(deleteColumnReader.getVector(), numRowsToRead);
columnVectors[i] = deleteColumnReader.getVector();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,109 +45,76 @@ public void setRowIdMapping(int[] rowIdMapping) {

@Override
public boolean isNullAt(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.isNullAt(newRowId);
return super.isNullAt(mapRowId(rowId));
}

@Override
public boolean getBoolean(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getBoolean(newRowId);
return super.getBoolean(mapRowId(rowId));
}

@Override
public byte getByte(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getByte(newRowId);
return super.getByte(mapRowId(rowId));
}

@Override
public short getShort(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getShort(newRowId);
return super.getShort(mapRowId(rowId));
}

@Override
public int getInt(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getInt(newRowId);
return super.getInt(mapRowId(rowId));
}

@Override
public long getLong(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getLong(newRowId);
return super.getLong(mapRowId(rowId));
}

@Override
public float getFloat(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getFloat(newRowId);
return super.getFloat(mapRowId(rowId));
}

@Override
public double getDouble(int rowId) {
int newRowId = rowId;
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getDouble(newRowId);
return super.getDouble(mapRowId(rowId));
}

@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
int newRowId = rowId;
if (isNullAt(newRowId)) {
if (isNullAt(rowId)) {
return null;
}
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getDecimal(newRowId, precision, scale);

return super.getDecimal(mapRowId(rowId), precision, scale);
}

@Override
public UTF8String getUTF8String(int rowId) {
int newRowId = rowId;
if (isNullAt(newRowId)) {
if (isNullAt(rowId)) {
return null;
}
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
}
return super.getUTF8String(newRowId);

return super.getUTF8String(mapRowId(rowId));
}

@Override
public byte[] getBinary(int rowId) {
int newRowId = rowId;
if (isNullAt(newRowId)) {
if (isNullAt(rowId)) {
return null;
}

return super.getBinary(mapRowId(rowId));
}

private int mapRowId(int rowId) {
if (rowIdMapping != null) {
newRowId = rowIdMapping[rowId];
return rowIdMapping[rowId];
}
return super.getBinary(newRowId);

return rowId;
}
}

0 comments on commit 41ec616

Please sign in to comment.