diff --git a/pywxdump/__init__.py b/pywxdump/__init__.py index f2a98846..a72388ca 100644 --- a/pywxdump/__init__.py +++ b/pywxdump/__init__.py @@ -28,4 +28,4 @@ # PYWXDUMP_ROOT_PATH = os.path.dirname(__file__) # db_init = DBPool("DBPOOL_INIT") -__version__ = "3.0.16" +__version__ = "3.0.17" diff --git a/pywxdump/api/api.py b/pywxdump/api/api.py index c7a47cdd..04122db2 100644 --- a/pywxdump/api/api.py +++ b/pywxdump/api/api.py @@ -75,18 +75,39 @@ def init_key(): if not my_wxid: return ReJson(1002, body=f"my_wxid is required: {my_wxid}") + old_merge_save_path = read_session(g.sf, my_wxid, "merge_path") + if os.path.exists(old_merge_save_path): + pmsg = ParsingMSG(old_merge_save_path) + pmsg.close_all_connection() + out_path = os.path.join(g.tmp_path, "decrypted", my_wxid) if my_wxid else os.path.join(g.tmp_path, "decrypted") # 检查文件夹中文件是否被占用 if os.path.exists(out_path): try: shutil.rmtree(out_path) except PermissionError as e: + # 显示堆栈信息 + logging.error(f"{e}", exc_info=True) return ReJson(2001, body=str(e)) code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=out_path) time.sleep(1) if code: - save_session(g.sf, my_wxid, "merge_path", merge_save_path) + # 移动merge_save_path到g.tmp_path/my_wxid + if not os.path.exists(os.path.join(g.tmp_path, my_wxid)): + os.makedirs(os.path.join(g.tmp_path, my_wxid)) + merge_save_path_new = os.path.join(g.tmp_path, my_wxid, "merge_all.db") + shutil.move(merge_save_path, str(merge_save_path_new)) + + # 删除out_path + if os.path.exists(out_path): + try: + shutil.rmtree(out_path) + except PermissionError as e: + # 显示堆栈信息 + logging.error(f"{e}", exc_info=True) + + save_session(g.sf, my_wxid, "merge_path", merge_save_path_new) save_session(g.sf, my_wxid, "wx_path", wx_path) save_session(g.sf, my_wxid, "key", key) save_session(g.sf, my_wxid, "my_wxid", my_wxid) diff --git a/pywxdump/dbpreprocess/dbbase.py b/pywxdump/dbpreprocess/dbbase.py index 6e2aac2b..cae91dcf 100644 --- a/pywxdump/dbpreprocess/dbbase.py +++ b/pywxdump/dbpreprocess/dbbase.py @@ -69,6 +69,13 @@ def close_connection(self): logging.info(f"关闭数据库 - {self._db_path}") self._db_connection = None + def close_all_connection(self): + for db_path in self._connection_pool: + if self._connection_pool[db_path]: + self._connection_pool[db_path].close() + logging.info(f"关闭数据库 - {db_path}") + self._connection_pool[db_path] = None + def show__singleton_instances(self): print(self._singleton_instances) diff --git a/pywxdump/wx_info/merge_db.py b/pywxdump/wx_info/merge_db.py index 8f8e1abf..8449c73d 100644 --- a/pywxdump/wx_info/merge_db.py +++ b/pywxdump/wx_info/merge_db.py @@ -236,60 +236,63 @@ def merge_db(db_paths, save_path="merge.db", CreateTime: int = 0, endCreateTime: # 获取表名 sql = f"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;" tables = execute_sql(db, sql) - for table in tables: - table = table[0] - if table == "sqlite_sequence": - continue - # 获取表中的字段名 - sql = f"PRAGMA table_info({table})" - columns = execute_sql(db, sql) - if not columns or len(columns) < 1: - continue - col_type = { - (i[1] if isinstance(i[1], str) else i[1].decode(), i[2] if isinstance(i[2], str) else i[2].decode()) for - i in columns} - columns = [i[1] if isinstance(i[1], str) else i[1].decode() for i in columns] - if not columns or len(columns) < 1: - continue - - # 检测表是否存在 - sql = f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'" - out_cursor.execute(sql) - if len(out_cursor.fetchall()) < 1: - # 创建表 - # 拼接创建表的SQL语句 - column_definitions = [] - for column in col_type: - column_name = column[0] if isinstance(column[0], str) else column[0].decode() - column_type = column[1] if isinstance(column[1], str) else column[1].decode() - column_definition = f"{column_name} {column_type}" - column_definitions.append(column_definition) - sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(column_definitions)})" - # sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(columns)})" - out_cursor.execute(sql) + try: + for table in tables: + table = table[0] + if table == "sqlite_sequence": + continue + # 获取表中的字段名 + sql = f"PRAGMA table_info({table})" + columns = execute_sql(db, sql) + if not columns or len(columns) < 1: + continue + col_type = { + (i[1] if isinstance(i[1], str) else i[1].decode(), i[2] if isinstance(i[2], str) else i[2].decode()) for + i in columns} + columns = [i[1] if isinstance(i[1], str) else i[1].decode() for i in columns] + if not columns or len(columns) < 1: + continue - # 创建包含 NULL 值比较的 UNIQUE 索引 - index_name = f"{table}_unique_index" - coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns) # 将 NULL 值转换为 '' - sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})" + # 检测表是否存在 + sql = f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'" out_cursor.execute(sql) - - # 获取表中的数据 - if "CreateTime" in columns and CreateTime > 0: - sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table} WHERE CreateTime>? ORDER BY CreateTime" - src_data = execute_sql(db, sql, (CreateTime,)) - else: - sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table}" - src_data = execute_sql(db, sql) - if not src_data or len(src_data) < 1: - continue - # 插入数据 - sql = f"INSERT OR IGNORE INTO {table} ({','.join([i[0] for i in col_type])}) VALUES ({','.join(['?'] * len(columns))})" - try: - out_cursor.executemany(sql, src_data) - except Exception as e: - logging.error(f"error: {alias}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n**********") - outdb.commit() + if len(out_cursor.fetchall()) < 1: + # 创建表 + # 拼接创建表的SQL语句 + column_definitions = [] + for column in col_type: + column_name = column[0] if isinstance(column[0], str) else column[0].decode() + column_type = column[1] if isinstance(column[1], str) else column[1].decode() + column_definition = f"{column_name} {column_type}" + column_definitions.append(column_definition) + sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(column_definitions)})" + # sql = f"CREATE TABLE IF NOT EXISTS {table} ({','.join(columns)})" + out_cursor.execute(sql) + + # 创建包含 NULL 值比较的 UNIQUE 索引 + index_name = f"{table}_unique_index" + coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns) # 将 NULL 值转换为 '' + sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})" + out_cursor.execute(sql) + + # 获取表中的数据 + if "CreateTime" in columns and CreateTime > 0: + sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table} WHERE CreateTime>? ORDER BY CreateTime" + src_data = execute_sql(db, sql, (CreateTime,)) + else: + sql = f"SELECT {','.join([i[0] for i in col_type])} FROM {table}" + src_data = execute_sql(db, sql) + if not src_data or len(src_data) < 1: + continue + # 插入数据 + sql = f"INSERT OR IGNORE INTO {table} ({','.join([i[0] for i in col_type])}) VALUES ({','.join(['?'] * len(columns))})" + try: + out_cursor.executemany(sql, src_data) + except Exception as e: + logging.error(f"error: {alias}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n**********") + outdb.commit() + except Exception as e: + logging.error(f"fun(merge_db) error: {alias}\n{e}\n**********") db.close() outdb.close() return save_path