diff --git a/copyparty/svchub.py b/copyparty/svchub.py index fd679332..3bd05489 100644 --- a/copyparty/svchub.py +++ b/copyparty/svchub.py @@ -106,6 +106,7 @@ def __init__( self.no_ansi = args.no_ansi self.logf: Optional[typing.TextIO] = None self.logf_base_fn = "" + self.is_dut = False # running in unittest; always False self.stop_req = False self.stopping = False self.stopped = False diff --git a/copyparty/up2k.py b/copyparty/up2k.py index c4821171..2ae5274a 100644 --- a/copyparty/up2k.py +++ b/copyparty/up2k.py @@ -236,6 +236,9 @@ def deferred_init(self) -> None: if not self.pp and self.args.exit == "idx": return self.hub.sigterm() + if self.hub.is_dut: + return + Daemon(self._snapshot, "up2k-snapshot") if have_e2d: Daemon(self._hasher, "up2k-hasher") @@ -1405,7 +1408,7 @@ def _build_dir( if dts == lmod and dsz == sz and (nohash or dw[0] != "#" or not sz): continue - t = "reindex [{}] => [{}] ({}/{}) ({}/{})".format( + t = "reindex [{}] => [{}] mtime({}/{}) size({}/{})".format( top, rp, dts, lmod, dsz, sz ) self.log(t) @@ -2664,11 +2667,19 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]: if stat.S_ISLNK(st.st_mode): # broken symlink raise Exception() - except: + if st.st_size != dsize: + t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}" + t = t.format( + wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs + ) + self.log(t) + raise Exception("desync") + except Exception as ex: if n4g: st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0)) else: - lost.append((cur, dp_dir, dp_fn)) + if str(ex) != "desync": + lost.append((cur, dp_dir, dp_fn)) continue j = { @@ -2726,13 +2737,16 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]: ptop = None # use cj or job as appropriate if not job and wark in reg: - # ensure the files haven't been deleted manually + # ensure the files haven't been edited or deleted + path = "" + st = None rj = reg[wark] names = [rj[x] for x in ["name", "tnam"] if x in rj] for fn in names: path = djoin(rj["ptop"], rj["prel"], fn) try: - if bos.path.getsize(path) > 0 or not rj["need"]: + st = bos.stat(path) + if st.st_size > 0 or not rj["need"]: # upload completed or both present break except: @@ -2743,6 +2757,14 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]: del reg[wark] break + if st and not self.args.nw and not n4g and st.st_size != rj["size"]: + t = "will not dedup (fs index desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}" + t = t.format( + wark, st.st_size, rj["size"], st.st_mtime, rj["lmod"], path + ) + self.log(t) + del reg[wark] + if job or wark in reg: job = job or reg[wark] if ( @@ -2850,6 +2872,7 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]: return self._handle_json(job, depth + 1) job["name"] = self._untaken(pdir, job, now) + dst = djoin(job["ptop"], job["prel"], job["name"]) if not self.args.nw: dvf: dict[str, Any] = vfs.flags diff --git a/tests/test_dedup.py b/tests/test_dedup.py new file mode 100644 index 00000000..cc480e3b --- /dev/null +++ b/tests/test_dedup.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +# coding: utf-8 +from __future__ import print_function, unicode_literals + +import json +import os +import shutil +import tempfile +import unittest +from itertools import product + +from copyparty.authsrv import AuthSrv +from copyparty.httpcli import HttpCli +from tests import util as tu +from tests.util import Cfg + + +class TestDedup(unittest.TestCase): + def setUp(self): + self.td = tu.get_ramdisk() + + def tearDown(self): + os.chdir(tempfile.gettempdir()) + shutil.rmtree(self.td) + + def reset(self): + td = os.path.join(self.td, "vfs") + if os.path.exists(td): + shutil.rmtree(td) + os.mkdir(td) + os.chdir(td) + return td + + def test(self): + quick = True # sufficient for regular smoketests + # quick = False + + dirnames = ["d1", "d2"] + filenames = ["f1", "f2"] + files = [ + ( + "one", + "BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w", + "XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc", + ), + ( + "two", + "ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj", + "fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou", + ), + ] + # (data, chash, wark) + + # 3072 uploads in total + self.ctr = 3072 + self.conn = None + for e2d in [True, False]: + for dn1, fn1, f1 in product(dirnames, filenames, files): + for dn2, fn2, f2 in product(dirnames, filenames, files): + for dn3, fn3, f3 in product(dirnames, filenames, files): + self.reset() + if self.conn: + self.conn.hsrv.hub.up2k.shutdown() + self.args = Cfg(v=[".::A"], a=[], e2d=e2d) + self.asrv = AuthSrv(self.args, self.log) + self.conn = tu.VHttpConn( + self.args, self.asrv, self.log, b"", True + ) + self.do_post(dn1, fn1, f1, True) + self.do_post(dn2, fn2, f2, False) + self.do_post(dn3, fn3, f3, False) + if quick: + break + + def do_post(self, dn, fn, fi, first): + print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first))) + self.ctr -= 1 + + data, chash, wark = fi + hs = self.handshake(dn, fn, fi) + self.assertEqual(hs["wark"], wark) + + sfn = hs["name"] + if sfn == fn: + print("using original name " + fn) + else: + print(fn + " got renamed to " + sfn) + if first: + raise Exception("wait what") + + if hs["hash"]: + self.assertEqual(hs["hash"][0], chash) + self.put_chunk(dn, wark, chash, data) + elif first: + raise Exception("found first; %r, %r" % ((dn, fn, fi), hs)) + + h, b = self.curl("%s/%s" % (dn, sfn)) + self.assertEqual(b, data) + + def handshake(self, dn, fn, fi): + hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n" + msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]} + buf = json.dumps(msg).encode("utf-8") + buf = (hdr % (dn, len(buf))).encode("utf-8") + buf + print("HS -->", buf) + HttpCli(self.conn.setbuf(buf)).run() + ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) + print("HS <--", ret) + return json.loads(ret[1]) + + def put_chunk(self, dn, wark, chash, data): + msg = [ + "POST /%s/ HTTP/1.1" % (dn,), + "Connection: close", + "Content-Type: application/octet-stream", + "Content-Length: 3", + "X-Up2k-Hash: " + chash, + "X-Up2k-Wark: " + wark, + "", + data, + ] + buf = "\r\n".join(msg).encode("utf-8") + print("PUT -->", buf) + HttpCli(self.conn.setbuf(buf)).run() + ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) + self.assertEqual(ret[1], "thank") + + def curl(self, url, binary=False): + h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n" + HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run() + if binary: + h, b = self.conn.s._reply.split(b"\r\n\r\n", 1) + return [h.decode("utf-8"), b] + + return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1) + + def log(self, src, msg, c=0): + print(msg) diff --git a/tests/test_dots.py b/tests/test_dots.py index 62a38f4f..a34795c1 100644 --- a/tests/test_dots.py +++ b/tests/test_dots.py @@ -24,6 +24,10 @@ def hdr(query, uname): class TestDots(unittest.TestCase): + def __init__(self, *a, **ka): + super(TestDots, self).__init__(*a, **ka) + self.is_dut = True + def setUp(self): self.td = tu.get_ramdisk() diff --git a/tests/util.py b/tests/util.py index cf67ed86..2c04cf67 100644 --- a/tests/util.py +++ b/tests/util.py @@ -3,7 +3,6 @@ from __future__ import print_function, unicode_literals import os -import platform import re import shutil import socket @@ -16,9 +15,7 @@ import jinja2 -WINDOWS = platform.system() == "Windows" -ANYWIN = WINDOWS or sys.platform in ["msys"] -MACOS = platform.system() == "Darwin" +from copyparty.__init__ import MACOS, WINDOWS, E J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader) # type: ignore J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT") @@ -42,10 +39,11 @@ def eprint(*a, **ka): # 25% faster; until any tests do symlink stuff -from copyparty.__init__ import E from copyparty.__main__ import init_E +from copyparty.broker_thr import BrokerThr from copyparty.ico import Ico from copyparty.u2idx import U2idx +from copyparty.up2k import Up2k from copyparty.util import FHC, CachedDict, Garda, Unrecv init_E(E) @@ -119,10 +117,10 @@ class Cfg(Namespace): def __init__(self, a=None, v=None, c=None, **ka0): ka = {} - ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol" + ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_db_ip no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs" ka.update(**{k: False for k in ex.split()}) - ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip" + ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip" ka.update(**{k: True for k in ex.split()}) ex = "ah_cli ah_gen css_browser hist js_browser js_other mime mimes no_forget no_hash no_idx nonsus_urls og_tpl og_ua" @@ -137,9 +135,12 @@ def __init__(self, a=None, v=None, c=None, **ka0): ex = "db_act k304 loris re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo" ka.update(**{k: 0 for k in ex.split()}) - ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname R RS SR" + ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr ipa html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname xff_src R RS SR" ka.update(**{k: "" for k in ex.split()}) + ex = "ban_403 ban_404 ban_422 ban_pw ban_url" + ka.update(**{k: "no" for k in ex.split()}) + ex = "grp on403 on404 xad xar xau xban xbd xbr xbu xiu xm" ka.update(**{k: [] for k in ex.split()}) @@ -221,11 +222,29 @@ def settimeout(self, a): pass +class VHub(object): + def __init__(self, args, asrv, log): + self.args = args + self.asrv = asrv + self.log = log + self.is_dut = True + self.up2k = Up2k(self) + + +class VBrokerThr(BrokerThr): + def __init__(self, hub): + self.hub = hub + self.log = hub.log + self.args = hub.args + self.asrv = hub.asrv + + class VHttpSrv(object): def __init__(self, args, asrv, log): self.args = args self.asrv = asrv self.log = log + self.hub = None self.broker = NullBroker(args, asrv) self.prism = None @@ -252,18 +271,25 @@ def get_u2idx(self): return self.u2idx +class VHttpSrvUp2k(VHttpSrv): + def __init__(self, args, asrv, log): + super(VHttpSrvUp2k, self).__init__(args, asrv, log) + self.hub = VHub(args, asrv, log) + self.broker = VBrokerThr(self.hub) + + class VHttpConn(object): - def __init__(self, args, asrv, log, buf): + def __init__(self, args, asrv, log, buf, use_up2k=False): self.t0 = time.time() - self.s = VSock(buf) - self.sr = Unrecv(self.s, None) # type: ignore self.aclose = {} self.addr = ("127.0.0.1", "42069") self.args = args self.asrv = asrv self.bans = {} self.freshen_pwd = 0.0 - self.hsrv = VHttpSrv(args, asrv, log) + + Ctor = VHttpSrvUp2k if use_up2k else VHttpSrv + self.hsrv = Ctor(args, asrv, log) self.ico = Ico(args) self.ipa_nm = None self.lf_url = None @@ -279,6 +305,12 @@ def __init__(self, args, asrv, log, buf): self.u2fh = FHC() self.get_u2idx = self.hsrv.get_u2idx + self.setbuf(buf) + + def setbuf(self, buf): + self.s = VSock(buf) + self.sr = Unrecv(self.s, None) # type: ignore + return self if WINDOWS: