From e561064836cca946f39e7bfab5b01c25833716f4 Mon Sep 17 00:00:00 2001
From: vegetableysm
Date: Mon, 4 Sep 2023 19:51:52 +0800
Subject: [PATCH] Fix a table reading bug on Spark.

Signed-off-by: vegetableysm
---
 .../v6d/hive/ql/io/VineyardInputFormat.java   | 44 +++++++++++++------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java b/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java
index b5cc8bdf..eedb21a3 100644
--- a/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java
+++ b/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java
@@ -79,15 +79,26 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
             FileStatus fileStatus = fs.getFileStatus(tableFilePath);
             byte[] buffer = new byte[(int) fileStatus.getLen()];
             int len = in.read(buffer, 0, (int) fileStatus.getLen());
-            if (len == -1) {
+            /*
+             * We must check `len <= 0` here rather than `len == -1`,
+             * because Spark may create an empty file, in which case `len` is 0.
+             */
+            if (len <= 0) {
                 continue;
             }
             String[] objectIDs = new String(buffer, StandardCharsets.UTF_8).split("\n");
             for (val objectID : objectIDs) {
-                ObjectID tableID = ObjectID.fromString(objectID);
-                Table table =
-                        (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
-                numBatches += table.getBatches().size();
+                try {
+                    ObjectID tableID = ObjectID.fromString(objectID);
+                    Table table =
+                            (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
+                    numBatches += table.getBatches().size();
+                } catch (Exception e) {
+                    // Skip invalid files.
+                    Context.println("Skip invalid file: " + tableFilePath);
+                    Context.println("File content: " + new String(buffer, StandardCharsets.UTF_8));
+                    break;
+                }
             }
         }
         // TODO: would generating a split for each record batch be better?
@@ -133,18 +144,25 @@ class VineyardRecordReader implements RecordReader {
             FileStatus fileStatus = fs.getFileStatus(tableFilePath);
             byte[] buffer = new byte[(int) fileStatus.getLen()];
             int len = in.read(buffer, 0, (int) fileStatus.getLen());
-            if (len == -1) {
+            if (len <= 0) {
                 continue;
             }
             String[] objectIDs = new String(buffer, StandardCharsets.UTF_8).split("\n");
             for (val objectID : objectIDs) {
-                ObjectID tableID = ObjectID.fromString(objectID);
-                Table table =
-                        (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
-                for (val batch : table.getBatches()) {
-                    recordTotal += batch.getRowCount();
-                    this.batches[this.recordBatchIndex++] = batch;
-                    schema = table.getSchema().getSchema();
+                try {
+                    ObjectID tableID = ObjectID.fromString(objectID);
+                    Table table =
+                            (Table) ObjectFactory.getFactory().resolve(client.getMetaData(tableID));
+                    for (val batch : table.getBatches()) {
+                        recordTotal += batch.getRowCount();
+                        this.batches[this.recordBatchIndex++] = batch;
+                        schema = table.getSchema().getSchema();
+                    }
+                } catch (Exception e) {
+                    // Skip invalid files.
+                    Context.println("Skip invalid file: " + tableFilePath);
+                    Context.println("File content: " + new String(buffer, StandardCharsets.UTF_8));
+                    break;
                 }
             }
         }
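
Note on why `len == -1` was not enough: when Spark commits an empty output file, `FileStatus#getLen()` is 0, so the buffer allocated from it has length 0 and the subsequent `read(buffer, 0, 0)` returns 0 rather than -1 under the usual `InputStream` contract. The old check therefore fell through and the code tried to parse object IDs out of an empty buffer. The standalone sketch below reproduces this with a plain local file; `EmptyFileReadDemo` and the temp-file setup are illustrative stand-ins rather than part of the patch, and the return value of a zero-length read can vary by stream implementation (`ByteArrayInputStream`, for instance, checks for EOF first and reports -1), so the `len <= 0` form covers both cases.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class EmptyFileReadDemo {
    public static void main(String[] args) throws IOException {
        // Stand-in for the empty marker file Spark leaves behind.
        File empty = File.createTempFile("vineyard-empty", ".tbl");
        empty.deleteOnExit();

        // Size the buffer from the file length, just as the patched code does.
        byte[] buffer = new byte[(int) empty.length()]; // length() == 0

        try (FileInputStream in = new FileInputStream(empty)) {
            int len = in.read(buffer, 0, buffer.length);
            // A zero-length read on FileInputStream returns 0, not -1, so a
            // check for `len == -1` alone lets empty files through and the
            // caller goes on to parse an empty object-ID list.
            System.out.println("read returned: " + len); // prints 0
        }
    }
}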