Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](autoinc) Fix AutoIncrementGenerator and add more logs about auto-increment column #37306

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,42 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
client->getAutoIncrementRange(result, request);
});
}
LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length=" << result.length
<< "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << " ms]";

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, request to non-master FE, discard all "
"auto_increment ranges in _buffers. retry_time={}",
retry_times);
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

if (!_rpc_status.ok()) {
LOG(WARNING)
<< "Failed to fetch auto-incremnt range, encounter rpc failure. retry_time="
<< retry_times << ", errmsg=" << _rpc_status.to_string();
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"result.length={}, retry_time={}",
length, result.length, retry_times);
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
_rpc_status = Status::RpcError<true>(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(AutoIncrementGenerator.class);

public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
private static final long BATCH_ID_INTERVAL = 500000;

Expand All @@ -62,6 +61,7 @@ public AutoIncrementGenerator(long dbId, long tableId, long columnId, long nextI
this.tableId = tableId;
this.columnId = columnId;
this.nextId = nextId;
this.batchEndId = -1;
}

public void setEditLog(EditLog editLog) {
Expand All @@ -70,27 +70,34 @@ public void setEditLog(EditLog editLog) {

public synchronized void applyChange(long columnId, long batchNextId) {
if (this.columnId == columnId && batchEndId < batchNextId) {
LOG.info("[auto-inc] AutoIncrementGenerator applyChange, db_id={}, table_id={}, column_id={}, "
+ "batchNextId={}", dbId, tableId, columnId, batchNextId);
nextId = batchNextId;
batchEndId = batchNextId;
}
}

public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
long length, long lowerBound) throws UserException {
LOG.info("[getAutoIncrementRange request][col:{}][length:{}], [{}]", columnId, length, this.columnId);
LOG.info("[auto-inc] getAutoIncrementRange request, db_id={}, table_id={}, column_id={}, length={}", dbId,
tableId, columnId, length);
if (this.columnId != columnId) {
throw new UserException("column dosen't exist, columnId=" + columnId);
}
long startId = Math.max(nextId, lowerBound);
long startId = nextId;
long endId = startId + length;
nextId = startId + length;
if (endId > batchEndId) {
Preconditions.checkState(editLog != null);
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, batchEndId);
long newBatchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, newBatchEndId);
editLog.logUpdateAutoIncrementId(info);
batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
batchEndId = newBatchEndId;
LOG.info("[auto-inc] update batchEndId to {}, db_id={}, table_id={}, column_id={}",
newBatchEndId, dbId, tableId, columnId);
}
LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
nextId = endId;
LOG.info("[auto-inc] getAutoIncrementRange result, db_id={}, table_id={}, column_id={}, start={}, length:{}",
dbId, tableId, columnId, startId, length);
return Pair.of(startId, length);
}

Expand All @@ -100,12 +107,16 @@ public void write(DataOutput out) throws IOException {
}

public static AutoIncrementGenerator read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
AutoIncrementGenerator res = GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
LOG.info("[auto-inc] read AutoIncrementGenerator db_id={}, table_id={}, column_id={}, nextId={}, "
+ "batchEndId={}", res.dbId, res.tableId, res.columnId, res.nextId, res.batchEndId);
return res;
}

@Override
public void gsonPostProcess() throws IOException {
nextId = batchEndId;
LOG.info("[auto-inc] AutoIncrementGenerator set nextId to batchEndId={}", batchEndId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2619,9 +2619,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos
@Override
public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get auto-increement range request: {}, backend: {}", request, clientAddr);
}
LOG.info("[auto-inc] receive getAutoIncrementRange request: {}, backend: {}", request, clientAddr);

TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
Expand All @@ -2631,7 +2629,7 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to getAutoIncrementRange:{}, request:{}, backend:{}",
LOG.error("[auto-inc] failed to getAutoIncrementRange:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
Expand All @@ -2644,19 +2642,16 @@ public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeReques
autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
long columnId = request.getColumnId();
long length = request.getLength();
long lowerBound = -1;
if (request.isSetLowerBound()) {
lowerBound = request.getLowerBound();
}
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, lowerBound);
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, -1);
result.setStart(range.first);
result.setLength(range.second);
} catch (UserException e) {
LOG.warn("failed to get auto-increment range of column {}: {}", request.getColumnId(), e.getMessage());
LOG.warn("[auto-inc] failed to get auto-increment range of db_id={}, table_id={}, column_id={}, errmsg={}",
request.getDbId(), request.getTableId(), request.getColumnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
LOG.warn("[auto-inc] catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
}
Expand Down
Loading