Skip to content

Commit

Permalink
fix(interactive): fix backup conflict checking, refine log recycling (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed Jun 12, 2024
1 parent 591b092 commit dbbacf2
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ data:
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}
log.recycle.offset.reserve={{ .Values.logRecycleOffsetReserve }}

## Extra Config
{{- if .Values.extraConfig }}
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
fileMetaStorePath: "/etc/groot/my.meta"
logRecycleEnable: true
logRecycleOffsetReserve: 86400

## Store Config
storeDataPath: "/var/lib/graphscope-store"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class CoordinatorConfig {
public static final Config<Long> LOG_RECYCLE_INTERVAL_SECOND =
Config.longConfig("log.recycle.interval.second", 3600L);

public static final Config<Long> LOG_RECYCLE_OFFSET_RESERVE =
Config.longConfig("log.recycle.offset.reserve", 86400);

public static final Config<String> FILE_META_STORE_PATH =
Config.stringConfig("file.meta.store.path", "./meta");
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,16 @@ public static boolean isLockAvailable(Configs configs) {
}
return true;
}

public static boolean isMetaFreshEnough(Configs configs, long delta) {
String dataRoot = StoreConfig.STORE_DATA_PATH.get(configs);
File metaDir = Paths.get(dataRoot, "meta").toAbsolutePath().toFile();
if (metaDir.exists()) {
long lastModified = metaDir.lastModified();
long ts = System.currentTimeMillis();
return ts - lastModified < delta;
}
// not exists also means fresh enough
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ public class LogRecycler {
private ScheduledExecutorService scheduler;
private final boolean recycleEnable;
private final long recycleIntervalSeconds;
private final long recycleOffsetReserve;

public LogRecycler(Configs configs, LogService logService, SnapshotManager snapshotManager) {
this.logService = logService;
this.snapshotManager = snapshotManager;
this.recycleEnable = CoordinatorConfig.LOG_RECYCLE_ENABLE.get(configs);
this.recycleIntervalSeconds = CoordinatorConfig.LOG_RECYCLE_INTERVAL_SECOND.get(configs);
this.recycleOffsetReserve = CoordinatorConfig.LOG_RECYCLE_OFFSET_RESERVE.get(configs);
}

public void start() {
Expand Down Expand Up @@ -84,7 +86,7 @@ private void doRecycle() {
List<Long> queueOffsets = this.snapshotManager.getQueueOffsets();
for (int i = 0; i < queueOffsets.size(); i++) {
long offset = queueOffsets.get(i);
offset = Math.max(offset - 3600, 0); // Leave some spaces
offset = Math.max(offset - recycleOffsetReserve, 0); // Leave some spaces
try {
logService.deleteBeforeOffset(i, offset);
logger.info("recycled queue [{}] offset [{}]", i, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public void init() {
AdminClient admin = getAdmin();
NewTopic newTopic = new NewTopic(this.topic, this.storeCount, this.replicationFactor);
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "-1");
configs.put("retention.bytes", "-1");
// Respect the global settings is enough for us
// configs.put("retention.ms", "-1");
// configs.put("retention.bytes", "-1");
configs.put("max.message.bytes", String.valueOf(this.maxMessageMb * 1024 * 1024));
newTopic.configs(configs);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,17 @@ public static void main(String[] args) throws Exception {
latch.getLeader(),
latch.getState());
latch.await();
// Sleep 5s before check the lock to prevent the leader has not
// Sleep 10s before check the lock to prevent the leader has not
// released the resource yet.
Thread.sleep(5000);
if (Utils.isLockAvailable(conf)) {
logger.info("LOCK is available, node starting");
Thread.sleep(10000);
if (Utils.isLockAvailable(conf) && !Utils.isMetaFreshEnough(conf, 9000)) {
logger.info("LOCK is available and meta stop updating, node starting");
break;
}
latch.close();
logger.info("LOCK is unavailable, the leader may still exists");
logger.info(
"LOCK is unavailable or the meta is still updating, the leader may"
+ " still exists");
// The leader has lost connection but still alive,
// give it another chance
Thread.sleep(60000);
Expand All @@ -95,7 +97,7 @@ public static void main(String[] args) throws Exception {
logger.error("Exception while leader election", e);
throw e;
}
// curator.close();
// curator.close();
}
}
NodeLauncher launcher = new NodeLauncher(node);
Expand Down

0 comments on commit dbbacf2

Please sign in to comment.