Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
devabhishekpal committed Jan 13, 2025
1 parent f10cfe7 commit ae2ea15
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public void run() {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public void run() {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,15 @@ protected ReconScmTask(
* Start underlying start thread.
*/
public synchronized void start() {
try {
if (!isRunning()) {
LOG.info("Starting {} Thread.", getTaskName());
running = true;
taskThread = new Thread(this::run, "Recon-" + getTaskName());
taskThread.setName(getTaskName());
taskThread.setDaemon(true);
taskThread.start();
} else {
LOG.info("{} Thread is already running.", getTaskName());
}
} catch (Exception e) {
LOG.error("Failed to start {} thread due to exception", getTaskName(), e);
if (!isRunning()) {
LOG.info("Starting {} Thread.", getTaskName());
running = true;
taskThread = new Thread(this::run, "Recon-" + getTaskName());
taskThread.setName(getTaskName());
taskThread.setDaemon(true);
taskThread.start();
} else {
LOG.info("{} Thread is already running.", getTaskName());
}
}

Expand Down Expand Up @@ -97,16 +93,10 @@ public ReconTaskStatusUpdater getTaskStatusUpdater() {

protected abstract void run();

protected void initializeAndRunTask() {
try {
taskStatusUpdater.recordRunStart();
runTask();
} catch (Exception e) {
LOG.error("{} encountered exception. ", getTaskName(), e);
taskStatusUpdater.setLastTaskRunStatus(-1);
} finally {
taskStatusUpdater.recordRunCompletion();
}
protected void initializeAndRunTask() throws Exception{
taskStatusUpdater.recordRunStart();
runTask();
taskStatusUpdater.recordRunCompletion();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,24 +564,17 @@ public boolean syncDataFromOM() {

reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.recordRunCompletion();
// Pass on DB update events to tasks that are listening.
reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
omdbUpdatesHandler.getEvents()), omMetadataManager);
} catch (IOException | RocksDBException e) {
LOG.error("Failed to get and apply delta updates with exception", e);
reconTaskUpdater.setLastTaskRunStatus(-1);
fullSnapshot = true;
} catch (InterruptedException intEx) {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumDeltaRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
LOG.warn("Unable to get and apply delta updates from OM.",
e.getMessage());
fullSnapshot = true;
} finally {
// Update timestamp of successful delta updates query.
reconTaskUpdater.recordRunCompletion();
}
}

Expand All @@ -598,6 +591,7 @@ public boolean syncDataFromOM() {
if (success) {
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
reconTaskUpdater.setLastTaskRunStatus(0);
reconTaskUpdater.recordRunCompletion();

// Reinitialize tasks that are listening.
LOG.info("Calling reprocess on Recon tasks.");
Expand All @@ -609,21 +603,19 @@ public boolean syncDataFromOM() {
} else {
metrics.incrNumSnapshotRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
}
} catch (InterruptedException intEx) {
Thread.currentThread().interrupt();
} catch (Exception e) {
metrics.incrNumSnapshotRequestsFailed();
reconTaskUpdater.setLastTaskRunStatus(-1);
reconTaskUpdater.recordRunCompletion();
LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
// Update health status in ReconContext
reconContext.updateHealthStatus(new AtomicBoolean(false));
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
} finally {
reconTaskUpdater.recordRunCompletion();
}
}
printOMDBMetaInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ protected synchronized void run() {
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
taskStatusUpdater.setLastTaskRunStatus(-1);
taskStatusUpdater.recordRunCompletion();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ public interface ReconTaskController {
/**
* Pass on a set of OM DB update events to the registered tasks.
* @param events set of events
* @throws InterruptedException InterruptedException
*/
void consumeOMEvents(OMUpdateEventBatch events,
OMMetadataManager omMetadataManager)
throws InterruptedException;
OMMetadataManager omMetadataManager);

/**
* Pass on the handle to a new OM DB instance to the registered tasks.
* @param omMetadataManager OM Metadata Manager instance
*/
void reInitializeTasks(ReconOMMetadataManager omMetadataManager)
throws InterruptedException;
void reInitializeTasks(ReconOMMetadataManager omMetadataManager);

/**
* Get set of registered tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ public synchronized void reInitializeTasks(ReconOMMetadataManager omMetadataMana
try {
return task.call();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
// Wrap the exception with the task name
throw new TaskExecutionException(task.getTaskName(), e);
}
Expand Down

0 comments on commit ae2ea15

Please sign in to comment.