-
Notifications
You must be signed in to change notification settings - Fork 0
/
syncer_script.py
154 lines (130 loc) · 5.83 KB
/
syncer_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Script to sync tables between databases. Reads all the rows from
# the table in the source database and all the rows from the table
# in the destination database and attempts to optimistically
# insert/update only the differences in the destination table,
# as well as deleting any extra rows from the destination table.
# If this optimistic approach fails (e.g. rows can't be updated
# because of constraint violations:
# https://nocd.hashnode.dev/updates-order-and-the-binlog-1 ), then
# the script will delete all the rows from the destination table
# and then attempt to insert all the rows from the source table.
#
# Note: Because this script pulls all the rows in a table on the source
# and destination database, it should be used with care and not
# used on very large tables.
#
# Note: This script currently assumes that the tables have the same
# schema (e.g. columns, primary keys, constraints, etc.). It
# does not currently
#
# Sample invocation:
# python3 syncer_script.py \
# --src mysql://root:$MYSQL_SRC_PWD@127.0.0.1:3306/myDatabase \
# --dst mysql://root:$MYSQL_DST_PWD@127.0.0.1:3306/myDatabase
#
# TODO:
# - add table option: --table src_db.src_tbl:dst_db.dst_tbl:pk_name
# - add no-dry-run option: --no-dry-run
# - add verbose option: --verbose
import logging
import re
import pymysql
import pymysql.cursors
import typer
app = typer.Typer()
logging.basicConfig(
format="%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
log = logging.getLogger(__name__)
def sync(dst_tbl, src_tbl, pk, dst_curs, src_curs):
src_curs.execute(f"""SELECT * FROM {src_tbl}""")
src_result = src_curs.fetchall()
# Update/Insert row from src-db into dst
dst_curs.execute(f"""SELECT * FROM {dst_tbl} FOR UPDATE""")
dst_result = dst_curs.fetchall()
diff = [row for row in src_result if row not in dst_result]
for row in diff:
log.info(f"Row id: {row[pk]} missing from dst")
insert_query = f"INSERT INTO {dst_tbl} SET "
insert_args = []
update_query = "ON DUPLICATE KEY UPDATE "
update_args = []
for key, value in row.items():
insert_query += f""" {key} = %s ,"""
update_query += f""" {key} = %s ,"""
insert_args.append(value)
update_args.append(value)
insert_query = insert_query.strip(",")
update_query = update_query.strip(",")
query = insert_query + update_query
args = insert_args + update_args
dst_curs.execute(query, tuple(args))
# Delete row from dst that is not in src-db
dst_curs.execute(f"""SELECT * FROM {dst_tbl}""")
dst_result = dst_curs.fetchall()
diff = [row for row in dst_result if row not in src_result]
for row in diff:
log.info(f"Delete row {row['id']} from dst since missing in src")
dst_curs.execute(
f"""DELETE FROM {dst_tbl} WHERE id = %s LIMIT 1""",
(
row[pk],
),
)
def parse_connection_string(url: str):
mysql_connection_url_regex = 'mysql://(.*?):(.*?)@(.*?):(.*?)/(.*)'
if re.search(mysql_connection_url_regex, url):
user, password, host, port, database = re.match(mysql_connection_url_regex, url).groups()
return user, password, host, port, database
else:
raise typer.BadParameter('Invalid MySQL connection URL')
@app.command(no_args_is_help=True)
def main(
src: str = typer.Option("", help="MySQL connection string for the source "
"database containing the table to read from"),
dst: str = typer.Option("", help="MySQL connection string for the destination "
"database containing the table to update")
):
src_user, src_password, src_host, src_port, src_database = parse_connection_string(src)
dst_user, dst_password, dst_host, dst_port, dst_database = parse_connection_string(dst)
tables = [("syncer_demo.syncer_src", "syncer_demo.syncer_dst", "id")]
src = pymysql.connect(
host=src_host,
port=int(src_port),
user=src_user,
password=src_password,
db=src_database,
cursorclass=pymysql.cursors.DictCursor,
)
dst = pymysql.connect(
host=dst_host,
port=int(dst_port),
user=dst_user,
password=dst_password,
db=dst_database,
cursorclass=pymysql.cursors.DictCursor,
)
with src.cursor() as src_curs, dst.cursor() as dst_curs:
for (src_tbl, dst_tbl, pk) in tables:
try:
dst_curs.execute("START TRANSACTION;")
log.info(f"Starting sync of {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}")
sync(dst_tbl=dst_tbl, src_tbl=src_tbl, pk=pk, dst_curs=dst_curs, src_curs=src_curs)
dst.commit()
log.info(f"Successful sync of {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}")
except Exception as ex:
log.exception(f"Error in sync of {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}: {ex}")
dst.rollback()
try:
log.info(f"Deleting all data and retrying sync of {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}")
dst_curs.execute("START TRANSACTION;")
dst_curs.execute(f"""DELETE FROM {dst_tbl}""")
sync(dst_tbl=dst_tbl, src_tbl=src_tbl, pk=pk, dst_curs=dst_curs, src_curs=src_curs)
dst.commit()
log.info(f"Successful sync of {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}")
except Exception as ex2:
log.exception(f"Error in replacing all data in {src_tbl} from {src_host} to {dst_host} tbl {dst_tbl}: {ex2}")
dst.rollback()
if __name__ == "__main__":
app()