Skip to content

Commit

Permalink
fix dedup bug as of v1.13.8:
Browse files Browse the repository at this point in the history
* v1.13.8 broke collision resolving for non-identical files;
   the correct filename was reserved but not symlinked to
   the original file, leaving a zerobyte file instead.
   See v1.14.3 github release notes for remediation info

* add sanchecks for early detection of index/fs desync;
   saves performance and gives less confusing logs
  • Loading branch information
9001 committed Aug 30, 2024
1 parent 0123399 commit 3da62ec
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 17 deletions.
1 change: 1 addition & 0 deletions copyparty/svchub.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def __init__(
self.no_ansi = args.no_ansi
self.logf: Optional[typing.TextIO] = None
self.logf_base_fn = ""
self.is_dut = False # running in unittest; always False
self.stop_req = False
self.stopping = False
self.stopped = False
Expand Down
33 changes: 28 additions & 5 deletions copyparty/up2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ def deferred_init(self) -> None:
if not self.pp and self.args.exit == "idx":
return self.hub.sigterm()

if self.hub.is_dut:
return

Daemon(self._snapshot, "up2k-snapshot")
if have_e2d:
Daemon(self._hasher, "up2k-hasher")
Expand Down Expand Up @@ -1405,7 +1408,7 @@ def _build_dir(
if dts == lmod and dsz == sz and (nohash or dw[0] != "#" or not sz):
continue

t = "reindex [{}] => [{}] ({}/{}) ({}/{})".format(
t = "reindex [{}] => [{}] mtime({}/{}) size({}/{})".format(
top, rp, dts, lmod, dsz, sz
)
self.log(t)
Expand Down Expand Up @@ -2664,11 +2667,19 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
if stat.S_ISLNK(st.st_mode):
# broken symlink
raise Exception()
except:
if st.st_size != dsize:
t = "candidate ignored (db/fs desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
t = t.format(
wark, st.st_size, dsize, st.st_mtime, dtime, dp_abs
)
self.log(t)
raise Exception("desync")
except Exception as ex:
if n4g:
st = os.stat_result((0, -1, -1, 0, 0, 0, 0, 0, 0, 0))
else:
lost.append((cur, dp_dir, dp_fn))
if str(ex) != "desync":
lost.append((cur, dp_dir, dp_fn))
continue

j = {
Expand Down Expand Up @@ -2726,13 +2737,16 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
ptop = None # use cj or job as appropriate

if not job and wark in reg:
# ensure the files haven't been deleted manually
# ensure the files haven't been edited or deleted
path = ""
st = None
rj = reg[wark]
names = [rj[x] for x in ["name", "tnam"] if x in rj]
for fn in names:
path = djoin(rj["ptop"], rj["prel"], fn)
try:
if bos.path.getsize(path) > 0 or not rj["need"]:
st = bos.stat(path)
if st.st_size > 0 or not rj["need"]:
# upload completed or both present
break
except:
Expand All @@ -2743,6 +2757,14 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
del reg[wark]
break

if st and not self.args.nw and not n4g and st.st_size != rj["size"]:
t = "will not dedup (fs index desync): {}, size fs={} db={}, mtime fs={} db={}, file: {}"
t = t.format(
wark, st.st_size, rj["size"], st.st_mtime, rj["lmod"], path
)
self.log(t)
del reg[wark]

if job or wark in reg:
job = job or reg[wark]
if (
Expand Down Expand Up @@ -2850,6 +2872,7 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
return self._handle_json(job, depth + 1)

job["name"] = self._untaken(pdir, job, now)
dst = djoin(job["ptop"], job["prel"], job["name"])

if not self.args.nw:
dvf: dict[str, Any] = vfs.flags
Expand Down
138 changes: 138 additions & 0 deletions tests/test_dedup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function, unicode_literals

import json
import os
import shutil
import tempfile
import unittest
from itertools import product

from copyparty.authsrv import AuthSrv
from copyparty.httpcli import HttpCli
from tests import util as tu
from tests.util import Cfg


class TestDedup(unittest.TestCase):
def setUp(self):
self.td = tu.get_ramdisk()

def tearDown(self):
os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td)

def reset(self):
td = os.path.join(self.td, "vfs")
if os.path.exists(td):
shutil.rmtree(td)
os.mkdir(td)
os.chdir(td)
return td

def test(self):
quick = True # sufficient for regular smoketests
# quick = False

dirnames = ["d1", "d2"]
filenames = ["f1", "f2"]
files = [
(
"one",
"BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w",
"XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc",
),
(
"two",
"ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj",
"fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou",
),
]
# (data, chash, wark)

# 3072 uploads in total
self.ctr = 3072
self.conn = None
for e2d in [True, False]:
for dn1, fn1, f1 in product(dirnames, filenames, files):
for dn2, fn2, f2 in product(dirnames, filenames, files):
for dn3, fn3, f3 in product(dirnames, filenames, files):
self.reset()
if self.conn:
self.conn.hsrv.hub.up2k.shutdown()
self.args = Cfg(v=[".::A"], a=[], e2d=e2d)
self.asrv = AuthSrv(self.args, self.log)
self.conn = tu.VHttpConn(
self.args, self.asrv, self.log, b"", True
)
self.do_post(dn1, fn1, f1, True)
self.do_post(dn2, fn2, f2, False)
self.do_post(dn3, fn3, f3, False)
if quick:
break

def do_post(self, dn, fn, fi, first):
print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first)))
self.ctr -= 1

data, chash, wark = fi
hs = self.handshake(dn, fn, fi)
self.assertEqual(hs["wark"], wark)

sfn = hs["name"]
if sfn == fn:
print("using original name " + fn)
else:
print(fn + " got renamed to " + sfn)
if first:
raise Exception("wait what")

if hs["hash"]:
self.assertEqual(hs["hash"][0], chash)
self.put_chunk(dn, wark, chash, data)
elif first:
raise Exception("found first; %r, %r" % ((dn, fn, fi), hs))

h, b = self.curl("%s/%s" % (dn, sfn))
self.assertEqual(b, data)

def handshake(self, dn, fn, fi):
hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n"
msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]}
buf = json.dumps(msg).encode("utf-8")
buf = (hdr % (dn, len(buf))).encode("utf-8") + buf
print("HS -->", buf)
HttpCli(self.conn.setbuf(buf)).run()
ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
print("HS <--", ret)
return json.loads(ret[1])

def put_chunk(self, dn, wark, chash, data):
msg = [
"POST /%s/ HTTP/1.1" % (dn,),
"Connection: close",
"Content-Type: application/octet-stream",
"Content-Length: 3",
"X-Up2k-Hash: " + chash,
"X-Up2k-Wark: " + wark,
"",
data,
]
buf = "\r\n".join(msg).encode("utf-8")
print("PUT -->", buf)
HttpCli(self.conn.setbuf(buf)).run()
ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
self.assertEqual(ret[1], "thank")

def curl(self, url, binary=False):
h = "GET /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
HttpCli(self.conn.setbuf((h % (url,)).encode("utf-8"))).run()
if binary:
h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
return [h.decode("utf-8"), b]

return self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)

def log(self, src, msg, c=0):
print(msg)
4 changes: 4 additions & 0 deletions tests/test_dots.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ def hdr(query, uname):


class TestDots(unittest.TestCase):
def __init__(self, *a, **ka):
super(TestDots, self).__init__(*a, **ka)
self.is_dut = True

def setUp(self):
self.td = tu.get_ramdisk()

Expand Down
56 changes: 44 additions & 12 deletions tests/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import print_function, unicode_literals

import os
import platform
import re
import shutil
import socket
Expand All @@ -16,9 +15,7 @@

import jinja2

WINDOWS = platform.system() == "Windows"
ANYWIN = WINDOWS or sys.platform in ["msys"]
MACOS = platform.system() == "Darwin"
from copyparty.__init__ import MACOS, WINDOWS, E

J2_ENV = jinja2.Environment(loader=jinja2.BaseLoader) # type: ignore
J2_FILES = J2_ENV.from_string("{{ files|join('\n') }}\nJ2EOT")
Expand All @@ -42,10 +39,11 @@ def eprint(*a, **ka):
# 25% faster; until any tests do symlink stuff


from copyparty.__init__ import E
from copyparty.__main__ import init_E
from copyparty.broker_thr import BrokerThr
from copyparty.ico import Ico
from copyparty.u2idx import U2idx
from copyparty.up2k import Up2k
from copyparty.util import FHC, CachedDict, Garda, Unrecv

init_E(E)
Expand Down Expand Up @@ -119,10 +117,10 @@ class Cfg(Namespace):
def __init__(self, a=None, v=None, c=None, **ka0):
ka = {}

ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol"
ex = "chpw daw dav_auth dav_inf dav_mac dav_rt e2d e2ds e2dsa e2t e2ts e2tsr e2v e2vu e2vp early_ban ed emp exp force_js getmod grid gsel hardlink ih ihead magic never_symlink nid nih no_acode no_athumb no_dav no_db_ip no_dedup no_del no_dupe no_lifetime no_logues no_mv no_pipe no_poll no_readme no_robots no_sb_md no_sb_lg no_scandir no_tarcmp no_thumb no_vthumb no_zip nrand nw og og_no_head og_s_title q rand smb srch_dbg stats uqe vague_403 vc ver write_uplog xdev xlink xvol zs"
ka.update(**{k: False for k in ex.split()})

ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
ex = "dotpart dotsrch hook_v no_dhash no_fastboot no_fpool no_htp no_rescan no_sendfile no_snap no_voldump re_dhash plain_ip"
ka.update(**{k: True for k in ex.split()})

ex = "ah_cli ah_gen css_browser hist js_browser js_other mime mimes no_forget no_hash no_idx nonsus_urls og_tpl og_ua"
Expand All @@ -137,9 +135,12 @@ def __init__(self, a=None, v=None, c=None, **ka0):
ex = "db_act k304 loris re_maxage rproxy rsp_jtr rsp_slp s_wr_slp snap_wri theme themes turbo"
ka.update(**{k: 0 for k in ex.split()})

ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname R RS SR"
ex = "ah_alg bname chpw_db doctitle df exit favico idp_h_usr ipa html_head lg_sbf log_fk md_sbf name og_desc og_site og_th og_title og_title_a og_title_v og_title_i shr tcolor textfiles unlist vname xff_src R RS SR"
ka.update(**{k: "" for k in ex.split()})

ex = "ban_403 ban_404 ban_422 ban_pw ban_url"
ka.update(**{k: "no" for k in ex.split()})

ex = "grp on403 on404 xad xar xau xban xbd xbr xbu xiu xm"
ka.update(**{k: [] for k in ex.split()})

Expand Down Expand Up @@ -221,11 +222,29 @@ def settimeout(self, a):
pass


class VHub(object):
def __init__(self, args, asrv, log):
self.args = args
self.asrv = asrv
self.log = log
self.is_dut = True
self.up2k = Up2k(self)


class VBrokerThr(BrokerThr):
def __init__(self, hub):
self.hub = hub
self.log = hub.log
self.args = hub.args
self.asrv = hub.asrv


class VHttpSrv(object):
def __init__(self, args, asrv, log):
self.args = args
self.asrv = asrv
self.log = log
self.hub = None

self.broker = NullBroker(args, asrv)
self.prism = None
Expand All @@ -252,18 +271,25 @@ def get_u2idx(self):
return self.u2idx


class VHttpSrvUp2k(VHttpSrv):
def __init__(self, args, asrv, log):
super(VHttpSrvUp2k, self).__init__(args, asrv, log)
self.hub = VHub(args, asrv, log)
self.broker = VBrokerThr(self.hub)


class VHttpConn(object):
def __init__(self, args, asrv, log, buf):
def __init__(self, args, asrv, log, buf, use_up2k=False):
self.t0 = time.time()
self.s = VSock(buf)
self.sr = Unrecv(self.s, None) # type: ignore
self.aclose = {}
self.addr = ("127.0.0.1", "42069")
self.args = args
self.asrv = asrv
self.bans = {}
self.freshen_pwd = 0.0
self.hsrv = VHttpSrv(args, asrv, log)

Ctor = VHttpSrvUp2k if use_up2k else VHttpSrv
self.hsrv = Ctor(args, asrv, log)
self.ico = Ico(args)
self.ipa_nm = None
self.lf_url = None
Expand All @@ -279,6 +305,12 @@ def __init__(self, args, asrv, log, buf):
self.u2fh = FHC()

self.get_u2idx = self.hsrv.get_u2idx
self.setbuf(buf)

def setbuf(self, buf):
self.s = VSock(buf)
self.sr = Unrecv(self.s, None) # type: ignore
return self


if WINDOWS:
Expand Down

0 comments on commit 3da62ec

Please sign in to comment.