Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz committed Nov 19, 2023
1 parent c8e99fb commit 30c59fb
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.TypeInfoFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
Expand All @@ -40,6 +38,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -62,6 +61,7 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
Expand All @@ -72,7 +72,6 @@ public class MaxComputeJniScanner extends JniScanner {
private MaxComputeColumnValue columnValue;
private long remainBatchRows = 0;
private long totalRows = 0;
private RootAllocator arrowAllocator;
private ArrowRecordReader curReader;
private List<Column> readColumns;
private Map<String, Integer> readColumnsToId;
Expand Down Expand Up @@ -142,11 +141,16 @@ protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields
readColumnsToId.put(fields[i], i);
}
}
}

@VisibleForTesting
protected TableSchema getSchema() {
return curTableScan.getSchema();
// reorder columns
List<Column> columnList = curTableScan.getSchema().getColumns();
columnList.addAll(curTableScan.getSchema().getPartitionColumns());
Map<String, Integer> columnRank = new HashMap<>();
for (int i = 0; i < columnList.size(); i++) {
columnRank.put(columnList.get(i).getName(), i);
}
// MaxCompute can only download column data in the order defined by the table metadata.
// If we do not sort here, the read may fail with: Column reorder is not supported in legacy arrow mode.
readColumns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
}

@Override
Expand All @@ -164,8 +168,6 @@ public void open() throws IOException {
long start = startOffset == -1L ? 0 : startOffset;
long recordCount = session.getRecordCount();
totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;

arrowAllocator = new RootAllocator(Long.MAX_VALUE);
partitionColumns = session.getSchema().getPartitionColumns().stream()
.map(Column::getName)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -219,11 +221,9 @@ private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
case DOUBLE:
odpsType = TypeInfoFactory.DOUBLE;
break;
case DATETIME:
case DATETIMEV2:
odpsType = TypeInfoFactory.DATETIME;
break;
case DATE:
case DATEV2:
odpsType = TypeInfoFactory.DATE;
break;
Expand Down Expand Up @@ -254,7 +254,7 @@ public void close() throws IOException {
startOffset = -1;
splitSize = -1;
if (curReader != null) {
arrowAllocator.close();
arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
curReader.close();
curReader = null;
}
Expand Down Expand Up @@ -296,7 +296,9 @@ private int readVectors(int expectedRows) throws IOException {
Integer readColumnId = readColumnsToId.get(partitionColumn);
if (readColumnId != null && partitionValue != null) {
MaxComputePartitionValue value = new MaxComputePartitionValue(partitionValue);
appendData(readColumnId, value);
for (int i = 0; i < batchRows; i++) {
appendData(readColumnId, value);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public class MaxComputeJniScannerTest {
put("partition_spec", "p1=2022-06");
put("required_fields", "boolean,tinyint,smallint,int,bigint,float,double,"
+ "date,timestamp,char,varchar,string,decimalv2,decimal64,"
+ "decimal18,timestamp4,datev1,datev2,datetimev1,datetimev2");
+ "decimal18,timestamp4,date");
put("columns_types", "boolean#tinyint#smallint#int#bigint#float#double#"
+ "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)#"
+ "decimal(18,5)#timestamp(4)#datev1#datev2#datetimev1#datetimev2(4)");
+ "decimal(18,5)#timestamp(4)#date");
}
};
private MaxComputeJniScanner scanner = new MaxComputeJniScanner(32, paramsMc);
Expand Down

0 comments on commit 30c59fb

Please sign in to comment.