Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Replace cached time with system clock in MasterService debug logs #7902

Merged
merged 7 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Enforce 512 byte document ID limit in bulk updates ([#8039](https://github.com/opensearch-project/OpenSearch/pull/8039))
- With only GlobalAggregation in request causes unnecessary wrapping with MultiCollector ([#8125](https://github.com/opensearch-project/OpenSearch/pull/8125))
- Fix mapping char_filter when mapping a hashtag ([#7591](https://github.com/opensearch-project/OpenSearch/pull/7591))
- Precise system clock time in MasterService debug logs ([#7902](https://github.com/opensearch-project/OpenSearch/pull/7902))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,14 @@ private void runTasks(TaskInputs taskInputs) {
return;
}

final long computationStartTime = threadPool.relativeTimeInMillis();
final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);

if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = threadPool.relativeTimeInMillis();
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
Expand All @@ -309,7 +309,7 @@ private void runTasks(TaskInputs taskInputs) {
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.relativeTimeInMillis();
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
Expand All @@ -335,8 +335,8 @@ private void runTasks(TaskInputs taskInputs) {
}
}

private TimeValue getTimeSince(long startTimeMillis) {
return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis));
private TimeValue getTimeSince(long startTimeNanos) {
return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos));
}

protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
Expand All @@ -358,7 +358,7 @@ protected boolean blockingAllowed() {
}

void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) {
final long notificationStartTime = threadPool.relativeTimeInMillis();
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state());

try {
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,19 @@ public long relativeTimeInNanos() {
return cachedTimeThread.relativeTimeInNanos();
}

/**
* Returns a value of nanoseconds that may be used for relative time calculations
* that require the highest precision possible. Performance critical code must use
* either {@link #relativeTimeInNanos()} or {@link #relativeTimeInMillis()} which
* give better performance at the cost of lower precision.
*
* This method should only be used for calculating time deltas. For an epoch based
* timestamp, see {@link #absoluteTimeInMillis()}.
*/
public long preciseRelativeTimeInNanos() {
return System.nanoTime();
}

/**
* Returns the value of milliseconds since UNIX epoch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@
public class MasterServiceTests extends OpenSearchTestCase {

private static ThreadPool threadPool;
private static long relativeTimeInMillis;
private static long timeDiffInMillis;

@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(MasterServiceTests.class.getName()) {
@Override
public long relativeTimeInMillis() {
return relativeTimeInMillis;
public long preciseRelativeTimeInNanos() {
return timeDiffInMillis * TimeValue.NSEC_PER_MSEC;
}
};
}
Expand All @@ -119,7 +119,7 @@ public static void stopThreadPool() {

@Before
public void randomizeCurrentTime() {
relativeTimeInMillis = randomLongBetween(0L, 1L << 62);
timeDiffInMillis = randomLongBetween(0L, 1L << 50);
}

private ClusterManagerService createClusterManagerService(boolean makeClusterManager) {
Expand Down Expand Up @@ -426,7 +426,7 @@ public void testClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(1).millis();
return currentState;
}

Expand All @@ -441,7 +441,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

Expand All @@ -456,13 +456,13 @@ public void onFailure(String source, Exception e) {}
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis();
timeDiffInMillis += TimeValue.timeValueSeconds(4).millis();
}

@Override
Expand Down Expand Up @@ -1080,12 +1080,12 @@ public void testLongClusterStateUpdateLogging() throws Exception {
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
if (event.source().contains("test5")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
}
if (event.source().contains("test6")) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new OpenSearchException("simulated error during slow publication which should trigger logging");
Expand All @@ -1101,7 +1101,7 @@ public void testLongClusterStateUpdateLogging() throws Exception {
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += randomLongBetween(
timeDiffInMillis += randomLongBetween(
0L,
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
);
Expand All @@ -1124,7 +1124,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
Expand All @@ -1143,7 +1143,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return ClusterState.builder(currentState).incrementVersion().build();
Expand All @@ -1162,7 +1162,7 @@ public void onFailure(String source, Exception e) {
clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(
Settings.EMPTY
).millis() + randomLongBetween(1, 1000000);
return currentState;
Expand Down