Skip to content

Commit

Permalink
fix: a possible race condition that we used table schema out of the l…
Browse files Browse the repository at this point in the history
…ock. (#1575)

Also clean up the code to reconnect after 10MB.
  • Loading branch information
yirutang authored Mar 14, 2022
1 parent 6412fb2 commit b587638
Showing 1 changed file with 13 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,41 +134,21 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
// of JSON data.
long currentRequestSize = 0;
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
synchronized (this) {
this.totalMessageSize += currentRequestSize;
this.absTotal += currentRequestSize;
// Reconnect on every 9.5MB.
if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) {
streamWriter.close();
// Create a new underlying StreamWriter aka establish a new connection.
this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build();
this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize;
this.absTotal += currentRequestSize;
// Allow first request to pass.
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt
// processing
// of JSON data.
long currentRequestSize = 0;
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
}
LOG.fine(
"Sending a total of:"
+ this.totalMessageSize
+ " "
+ currentRequestSize
+ " "
+ this.absTotal);
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
return appendResponseFuture;
Expand Down

0 comments on commit b587638

Please sign in to comment.