Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cluster_helper.py to support syncing from cluster #498

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.idea
bin/*
data
__pycache__
bin
.DS_Store

*.log
*.rdb
*.aof
34 changes: 17 additions & 17 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@ echo "try build for current platform"
go build -v -trimpath -o "$BIN_DIR/redis-shake" "./cmd/redis-shake"
echo "build success"

for g in "linux" "darwin"; do
for a in "amd64" "arm64"; do
echo "try build GOOS=$g GOARCH=$a"
export GOOS=$g
export GOARCH=$a
go build -v -trimpath -o "$BIN_DIR/redis-shake-$g-$a" "./cmd/redis-shake"
unset GOOS
unset GOARCH
echo "build success"
done
done

cp sync.toml "$BIN_DIR"
cp restore.toml "$BIN_DIR"

if [ "$1" == "dist" ]; then
echo "[ DIST ]"
cd bin
cp -r ../filters ./
tar -czvf ./redis-shake.tar.gz ./sync.toml ./restore.toml ./redis-shake-* ./filters
for g in "linux" "darwin"; do
for a in "amd64" "arm64"; do
echo "try build GOOS=$g GOARCH=$a"
export GOOS=$g
export GOARCH=$a
go build -v -trimpath -o "$BIN_DIR/redis-shake-$g-$a" "./cmd/redis-shake"
unset GOOS
unset GOARCH
echo "build success"
done
done
cp sync.toml "$BIN_DIR"
cp restore.toml "$BIN_DIR"
cp -r filters "$BIN_DIR"
cp -r scripts/cluster_helper "$BIN_DIR"
cd "$BIN_DIR"
tar -czvf ./redis-shake.tar.gz ./sync.toml ./restore.toml ./redis-shake-* ./filters ./cluster_helper
rm -rf ./filters
cd ..
fi
1 change: 1 addition & 0 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func main() {

// start statistics
if config.Config.Advanced.MetricsPort != 0 {
statistics.Metrics.Address = config.Config.Source.Address
go func() {
log.Infof("metrics url: http://localhost:%d", config.Config.Advanced.MetricsPort)
mux := http.NewServeMux()
Expand Down
3 changes: 3 additions & 0 deletions internal/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
)

type metrics struct {
// info
Address string `json:"address"`

// entries
EntryId uint64 `json:"entry_id"`
AllowEntriesCount uint64 `json:"allow_entries_count"`
Expand Down
179 changes: 179 additions & 0 deletions scripts/cluster_helper/cluster_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#!/usr/bin/env python3
# encoding: utf-8
import copy
import datetime
import os
import shutil
import signal
import sys
import time
from pathlib import Path

import redis
import requests
import toml

from launcher import Launcher

USAGE = """
cluster_helper is a helper script to start many redis-shake for syncing from cluster.

Usage:
$ python3 cluster_helper.py ./bin/redis-shake sync.toml
"""

# Absolute path to the redis-shake binary; resolved in parse_args().
REDIS_SHAKE_PATH = ""
# Seconds between metrics polls; also the denominator for the ops/s figures in loop().
SLEEP_SECONDS = 5
# Set to True by signal_handler() (SIGINT) to request a graceful shutdown.
stopped = False
# Parsed sync.toml used as the per-node config template; loaded in parse_args().
toml_template = {}


class Shake:
    """Bookkeeping for one redis-shake worker process."""

    def __init__(self):
        # HTTP port this worker's metrics endpoint listens on (assigned in main()).
        self.metrics_port = 0
        # Launcher wrapping the redis-shake subprocess (assigned in main()).
        self.launcher = None


nodes = {}


def parse_args():
    """Validate CLI arguments and load the sync.toml template.

    Expects exactly two arguments: the redis-shake binary path and a
    sync.toml file. On success, sets the module globals REDIS_SHAKE_PATH
    (absolute path) and toml_template (parsed toml with defaults filled
    in for source.username / source.password / source.tls / [advanced]).
    Prints USAGE and exits with status 1 on bad input.
    """
    if len(sys.argv) != 3:
        print(USAGE)
        exit(1)
    global REDIS_SHAKE_PATH, toml_template

    # 1. check redis-shake path
    REDIS_SHAKE_PATH = sys.argv[1]
    if not Path(REDIS_SHAKE_PATH).is_file():
        print(f"redis-shake path [{REDIS_SHAKE_PATH}] is not a file")
        print(USAGE)
        exit(1)
    print(f"redis-shake path: {REDIS_SHAKE_PATH}")
    # absolute path survives the later os.chdir("data") in main()
    REDIS_SHAKE_PATH = os.path.abspath(REDIS_SHAKE_PATH)
    print(f"redis-shake abs path: {REDIS_SHAKE_PATH}")

    # 2. check and load toml file, filling in optional keys with defaults
    toml_template = toml.load(sys.argv[2])
    print(toml_template)
    source = toml_template["source"]
    source.setdefault("username", "")
    source.setdefault("password", "")
    source.setdefault("tls", False)
    toml_template.setdefault("advanced", {})


def stop():
    """Shut down every launched redis-shake child, then exit the helper."""
    # Launcher.stop() sends SIGINT to the child and waits for it to exit.
    for worker in nodes.values():
        worker.launcher.stop()
    exit(0)


def loop():
    """Poll every worker's metrics endpoint forever and print sync progress.

    Runs until `stopped` is set by signal_handler(), then tears everything
    down via stop(). Ops/s figures are deltas between consecutive polls
    divided by SLEEP_SECONDS (the first iteration uses a zero baseline, so
    its rates are inflated).
    """
    last_allow_entries_count = {address: 0 for address in nodes.keys()}
    last_disallow_entries_count = {address: 0 for address in nodes.keys()}
    while True:
        if stopped:
            stop()
        print(f"================ {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ================")

        # Collect metrics from every shake; a node that does not answer is
        # reported and skipped for this round.
        metrics = []
        for address, shake in nodes.items():
            try:
                # timeout keeps one stuck endpoint from hanging the whole loop;
                # Timeout is a RequestException, so it lands in the handler below.
                ret = requests.get(f"http://localhost:{shake.metrics_port}",
                                   timeout=SLEEP_SECONDS).json()
                metrics.append(ret)
            except requests.exceptions.RequestException as e:
                print(f"get metrics from [{address}] failed: {e}")

        for metric in sorted(metrics, key=lambda x: x["address"]):
            address = metric['address']
            # per-second rates since the previous poll
            allow_ops = (metric['allow_entries_count'] - last_allow_entries_count[address]) / SLEEP_SECONDS
            disallow_ops = (metric['disallow_entries_count'] - last_disallow_entries_count[address]) / SLEEP_SECONDS
            if metric['rdb_file_size'] == 0:
                # RDB size not reported yet: the sync is still starting up.
                print(f"{metric['address']} shaking...")
            elif metric['rdb_received_size'] < metric['rdb_file_size']:
                print(f"{metric['address']} receiving rdb. "
                      f"percent=[{metric['rdb_received_size'] / metric['rdb_file_size'] * 100:.2f}]%, "
                      f"rdbFileSize=[{metric['rdb_file_size'] / 1024 / 1024 / 1024:.3f}]G, "
                      f"rdbReceivedSize=[{metric['rdb_received_size'] / 1024 / 1024 / 1024:.3f}]G")
            elif metric['rdb_send_size'] < metric['rdb_file_size']:
                print(f"{metric['address']} syncing rdb. "
                      f"percent=[{metric['rdb_send_size'] / metric['rdb_file_size'] * 100:.2f}]%, "
                      f"allowOps=[{allow_ops:.2f}], "
                      f"disallowOps=[{disallow_ops:.2f}], "
                      f"entryId=[{metric['entry_id']}], "
                      f"InQueueEntriesCount=[{metric['in_queue_entries_count']}], "
                      f"unansweredBytesCount=[{metric['unanswered_bytes_count']}]bytes, "
                      f"rdbFileSize=[{metric['rdb_file_size'] / 1024 / 1024 / 1024:.3f}]G, "
                      f"rdbSendSize=[{metric['rdb_send_size'] / 1024 / 1024 / 1024:.3f}]G")
            else:
                print(f"{metric['address']} syncing aof. "
                      f"allowOps=[{allow_ops:.2f}], "
                      f"disallowOps=[{disallow_ops:.2f}], "
                      f"entryId=[{metric['entry_id']}], "
                      f"InQueueEntriesCount=[{metric['in_queue_entries_count']}], "
                      f"unansweredBytesCount=[{metric['unanswered_bytes_count']}]bytes, "
                      f"diff=[{metric['aof_received_offset'] - metric['aof_applied_offset']}], "
                      f"aofReceivedOffset=[{metric['aof_received_offset']}], "
                      f"aofAppliedOffset=[{metric['aof_applied_offset']}]")
            last_allow_entries_count[address] = metric['allow_entries_count']
            last_disallow_entries_count[address] = metric['disallow_entries_count']

        time.sleep(SLEEP_SECONDS)


def main():
    """Discover cluster masters and launch one redis-shake per master.

    Connects to the source cluster from the sync.toml template, creates a
    fresh ./data/<host>_<port> workdir per master with its own sync.toml
    (metrics ports allocated from 11008 upward), starts a redis-shake in
    each, installs the SIGINT handler, and enters the monitor loop().
    """
    parse_args()

    # connect using the [source] settings from the template
    address = toml_template["source"]["address"]
    host, port = address.split(":")
    username = toml_template["source"]["username"]
    password = toml_template["source"]["password"]
    tls = toml_template["source"]["tls"]
    print(f"host: {host}, port: {port}, username: {username}, password: {password}, tls: {tls}")
    cluster = redis.RedisCluster(host=host, port=port, username=username, password=password, ssl=tls)
    cluster_nodes = cluster.cluster_nodes()
    print("cluster nodes:", cluster_nodes)

    # one Shake per master node (replicas are skipped)
    for address, node in cluster_nodes.items():
        if "master" in node["flags"]:
            nodes[address] = Shake()
    print("addresses:")
    for k in nodes.keys():
        print(k)

    # create workdir and start redis-shake
    if os.path.exists("data"):
        shutil.rmtree("data")
    os.mkdir("data")
    os.chdir("data")
    start_port = 11007
    for address in nodes.keys():
        workdir = address.replace(".", "_").replace(":", "_")

        os.mkdir(workdir)
        # Deep-copy the template: a plain assignment would alias it, so every
        # node's config dict would share (and clobber) the same nested tables.
        node_toml = copy.deepcopy(toml_template)
        node_toml["source"]["address"] = address
        start_port += 1
        node_toml["advanced"]["metrics_port"] = start_port

        with open(f"{workdir}/sync.toml", "w") as f:
            toml.dump(node_toml, f)

        # start redis-shake
        launcher = Launcher(args=[REDIS_SHAKE_PATH, "sync.toml"], work_dir=workdir)
        nodes[address].launcher = launcher
        nodes[address].metrics_port = start_port

    signal.signal(signal.SIGINT, signal_handler)
    print("start syncing...")
    loop()


def signal_handler(signum, stack_frame):
    """SIGINT handler: flag the monitor loop to shut everything down."""
    global stopped
    # loop() notices the flag on its next iteration and calls stop().
    stopped = True
    print("\nYou pressed Ctrl+C!")


if __name__ == '__main__':
    # Entry point: spawn one redis-shake per cluster master and monitor them.
    main()
34 changes: 34 additions & 0 deletions scripts/cluster_helper/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import signal
import subprocess
from pathlib import Path


class Launcher:
    """Run one redis-shake subprocess with stdout/stderr captured on disk.

    The child runs with `work_dir` as its cwd; its output streams are
    appended to <work_dir>/stdout and <work_dir>/stderr. stop() must be
    called exactly once before the Launcher is garbage-collected.
    """

    def __init__(self, args, work_dir):
        self.started = True
        self.args = args
        self.work_dir = work_dir
        # exist_ok makes this a no-op when the directory is already there
        Path(self.work_dir).mkdir(parents=True, exist_ok=True)
        self.stdout_file = open(f"{work_dir}/stdout", 'a')
        self.stderr_file = open(f"{work_dir}/stderr", 'a')
        self.process = subprocess.Popen(self.args,
                                        stdout=self.stdout_file,
                                        stderr=self.stderr_file,
                                        cwd=self.work_dir,
                                        encoding="utf-8")

    def __del__(self):
        # Enforce explicit cleanup: reaching GC while still running is a bug.
        assert not self.started, "Every Launcher should be closed manually! work_dir:" + self.work_dir

    def get_pid(self):
        """Return the OS pid of the child process."""
        return self.process.pid

    def stop(self):
        """Send SIGINT to the child and block until it exits (idempotent)."""
        if self.started:
            self.started = False
            pid = self.process.pid
            print(f"Waiting for process {pid} to exit...")
            self.stdout_file.close()
            self.stderr_file.close()
            self.process.send_signal(signal.SIGINT)
            self.process.wait()
            print(f"process {pid} exited.")
3 changes: 3 additions & 0 deletions scripts/cluster_helper/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
redis==4.3.4
requests==2.27.1
toml==0.10.2