Skip to content

Commit

Permalink
[branch-2.1] Picks "[fix](autoinc) Fix AutoIncrementGenerator and add…
Browse files Browse the repository at this point in the history
… more logs about auto-increment column #37306" (#37366)

## Proposed changes

picks #37306
  • Loading branch information
bobhan1 authored Jul 6, 2024
1 parent ef59af8 commit 38b3870
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 29 deletions.
25 changes: 15 additions & 10 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,42 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
client->getAutoIncrementRange(result, request);
});
}
LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length
<< "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]";

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, request to non-master FE, discard all "
"auto_increment ranges in _buffers. retry_time={}",
retry_times);
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!_rpc_status.ok()) {
LOG(WARNING)
<< "Failed to fetch auto-incremnt range, encounter rpc failure. retry_time="
<< retry_times << ", errmsg=" << _rpc_status.to_string();
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"result.length={}, retry_time={}",
length, result.length, retry_times);
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
_rpc_status = Status::RpcError<true>(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(AutoIncrementGenerator.class);

public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
private static final long BATCH_ID_INTERVAL = 500000;

Expand All @@ -62,6 +61,7 @@ public AutoIncrementGenerator(long dbId, long tableId, long columnId, long nextI
this.tableId = tableId;
this.columnId = columnId;
this.nextId = nextId;
this.batchEndId = -1;
}

public void setEditLog(EditLog editLog) {
Expand All @@ -70,27 +70,34 @@ public void setEditLog(EditLog editLog) {

public synchronized void applyChange(long columnId, long batchNextId) {
if (this.columnId == columnId && batchEndId < batchNextId) {
LOG.info("[auto-inc] AutoIncrementGenerator applyChange, db_id={}, table_id={}, column_id={}, "
+ "batchNextId={}", dbId, tableId, columnId, batchNextId);
nextId = batchNextId;
batchEndId = batchNextId;
}
}

public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
long length, long lowerBound) throws UserException {
LOG.info("[getAutoIncrementRange request][col:{}][length:{}], [{}]", columnId, length, this.columnId);
LOG.info("[auto-inc] getAutoIncrementRange request, db_id={}, table_id={}, column_id={}, length={}", dbId,
tableId, columnId, length);
if (this.columnId != columnId) {
throw new UserException("column dosen't exist, columnId=" + columnId);
}
long startId = Math.max(nextId, lowerBound);
long startId = nextId;
long endId = startId + length;
nextId = startId + length;
if (endId > batchEndId) {
Preconditions.checkState(editLog != null);
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId);
long newBatchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, newBatchEndId);
editLog.logUpdateAutoIncrementId(info);
batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
batchEndId = newBatchEndId;
LOG.info("[auto-inc] update batchEndId to {}, db_id={}, table_id={}, column_id={}",
newBatchEndId, dbId, tableId, columnId);
}
LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
nextId = endId;
LOG.info("[auto-inc] getAutoIncrementRange result, db_id={}, table_id={}, column_id={}, start={}, length:{}",
dbId, tableId, columnId, startId, length);
return Pair.of(startId, length);
}

Expand All @@ -100,12 +107,16 @@ public void write(DataOutput out) throws IOException {
}

public static AutoIncrementGenerator read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
AutoIncrementGenerator res = GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
LOG.info("[auto-inc] read AutoIncrementGenerator db_id={}, table_id={}, column_id={}, nextId={}, "
+ "batchEndId={}", res.dbId, res.tableId, res.columnId, res.nextId, res.batchEndId);
return res;
}

@Override
public void gsonPostProcess() throws IOException {
nextId = batchEndId;
LOG.info("[auto-inc] AutoIncrementGenerator set nextId to batchEndId={}", batchEndId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2767,9 +2767,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos
@Override
public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get auto-increement range request: {}, backend: {}", request, clientAddr);
}
LOG.info("[auto-inc] receive getAutoIncrementRange request: {}, backend: {}", request, clientAddr);

TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
Expand All @@ -2779,7 +2777,7 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}",
LOG.error("[auto-inc] failed to getAutoIncrementRange:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
Expand All @@ -2792,19 +2790,16 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
long columnId = request.getColumnId();
long length = request.getLength();
long lowerBound = -1;
if (request.isSetLowerBound()) {
lowerBound = request.getLowerBound();
}
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, lowerBound);
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, -1);
result.setStart(range.first);
result.setLength(range.second);
} catch (UserException e) {
LOG.warn("failed to get auto-increment range of column {}: {}", request.getColumnId(), e.getMessage());
LOG.warn("[auto-inc] failed to get auto-increment range of db_id={}, table_id={}, column_id={}, errmsg={}",
request.getDbId(), request.getTableId(), request.getColumnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
LOG.warn("[auto-inc] catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
}
Expand Down

0 comments on commit 38b3870

Please sign in to comment.