Skip to content

Commit

Permalink
[PLAT-15035] Add support to sync gflags secret mount location to actu…
Browse files Browse the repository at this point in the history
…al gflag file used by services

Summary:
Added support in k8s_parent.py to regularly sync the gflag template file to the actual gflag file
used by the master/tserver service. The sync is performed by a background thread that runs
every 20 seconds.

Test Plan:
Manually verified that the background job was syncing the template file to the actual gflag
file.

Reviewers: anijhawan, sanketh

Reviewed By: anijhawan

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D37469
  • Loading branch information
kv83821-yb committed Aug 29, 2024
1 parent 417092a commit e3a1a36
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
public class EditKubernetesUniverse extends KubernetesTaskBase {

static final int DEFAULT_WAIT_TIME_MS = 10000;
static final int WAIT_FOR_MASTER_ADDRESSES_CHANGE_SECS = 120;
private final OperatorStatusUpdater kubernetesStatus;
private final YbcManager ybcManager;

Expand Down Expand Up @@ -706,11 +705,6 @@ public void createMasterAddressesUpdateTask(
universe.isYbcEnabled(),
universe.getUniverseDetails().getYbcSoftwareVersion());

// Wait for gflags change to be reflected on mounted locations
createWaitForDurationSubtask(
universe, Duration.ofSeconds(WAIT_FOR_MASTER_ADDRESSES_CHANGE_SECS))
.setSubTaskGroupType(SubTaskGroupType.ConfigureUniverse);

Set<NodeDetails> mastersToModify =
Stream.concat(
mastersToAdd.stream(),
Expand Down Expand Up @@ -750,11 +744,6 @@ public void createMasterAddressesUpdateTask(
universe.isYbcEnabled(),
universe.getUniverseDetails().getYbcSoftwareVersion());

// Wait for gflags change to be reflected on mounted locations
createWaitForDurationSubtask(
universe, Duration.ofSeconds(WAIT_FOR_MASTER_ADDRESSES_CHANGE_SECS))
.setSubTaskGroupType(SubTaskGroupType.ConfigureUniverse);

// Set flag in memory for tserver
createSetFlagInMemoryTasks(
tserversToModify,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.yugabyte.yw.models.helpers.DeviceInfo;
import com.yugabyte.yw.models.helpers.NodeDetails;
import com.yugabyte.yw.models.helpers.PlacementInfo;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -726,6 +727,11 @@ public void upgradePodsNonRestart(
enableYbc));
});
getRunnableTask().addSubTaskGroup(helmUpgrade);
// Wait for gflags change to be reflected on mounted locations
createWaitForDurationSubtask(
taskParams().getUniverseUUID(),
Duration.ofSeconds(KubernetesUtil.WAIT_FOR_GFLAG_SYNC_SECS))
.setSubTaskGroupType(SubTaskGroupType.ConfigureUniverse);
}

private Map<UUID, ServerType> getServersToUpdateAzMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5301,9 +5301,13 @@ protected SubTaskGroup createWaitForClockSyncTasks(
}

/**
 * Convenience overload that accepts a {@link Universe} instead of its UUID.
 *
 * <p>Extracts the universe UUID and delegates to the UUID-based overload of this method.
 *
 * @param universe the universe whose UUID is passed to the UUID-based overload
 * @param waitTime the duration passed through to the UUID-based overload
 * @return the subtask group returned by the UUID-based overload
 */
protected SubTaskGroup createWaitForDurationSubtask(Universe universe, Duration waitTime) {
  UUID universeUuid = universe.getUniverseUUID();
  return createWaitForDurationSubtask(universeUuid, waitTime);
}

protected SubTaskGroup createWaitForDurationSubtask(UUID universeUUID, Duration waitTime) {
SubTaskGroup subTaskGroup = createSubTaskGroup("WaitForDuration");
WaitForDuration.Params params = new WaitForDuration.Params();
params.setUniverseUUID(universe.getUniverseUUID());
params.setUniverseUUID(universeUUID);
params.waitTime = waitTime;

WaitForDuration task = createTask(WaitForDuration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class KubernetesUtil {

public static String MIN_VERSION_NON_RESTART_GFLAGS_UPGRADE_SUPPORT_PREVIEW = "2.23.0.0-b539";
public static String MIN_VERSION_NON_RESTART_GFLAGS_UPGRADE_SUPPORT_STABLE = "2024.2.0.0-b999";
// Kubelet secret sync time + k8s_parent template sync time.
public static final int WAIT_FOR_GFLAG_SYNC_SECS = 90;

public static boolean isNonRestartGflagsUpgradeSupported(String universeSoftwareVersion) {
return Util.compareYBVersions(
Expand Down
62 changes: 62 additions & 0 deletions scripts/k8s_parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import glob
import traceback
import threading
import re
import random
import string
# Stop event to signal the background thread to stop.
bg_thread_stop_event = threading.Event()
child_process = None
Expand Down Expand Up @@ -52,6 +55,10 @@
"PG_UNIX_SOCKET_LOCK_FILE_PATH_GLOB",
"/tmp/.yb.*/.s.PGSQL.*.lock") # Default pattern
MASTER_BINARY = "/home/yugabyte/bin/yb-master"
# Paths used by background_sub_env_gflags: it reads the *_TEMPLATE_FILE for the
# current server type, substitutes ${VAR} placeholders from the environment, and
# writes the result to the matching *_GENERATED_FILE.
# NOTE(review): the template paths are presumably the mounted gflags secret
# location — confirm against the Helm chart.
# Template read when the command is a master.
GFLAGS_MASTER_TEMPLATE_FILE = "/opt/master/conf/server.conf.template"
# Rendered gflags file written when the command is a master.
GFLAGS_MASTER_GENERATED_FILE = "/tmp/yugabyte/master/conf/server.conf"
# Template read when the command is not a master (tserver).
GFLAGS_TSERVER_TEMPLATE_FILE = "/opt/tserver/conf/server.conf.template"
# Rendered gflags file written when the command is not a master (tserver).
GFLAGS_TSERVER_GENERATED_FILE = "/tmp/yugabyte/tserver/conf/server.conf"


def signal_handler(signum, frame):
Expand Down Expand Up @@ -115,6 +122,55 @@ def delete_pg_lock_files():
logging.info(f"Deleted: {file_path}")


def replace_var_env(match):
    """Return the environment value for the variable named by a regex match.

    Intended as the replacement callback for ``re.sub``; it is used that way by
    ``background_sub_env_gflags`` to expand ``${VAR}`` placeholders.

    Args:
        match: A regex match object whose group 1 is the variable name.

    Returns:
        The value of that environment variable, or an empty string when the
        variable is not set.
    """
    # Single lookup with a default instead of a membership test plus index.
    return os.environ.get(match.group(1), "")


def random_string(length):
    """Return a random string of lowercase ASCII letters.

    Uses the non-cryptographic ``random`` module, so the result is suitable for
    things like temporary file suffixes but not for secrets.

    Args:
        length: Number of characters to generate.

    Returns:
        A string of ``length`` characters drawn from ``string.ascii_lowercase``.
    """
    # random.choices draws all characters in one call; no unused loop variable.
    return ''.join(random.choices(string.ascii_lowercase, k=length))


def background_sub_env_gflags(sleep_time_seconds, command):
    """Periodically render the gflags template into the generated gflags file.

    Picks the master or tserver template/generated paths based on
    ``is_master(command)``. Until ``bg_thread_stop_event`` is set, each
    iteration reads the template, replaces every ``${VAR}`` placeholder with
    the value of the environment variable ``VAR`` (empty string when unset),
    writes the result to a temporary file next to the target and moves it over
    the target with ``os.replace``.

    Returns immediately when the template file does not exist. Errors inside
    an iteration are logged and the loop continues (best effort).

    Args:
        sleep_time_seconds: Pause between two substitution passes, in seconds.
        command: The child command line; passed to ``is_master`` to select the
            master or tserver file pair.
    """
    logging.info("Starting gflags file template substitution")
    infile = GFLAGS_MASTER_TEMPLATE_FILE if is_master(command) else GFLAGS_TSERVER_TEMPLATE_FILE
    outfile = GFLAGS_MASTER_GENERATED_FILE if is_master(command) else GFLAGS_TSERVER_GENERATED_FILE
    # The temp file lives in the same directory as the target so that
    # os.replace stays on one filesystem.
    tmp_outfile = outfile + '.' + random_string(8)
    if not os.path.exists(infile):
        logging.info("Template gflags file does not exist, ignoring substitution")
        return
    try:
        # tmp_outfile shares outfile's directory, so one makedirs covers both.
        os.makedirs(os.path.dirname(outfile), exist_ok=True)
    except Exception as e:
        logging.error("Error while creating gflag file: {}, traceback: {}"
                      .format(e, traceback.format_exc()))
    while not bg_thread_stop_event.is_set():
        logging.info("Substituting env to gflag template")
        try:
            with open(infile, 'r') as instream:
                content = instream.read()
            content = re.sub("\\${(.*?)}", replace_var_env, content)
            # Safe create the gflags file by creating tmp file and moving it
            with open(tmp_outfile, 'w') as outstream:
                outstream.write(content)
            os.replace(tmp_outfile, outfile)
        except Exception as e:
            logging.error(
                "Error while substituting env to gflag template: {}, traceback: {}"
                .format(e, traceback.format_exc()))
        finally:
            # After a successful os.replace the temp file is already gone; this
            # only matters when the write or the replace failed part-way.
            try:
                os.remove(tmp_outfile)
            except FileNotFoundError:
                pass
            except OSError as e:
                logging.error("Error while removing temporary gflag file: {}".format(e))
        logging.info("Sleeping for {} seconds".format(sleep_time_seconds))
        # Wait on the stop event instead of a plain sleep so the thread exits
        # promptly once a stop is requested.
        bg_thread_stop_event.wait(sleep_time_seconds)


def background_copy_cores_wrapper(dst, sleep_time_seconds):
logging.info("Starting Collection")
global bg_thread_stop_event
Expand Down Expand Up @@ -214,6 +270,7 @@ def is_master(command):
level=logging.INFO,
)
core_collection_interval = 30 # Seconds
subs_env_gflags_interval = 20 # Seconds
command = sys.argv[1:]
if len(command) < 1:
logging.critical("No command to run")
Expand All @@ -233,6 +290,11 @@ def is_master(command):
target=background_copy_cores_wrapper, args=(cores_dir, core_collection_interval))
copy_cores_thread.daemon = True
copy_cores_thread.start()
# Substitute environment variables into the gflag template file once every 20s
subs_env_gflags_thread = threading.Thread(
target=background_sub_env_gflags, args=(subs_env_gflags_interval, command))
subs_env_gflags_thread.daemon = True
subs_env_gflags_thread.start()
# Delete PG Unix Socket Lock files that can be left after a previous
# ungraceful exit of the container.
if not is_master(command):
Expand Down

0 comments on commit e3a1a36

Please sign in to comment.