Fix a table-reading bug on Spark.
Signed-off-by: vegetableysm <yuanshumin.ysm@alibaba-inc.com>
vegetableysm committed Sep 4, 2023
1 parent 0e02cd5 commit e561064
Showing 1 changed file with 31 additions and 13 deletions.
java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java (31 additions, 13 deletions)
@@ -79,15 +79,26 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     FileStatus fileStatus = fs.getFileStatus(tableFilePath);
     byte[] buffer = new byte[(int) fileStatus.getLen()];
     int len = in.read(buffer, 0, (int) fileStatus.getLen());
-    if (len == -1) {
+    /*
+     * The check must be len <= 0 rather than len == -1, because Spark
+     * may create an empty file, in which case len == 0.
+     */
+    if (len <= 0) {
         continue;
     }
     String[] objectIDs = new String(buffer, StandardCharsets.UTF_8).split("\n");
     for (val objectID : objectIDs) {
-        ObjectID tableID = ObjectID.fromString(objectID);
-        Table table =
-                (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
-        numBatches += table.getBatches().size();
+        try {
+            ObjectID tableID = ObjectID.fromString(objectID);
+            Table table =
+                    (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
+            numBatches += table.getBatches().size();
+        } catch (Exception e) {
+            // Skip invalid files.
+            Context.println("Skip invalid file: " + tableFilePath);
+            Context.println("File content: " + new String(buffer, StandardCharsets.UTF_8));
+            break;
+        }
     }
 }
 // TODO: would generating a split for each record batch be better?
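
Why the new check is len <= 0 rather than len == -1: InputStream.read(byte[], int, int) returns 0, not -1, when asked to read zero bytes, and for the empty files Spark can leave behind fileStatus.getLen() is 0, so exactly zero bytes are requested and the old EOF check never fires. A minimal sketch of that contract against a local temp file (the file name and java.io setup are illustrative; the commit reads through a Hadoop FileSystem stream, which should follow the same zero-length-read contract):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class EmptyFileReadDemo {
        public static void main(String[] args) throws IOException {
            // An empty file, standing in for the zero-byte files Spark can create.
            File empty = File.createTempFile("vineyard-table", ".tmp");
            empty.deleteOnExit();

            // Mirrors getSplits(): the buffer is sized from the file length, here 0.
            byte[] buffer = new byte[(int) empty.length()];
            try (FileInputStream in = new FileInputStream(empty)) {
                int len = in.read(buffer, 0, buffer.length);
                System.out.println(len);       // 0: a zero-byte read returns 0, not -1
                System.out.println(len == -1); // false -> the old check fell through
                System.out.println(len <= 0);  // true  -> the fixed check skips the file
            }
        }
    }
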
@@ -133,18 +144,25 @@ class VineyardRecordReader implements RecordReader<NullWritable, RowWritable> {
     FileStatus fileStatus = fs.getFileStatus(tableFilePath);
     byte[] buffer = new byte[(int) fileStatus.getLen()];
     int len = in.read(buffer, 0, (int) fileStatus.getLen());
-    if (len == -1) {
+    if (len <= 0) {
         continue;
     }
     String[] objectIDs = new String(buffer, StandardCharsets.UTF_8).split("\n");
     for (val objectID : objectIDs) {
-        ObjectID tableID = ObjectID.fromString(objectID);
-        Table table =
-                (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
-        for (val batch : table.getBatches()) {
-            recordTotal += batch.getRowCount();
-            this.batches[this.recordBatchIndex++] = batch;
-            schema = table.getSchema().getSchema();
-        }
+        try {
+            ObjectID tableID = ObjectID.fromString(objectID);
+            Table table =
+                    (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
+            for (val batch : table.getBatches()) {
+                recordTotal += batch.getRowCount();
+                this.batches[this.recordBatchIndex++] = batch;
+                schema = table.getSchema().getSchema();
+            }
+        } catch (Exception e) {
+            // Skip invalid files.
+            Context.println("Skip invalid file: " + tableFilePath);
+            Context.println("File content: " + new String(buffer, StandardCharsets.UTF_8));
+            break;
+        }
     }
 }
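
The try/catch added around ObjectID resolution applies the same defensive idea one level up: if a file's contents do not parse as a newline-separated list of object IDs, log it and skip the rest of that file instead of failing the whole read. A standalone sketch of the pattern; ObjectIdParser, parseId, and the "o" + hex ID shape are hypothetical stand-ins for illustration, not the vineyard API:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class ObjectIdParser {
        // Hypothetical stand-in for ObjectID.fromString(): an "o" prefix
        // followed by hex digits; anything else is rejected.
        static long parseId(String s) {
            if (!s.startsWith("o")) {
                throw new IllegalArgumentException("not an object ID: " + s);
            }
            return Long.parseUnsignedLong(s.substring(1), 16);
        }

        // Mirrors the commit's pattern: parse newline-separated IDs, and on the
        // first unparseable entry log and bail out of this file instead of throwing.
        static List<Long> parseObjectIds(byte[] buffer) {
            List<Long> ids = new ArrayList<>();
            for (String line : new String(buffer, StandardCharsets.UTF_8).split("\n")) {
                try {
                    ids.add(parseId(line));
                } catch (Exception e) {
                    System.out.println("Skip invalid file content: " + line);
                    break; // as in the commit: stop processing this file
                }
            }
            return ids;
        }

        public static void main(String[] args) {
            byte[] good = "o000a1b2c3\no000d4e5f6\n".getBytes(StandardCharsets.UTF_8);
            byte[] bad = "part-00000-attempt\n".getBytes(StandardCharsets.UTF_8);
            System.out.println(parseObjectIds(good)); // [10597059, 13952502]
            System.out.println(parseObjectIds(bad));  // []
        }
    }
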