Skip to content

Commit

Permalink
Refine type check logic
Browse files Browse the repository at this point in the history
  • Loading branch information
banmoy committed Sep 27, 2022
1 parent 1dcfa25 commit 43825f2
Showing 1 changed file with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,30 @@ private void setValueToFlinkRows(int rowIndex, int column, Object obj) {
private void genFlinkRows() {
for (int i = 0; i < fieldVectors.size(); i++) {
FieldVector fieldVector = fieldVectors.get(i);
StarRocksToFlinkTrans translators = null;
Column column = starRocksSchema.getColumn(i);
boolean nullable = true;
if (column != null) {
ColumnRichInfo richInfo = columnRichInfos.get(selectedColumns[i].getColumnIndexInFlinkTable());
nullable = richInfo.getDataType().getLogicalType().isNullable();
LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot();
String srType = DataUtil.clearBracket(column.getType());
if (Const.DataTypeRelationMap.containsKey(flinkTypeRoot)
&& Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) {
translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType);
}
if (column == null) {
throw new RuntimeException("Can't find StarRocks' column at index " + i);
}

// TODO make sure what's going on here
if (translators == null) {
ColumnRichInfo richInfo = columnRichInfos.get(selectedColumns[i].getColumnIndexInFlinkTable());
boolean nullable = richInfo.getDataType().getLogicalType().isNullable();
LogicalTypeRoot flinkTypeRoot = richInfo.getDataType().getLogicalType().getTypeRoot();
String srType = DataUtil.clearBracket(column.getType());

if (!Const.DataTypeRelationMap.containsKey(flinkTypeRoot)) {
throw new RuntimeException(
"Flink type not supported for column " + column.getName() +
", Flink type is -> [" + flinkTypeRoot.toString() + "]");
}
if (!Const.DataTypeRelationMap.get(flinkTypeRoot).containsKey(srType)) {
throw new RuntimeException(
"Flink type not support when convert data from starrocks to flink, " +
"type is -> [" + fieldVector.getMinorType().toString() + "]"
"StarRocks type can not convert to Flink type for column " + column.getName() +
", StarRocks type is -> [" + srType + "], " +
"Flink type is -> [" + flinkTypeRoot.toString() + "]"
);
}

StarRocksToFlinkTrans translators = Const.DataTypeRelationMap.get(flinkTypeRoot).get(srType);
Object[] result = translators.transToFlinkData(fieldVector.getMinorType(), fieldVector, rowCountOfBatch, i, nullable);
for (int ri = 0; ri < result.length; ri ++) {
setValueToFlinkRows(ri, i, result[ri]);
Expand Down

0 comments on commit 43825f2

Please sign in to comment.