Skip to content

Commit

Permalink
[fix](autoinc) Fix AutoIncrementGenerator and add more logs about aut…
Browse files Browse the repository at this point in the history
…o-increment column (#37306)

## Proposed changes

1. when run out batchEndId in memory, we (1) write edit log to update
batchEndId, (2) update batchEndId in memory, (3) update nextId in memory
2. add more logs about auto-increment column.
3. remove useless `lowerBound`

branch-2.1-pick: #37366
  • Loading branch information
bobhan1 authored Jul 6, 2024
1 parent 65695d6 commit 1220324
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
25 changes: 15 additions & 10 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,42 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
client->getAutoIncrementRange(result, request);
});
}
LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length
<< "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]";

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, request to non-master FE, discard all "
"auto_increment ranges in _buffers. retry_time={}",
retry_times);
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!_rpc_status.ok()) {
LOG(WARNING)
<< "Failed to fetch auto-incremnt range, encounter rpc failure. retry_time="
<< retry_times << ", errmsg=" << _rpc_status.to_string();
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"result.length={}, retry_time={}",
length, result.length, retry_times);
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
_rpc_status = Status::RpcError<true>(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(AutoIncrementGenerator.class);

public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
private static final long BATCH_ID_INTERVAL = 500000;

Expand All @@ -62,6 +61,7 @@ public AutoIncrementGenerator(long dbId, long tableId, long columnId, long nextI
this.tableId = tableId;
this.columnId = columnId;
this.nextId = nextId;
this.batchEndId = -1;
}

public void setEditLog(EditLog editLog) {
Expand All @@ -70,27 +70,34 @@ public void setEditLog(EditLog editLog) {

public synchronized void applyChange(long columnId, long batchNextId) {
if (this.columnId == columnId && batchEndId < batchNextId) {
LOG.info("[auto-inc] AutoIncrementGenerator applyChange, db_id={}, table_id={}, column_id={}, "
+ "batchNextId={}", dbId, tableId, columnId, batchNextId);
nextId = batchNextId;
batchEndId = batchNextId;
}
}

public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
long length, long lowerBound) throws UserException {
LOG.info("[getAutoIncrementRange request][col:{}][length:{}], [{}]", columnId, length, this.columnId);
LOG.info("[auto-inc] getAutoIncrementRange request, db_id={}, table_id={}, column_id={}, length={}", dbId,
tableId, columnId, length);
if (this.columnId != columnId) {
throw new UserException("column dosen't exist, columnId=" + columnId);
}
long startId = Math.max(nextId, lowerBound);
long startId = nextId;
long endId = startId + length;
nextId = startId + length;
if (endId > batchEndId) {
Preconditions.checkState(editLog != null);
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId);
long newBatchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, newBatchEndId);
editLog.logUpdateAutoIncrementId(info);
batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
batchEndId = newBatchEndId;
LOG.info("[auto-inc] update batchEndId to {}, db_id={}, table_id={}, column_id={}",
newBatchEndId, dbId, tableId, columnId);
}
LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
nextId = endId;
LOG.info("[auto-inc] getAutoIncrementRange result, db_id={}, table_id={}, column_id={}, start={}, length:{}",
dbId, tableId, columnId, startId, length);
return Pair.of(startId, length);
}

Expand All @@ -100,12 +107,16 @@ public void write(DataOutput out) throws IOException {
}

public static AutoIncrementGenerator read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
AutoIncrementGenerator res = GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
LOG.info("[auto-inc] read AutoIncrementGenerator db_id={}, table_id={}, column_id={}, nextId={}, "
+ "batchEndId={}", res.dbId, res.tableId, res.columnId, res.nextId, res.batchEndId);
return res;
}

@Override
public void gsonPostProcess() throws IOException {
nextId = batchEndId;
LOG.info("[auto-inc] AutoIncrementGenerator set nextId to batchEndId={}", batchEndId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2619,9 +2619,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos
@Override
public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get auto-increement range request: {}, backend: {}", request, clientAddr);
}
LOG.info("[auto-inc] receive getAutoIncrementRange request: {}, backend: {}", request, clientAddr);

TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
Expand All @@ -2631,7 +2629,7 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}",
LOG.error("[auto-inc] failed to getAutoIncrementRange:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
Expand All @@ -2644,19 +2642,16 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
long columnId = request.getColumnId();
long length = request.getLength();
long lowerBound = -1;
if (request.isSetLowerBound()) {
lowerBound = request.getLowerBound();
}
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, lowerBound);
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, -1);
result.setStart(range.first);
result.setLength(range.second);
} catch (UserException e) {
LOG.warn("failed to get auto-increment range of column {}: {}", request.getColumnId(), e.getMessage());
LOG.warn("[auto-inc] failed to get auto-increment range of db_id={}, table_id={}, column_id={}, errmsg={}",
request.getDbId(), request.getTableId(), request.getColumnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
LOG.warn("[auto-inc] catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
}
Expand Down

0 comments on commit 1220324

Please sign in to comment.