# coding: utf-8
import errno
import re
import sqlite3
import sys
from logging import getLogger
from os.path import basename, splitext
from pathlib import Path
from queue import Queue
from threading import Lock
from time import mktime, sleep
from typing import TYPE_CHECKING, Any, Dict, Optional, Set, Tuple

from PyQt5.QtCore import pyqtSignal
from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer

from ...client.local import FileInfo
from ...constants import LINUX, MAC, ROOT, UNACCESSIBLE_HASH, WINDOWS
from ...exceptions import ThreadInterrupt
from ...objects import DocPair, Metrics
from ...options import Options
from ...utils import current_milli_time, force_decode, is_generated_tmp_file
from ...utils import normalize_event_filename as normalize
from ..activity import tooltip
from ..workers import EngineWorker, Worker

if WINDOWS:
    import watchdog.observers as ob

    # Monkey-patch Watchdog to:
    # - Set the Windows hack delay to 0 in WindowsApiEmitter,
    #   otherwise we might miss some events
    # - Increase the ReadDirectoryChangesW buffer size
    ob.read_directory_changes.WATCHDOG_TRAVERSE_MOVED_DIR_DELAY = 0
    ob.winapi.BUFFER_SIZE = 8192

if TYPE_CHECKING:
    from ..dao.sqlite import EngineDAO  # noqa
    from ..engine import Engine  # noqa

__all__ = ("DriveFSEventHandler", "LocalWatcher", "WIN_MOVE_RESOLUTION_PERIOD")

log = getLogger(__name__)

# Windows: wait 2 s between resolutions of delete events
WIN_MOVE_RESOLUTION_PERIOD = 2000
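
# macOS TextEdit performs safe saves of .rtf documents through a temporary
# ".sb-" file, e.g. "report.rtf.sb-1abc2d3e-4fg5hi" (illustrative name, not
# taken from the code).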
TEXT_EDIT_TMP_FILE_PATTERN = r".*\.rtf\.sb\-(\w)+\-(\w)+$"


def is_text_edit_tmp_file(name: str) -> bool:
    return bool(re.match(TEXT_EDIT_TMP_FILE_PATTERN, name))
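
# Illustrative usage of the helper above (example names are made up):
#   >>> is_text_edit_tmp_file("report.rtf.sb-1abc2d3e-4fg5hi")
#   True
#   >>> is_text_edit_tmp_file("report.rtf")
#   False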


class LocalWatcher(EngineWorker):
    localScanFinished = pyqtSignal()
    rootMoved = pyqtSignal(Path)
    rootDeleted = pyqtSignal()
    docDeleted = pyqtSignal(Path)
    fileAlreadyExists = pyqtSignal(Path, Path)

    def __init__(self, engine: "Engine", dao: "EngineDAO") -> None:
        super().__init__(engine, dao, name="LocalWatcher")
        self.local = self.engine.local
        self.lock = Lock()
        self.watchdog_queue: Queue = Queue()

        # Delay for the scheduled recursive scans of
        # a created / modified / moved folder under Windows
        self._windows_folder_scan_delay = 10000  # 10 seconds

        if WINDOWS:
            log.info(
                "Windows detected, so delete events will be delayed "
                f"by {WIN_MOVE_RESOLUTION_PERIOD} ms"
            )

        self._metrics = {
            "last_local_scan_time": -1,
            "new_files": 0,
            "update_files": 0,
            "delete_files": 0,
            "last_event": 0,
        }
        self._event_handler: Optional[DriveFSEventHandler] = None
        self._observer: Optional[Observer] = None
        self._root_observer: Optional[Observer] = None
        self._delete_events: Dict[str, Tuple[int, DocPair]] = {}
        self._folder_scan_events: Dict[Path, Tuple[float, DocPair]] = {}

    def _execute(self) -> None:
        try:
            if not self.local.exists(ROOT):
                self.rootDeleted.emit()
                return

            self._setup_watchdog()
            self._scan()

            if LINUX:
                self._update_local_status()

            if WINDOWS:
                # Initialize the timestamps used to throttle the dequeuing of
                # delete events and of folder scans
                now = current_milli_time()
                self._win_delete_interval = self._win_folder_scan_interval = now

            while "working":
                self._interact()
                sleep(1)

                while not self.watchdog_queue.empty():
                    self.handle_watchdog_event(self.watchdog_queue.get())

                    if WINDOWS:
                        self._win_delete_check()
                        self._win_folder_scan_check()

                    # If there are a _lot_ of FS events, it is better to let Qt
                    # handle some app events, else the GUI will not be
                    # responsive enough.
                    self._interact()

                if WINDOWS:
                    self._win_delete_check()
                    self._win_folder_scan_check()
        except ThreadInterrupt:
            raise
        finally:
            with self.lock:
                self._stop_watchdog()

    def _update_local_status(self) -> None:
        """Fetch the state of each local file, then update its sync status."""
        local = self.local
        send_sync_status = self.engine.manager.osi.send_sync_status

        doc_pairs = self.dao.get_states_from_partial_local(ROOT)
        # Skip the first as it is the ROOT
        for doc_pair in doc_pairs[1:]:
            abs_path = local.abspath(doc_pair.local_path)
            send_sync_status(doc_pair, abs_path)

    def win_queue_empty(self) -> bool:
        return not self._delete_events

    def get_win_queue_size(self) -> int:
        return len(self._delete_events)
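
    # On Windows a move is reported as a delete followed by a create. Delete
    # events are thus queued and only resolved after WIN_MOVE_RESOLUTION_PERIOD,
    # giving the matching create a chance to arrive first (summary of the
    # dequeuing logic below).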
    def _win_delete_check(self) -> None:
        elapsed = current_milli_time() - WIN_MOVE_RESOLUTION_PERIOD
        if self._win_delete_interval >= elapsed:
            return

        with self.lock:
            self._win_dequeue_delete()
        self._win_delete_interval = current_milli_time()
@tooltip("Dequeue delete")
def _win_dequeue_delete(self) -> None:
try:
for evt in self._delete_events.copy().values():
evt_time, evt_pair = evt
if current_milli_time() - evt_time < WIN_MOVE_RESOLUTION_PERIOD:
log.info(
"Win: ignoring delete event as waiting for move resolution "
f"period expiration: {evt!r}"
)
continue
if not self.local.exists(evt_pair.local_path):
log.info(f"Win: handling watchdog delete for event: {evt!r}")
self._handle_watchdog_delete(evt_pair)
else:
remote_id = self.local.get_remote_id(evt_pair.local_path)
if not remote_id or remote_id == evt_pair.remote_ref:
log.info(
f"Win: ignoring delete event as file still exists: {evt!r}"
)
else:
log.info(f"Win: handling watchdog delete for event: {evt!r}")
self._handle_watchdog_delete(evt_pair)
log.info(f"Win: dequeuing delete event: {evt!r}")
del self._delete_events[evt_pair.remote_ref]
except ThreadInterrupt:
raise
except Exception:
log.exception("Win: dequeuing deletion error")

    def win_folder_scan_empty(self) -> bool:
        return not self._folder_scan_events

    def get_win_folder_scan_size(self) -> int:
        return len(self._folder_scan_events)
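
    # Folders created, modified or moved under Windows are rescanned after a
    # quiet period (_windows_folder_scan_delay), as events occurring inside
    # them may have been missed; a scan is re-queued while the folder keeps
    # changing (summary of the dequeuing logic below).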
    def _win_folder_scan_check(self) -> None:
        elapsed = current_milli_time() - self._windows_folder_scan_delay
        if self._win_folder_scan_interval >= elapsed:
            return

        with self.lock:
            self._win_dequeue_folder_scan()
        self._win_folder_scan_interval = current_milli_time()
@tooltip("Dequeue folder scan")
def _win_dequeue_folder_scan(self) -> None:
try:
events = self._folder_scan_events.copy().items()
for local_path, (evt_time, evt_pair) in events:
delay = current_milli_time() - evt_time
if delay < self._windows_folder_scan_delay:
log.info(
"Win: ignoring folder to scan as waiting for folder scan "
f"delay expiration: {local_path!r}"
)
continue
if not self.local.exists(local_path):
log.info(
"Win: dequeuing folder scan event as folder "
f"doesn't exist: {local_path!r}"
)
self._folder_scan_events.pop(local_path, None)
continue
local_info = self.local.try_get_info(local_path)
if not local_info:
log.debug(
"Win: dequeuing folder scan event as folder "
f"doesn't exist: {local_path!r}"
)
self._folder_scan_events.pop(local_path, None)
continue
log.info(f"Win: handling folder to scan: {local_path!r}")
self.scan_pair(local_path)
local_info = self.local.try_get_info(local_path)
mtime = (
mktime(local_info.last_modification_time.timetuple())
if local_info
else 0
)
if mtime > evt_time:
log.info(
f"Re-schedule scan as the folder has been modified since last check: {evt_pair}"
)
self._folder_scan_events[local_path] = (mtime, evt_pair)
else:
log.info(f"Win: dequeuing folder scan event: {evt_pair!r}")
self._folder_scan_events.pop(local_path, None)
except ThreadInterrupt:
raise
except Exception:
log.exception("Win: dequeuing folder scan error")
@tooltip("Full local scan")
def _scan(self) -> None:
# If synchronization features are disabled, we just need to emit that specific
# signal to let the Remote Watcher start its own thread and the Queue Manager.
if not Options.synchronization_enabled:
self.localScanFinished.emit()
return
log.info("Full scan started")
start_ms = current_milli_time()
to_pause = not self.engine.queue_manager.is_paused()
if to_pause:
self._suspend_queue()
self._delete_files: Dict[str, DocPair] = {}
self._protected_files: Dict[str, bool] = {}
info = self.local.get_info(ROOT)
self._scan_recursive(info)
self._scan_handle_deleted_files()
self._metrics["last_local_scan_time"] = current_milli_time() - start_ms
log.info(f"Full scan finished in {self._metrics['last_local_scan_time']}ms")
if to_pause:
self.engine.queue_manager.resume()
self.localScanFinished.emit()

    def _scan_handle_deleted_files(self) -> None:
        for deleted in self._delete_files:
            if deleted in self._protected_files:
                continue
            self.dao.delete_local_state(self._delete_files[deleted])
        self._delete_files = {}

    def get_metrics(self) -> Metrics:
        metrics = super().get_metrics()
        if self._event_handler:
            metrics["fs_events"] = self._event_handler.counter
        return {**metrics, **self._metrics}

    def _suspend_queue(self) -> None:
        queue = self.engine.queue_manager
        queue.suspend()
        for processor in queue.get_processors_on(ROOT, exact_match=False):
            processor.stop()

    def scan_pair(self, local_path: Path) -> None:
        to_pause = not self.engine.queue_manager.is_paused()
        if to_pause:
            self._suspend_queue()

        info = self.local.get_info(local_path)
        self._scan_recursive(info, recursive=False)
        self._scan_handle_deleted_files()

        if to_pause:
            self.engine.queue_manager.resume()

    def empty_events(self) -> bool:
        ret = self.watchdog_queue.empty()
        if WINDOWS:
            ret &= self.win_queue_empty()
            ret &= self.win_folder_scan_empty()
        return ret
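
    # The creation time is used by the NXDRIVE-471 handling in
    # _scan_recursive(): between two paths sharing the same remote ID, the
    # older one is considered the moved original and the newer one a copy
    # (summary of the comparison logic there).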
    def get_creation_time(self, child_full_path: Path) -> int:
        if WINDOWS:
            return int(child_full_path.stat().st_ctime)

        stat = child_full_path.stat()
        # Try the inode number, as it seems to be increasing on HFS
        if MAC and hasattr(stat, "st_ino"):
            return stat.st_ino
        if hasattr(stat, "st_birthtime"):
            return stat.st_birthtime
        return 0
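
    # _scan_recursive() walks one folder level at a time: it diffs the children
    # known to the database against the children found on the filesystem,
    # inserting new pairs, flagging moved or modified ones, and recording the
    # remaining DB-only children as deleted (high-level summary of the method).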
    def _scan_recursive(self, info: FileInfo, recursive: bool = True) -> None:
        if recursive:
            # Don't interact if we scan only one level
            self._interact()

        dao, client = self.dao, self.local

        # Load all children from the DB
        log.debug(f"Fetching DB local children of {info.path!r}")
        db_children = dao.get_local_children(info.path)

        # Map all children by their name
        to_scan = []
        to_scan_new = []
        children = {child.local_name: child for child in db_children}

        # Load all children from the FS to detect recently deleted children
        log.debug(f"Fetching FS children info of {info.path!r}")
        try:
            fs_children_info = client.get_children_info(info.path)
        except OSError:
            # The folder has been deleted in the meantime
            return

        # Get remote children to be able to check if a local child found
        # during the scan is really a new item or if it is just the result
        # of a remote creation performed on the file system but not yet
        # updated in the DB as for its local information.
        remote_children: Set[str] = set()
        parent_remote_id = client.get_remote_id(info.path)
        if parent_remote_id:
            pairs_ = dao.get_new_remote_children(parent_remote_id)
            remote_children = {pair.remote_name for pair in pairs_}

        # Recursively update children
        for child_info in fs_children_info:
            child_name = child_info.path.name
            child_type = "folder" if child_info.folderish else "file"
            if child_name not in children:
                try:
                    remote_id = client.get_remote_id(child_info.path)
                    if not remote_id:
                        # Avoid IntegrityError: do not insert a new pair state
                        # if the item is already referenced in the DB
                        if child_name in remote_children:
                            log.info(
                                f"Skip potential new {child_type} as it is the "
                                f"result of a remote creation: {child_info.path!r}"
                            )
                            continue
                        log.info(f"Found new {child_type} {child_info.path!r}")
                        self._metrics["new_files"] += 1
                        dao.insert_local_state(child_info, info.path)
                    else:
                        log.info(
                            "Found potential moved file "
                            f"{child_info.path!r}[{remote_id}]"
                        )
                        doc_pair = dao.get_normal_state_from_remote(remote_id)

                        if doc_pair and client.exists(doc_pair.local_path):
                            if (
                                not client.is_case_sensitive()
                                and str(doc_pair.local_path).lower()
                                == str(child_info.path).lower()
                            ):
                                log.info(
                                    "Case renaming on a case insensitive filesystem, "
                                    f"update info and ignore: {doc_pair!r}"
                                )
                                if doc_pair.local_name in children:
                                    del children[doc_pair.local_name]
                                doc_pair.local_state = "moved"
                                dao.update_local_state(doc_pair, child_info)
                                continue

                            # Possible move-then-copy case, NXDRIVE-471
                            child_full_path = client.abspath(child_info.path)
                            child_creation_time = self.get_creation_time(
                                child_full_path
                            )
                            doc_full_path = client.abspath(doc_pair.local_path)
                            doc_creation_time = self.get_creation_time(doc_full_path)
                            log.debug(
                                f"child_cre_time={child_creation_time}, "
                                f"doc_cre_time={doc_creation_time}"
                            )

                        if not doc_pair:
                            log.info(
                                f"Cannot find reference for {child_info.path!r} in "
                                "database, put it in locally_created state"
                            )
                            self._metrics["new_files"] += 1
                            dao.insert_local_state(child_info, info.path)
                            self._protected_files[remote_id] = True
                        elif doc_pair.processor > 0:
                            log.info(
                                f"Skip pair as it is being processed: {doc_pair!r}"
                            )
                            continue
                        elif doc_pair.local_path == child_info.path:
                            log.info(
                                f"Skip pair as it is not a real move: {doc_pair!r}"
                            )
                            continue
                        elif not client.exists(doc_pair.local_path) or (
                            client.exists(doc_pair.local_path)
                            and child_creation_time < doc_creation_time
                        ):
                            # If the file exists at the old location and is newer
                            # than the new one, it has been moved to the new
                            # location first, then copied back
                            log.info("Found moved file")
                            doc_pair.local_state = "moved"
                            dao.update_local_state(doc_pair, child_info)
                            self._protected_files[doc_pair.remote_ref] = True
                            if (
                                client.exists(doc_pair.local_path)
                                and child_creation_time < doc_creation_time
                            ):
                                # Need to put back the newly created file;
                                # maybe check whether it is already there
                                log.debug(
                                    "Found a moved file that has been copy/pasted "
                                    f"back: {doc_pair.local_path!r}"
                                )
                                client.remove_remote_id(doc_pair.local_path)
                                dao.insert_local_state(
                                    client.get_info(doc_pair.local_path),
                                    doc_pair.local_path.parent,
                                )
                        else:
                            # The file still exists, we must check the remote_id
                            old_remote_id = client.get_remote_id(doc_pair.local_path)
                            if old_remote_id == remote_id:
                                # Local copy-paste
                                log.info("Found a copy-paste of document")
                                client.remove_remote_id(child_info.path)
                                dao.insert_local_state(child_info, info.path)
                            else:
                                # Moved and renamed
                                log.info(f"Moved and renamed: {doc_pair!r}")
                                old_pair = dao.get_normal_state_from_remote(
                                    old_remote_id
                                )
                                if old_pair is not None:
                                    old_pair.local_state = "moved"
                                    # Also check the digest
                                    digest = child_info.get_digest()
                                    if old_pair.local_digest != digest:
                                        old_pair.local_digest = digest
                                    dao.update_local_state(
                                        old_pair, client.get_info(doc_pair.local_path)
                                    )
                                    self._protected_files[old_pair.remote_ref] = True
                                doc_pair.local_state = "moved"
                                # Also check the digest
                                digest = child_info.get_digest()
                                if doc_pair.local_digest != digest:
                                    doc_pair.local_digest = digest
                                dao.update_local_state(doc_pair, child_info)
                                self._protected_files[doc_pair.remote_ref] = True
                    if child_info.folderish:
                        to_scan_new.append(child_info)
                except ThreadInterrupt:
                    raise
                except Exception:
                    log.exception(
                        f"Error during recursive scan of {child_info.path!r}, "
                        "ignoring until next full scan"
                    )
                    continue
            else:
                child_pair = children.pop(child_name)
                try:
                    last_mtime = child_info.last_modification_time.strftime(
                        "%Y-%m-%d %H:%M:%S"
                    )
                    if (
                        child_pair.processor == 0
                        and child_pair.last_local_updated is not None
                        and last_mtime != child_pair.last_local_updated.split(".")[0]
                    ):
                        log.debug(f"Update file {child_info.path!r}")
                        remote_ref = client.get_remote_id(child_pair.local_path)
                        if remote_ref and not child_pair.remote_ref:
                            log.info(
                                "Possible race condition between remote and local "
                                f"scan, let's refresh pair: {child_pair!r}"
                            )
                            refreshed = dao.get_state_from_id(child_pair.id)
                            if refreshed:
                                child_pair = refreshed
                            if not child_pair.remote_ref:
                                log.info(
                                    "Pair not yet handled by remote scan "
                                    "(remote_ref is None) but there is an existing "
                                    f"remote_id xattr, let's remove it: {child_pair!r}"
                                )
                                client.remove_remote_id(child_pair.local_path)
                                remote_ref = ""
                        if remote_ref != child_pair.remote_ref:
                            # Load the correct doc pair and put the other one
                            # back into children
                            log.warning(
                                "Detected file substitution: "
                                f"{child_pair.local_path!r} "
                                f"({remote_ref}/{child_pair.remote_ref})"
                            )
                            if not remote_ref:
                                if not child_info.folderish:
                                    # The alternate data stream or the xattr may
                                    # have been removed by external software or
                                    # by the user
                                    digest = child_info.get_digest()
                                    if child_pair.local_digest != digest:
                                        child_pair.local_digest = digest
                                        child_pair.local_state = "modified"
                                """
                                NXDRIVE-668: Here we might be in the case
                                of a new folder/file with the same name
                                as the old name of a renamed folder/file,
                                typically:
                                  - initial state: subfolder01
                                  - rename subfolder01 to subfolder02
                                  - create subfolder01
                                => substitution will be detected when scanning
                                subfolder01, so we need to set the remote ID
                                and update the local state to avoid performing
                                a wrong locally_created operation leading to
                                an IntegrityError. This is true for folders
                                and files.
                                """
                                client.set_remote_id(
                                    child_pair.local_path, child_pair.remote_ref
                                )
                                dao.update_local_state(child_pair, child_info)
                                if child_info.folderish:
                                    to_scan.append(child_info)
                                continue

                            old_pair = dao.get_normal_state_from_remote(remote_ref)
                            if old_pair is None:
                                dao.insert_local_state(child_info, info.path)
                            else:
                                old_pair.local_state = "moved"
                                # Also check the digest
                                digest = child_info.get_digest()
                                if old_pair.local_digest != digest:
                                    old_pair.local_digest = digest
                                dao.update_local_state(old_pair, child_info)
                                self._protected_files[old_pair.remote_ref] = True
                            self._delete_files[child_pair.remote_ref] = child_pair
                        if not child_info.folderish:
                            digest = child_info.get_digest()
                            if child_pair.local_digest != digest:
                                child_pair.local_digest = digest
                                child_pair.local_state = "modified"
                        self._metrics["update_files"] += 1
                        dao.update_local_state(child_pair, child_info)
                    if child_info.folderish:
                        to_scan.append(child_info)
                except Exception as e:
                    log.exception(f"Error with pair {child_pair!r}, increasing error")
                    self.increase_error(child_pair, "SCAN RECURSIVE", exception=e)
                    continue

        for deleted in children.values():
            if (
                deleted.pair_state == "remotely_created"
                or deleted.remote_state == "created"
            ):
                continue
            log.info(f"Found deleted file {deleted.local_path!r}")
            # Note: we may need to count the children for this metric to be accurate
            self._metrics["delete_files"] += 1
            if not deleted.remote_ref:
                dao.remove_state(deleted)
            else:
                self._delete_files[deleted.remote_ref] = deleted
            self.remove_void_transfers(deleted)

        for child_info in to_scan_new:
            self._scan_recursive(child_info)

        if not recursive:
            return

        for child_info in to_scan:
            self._scan_recursive(child_info)
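
    # Two watchdog observers are used: one on the sync root itself for content
    # changes, and one on its parent so that a move or deletion of the root
    # folder itself can still be caught (see handle_watchdog_root_event()).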
@tooltip("Setup watchdog")
def _setup_watchdog(self) -> None:
base: Path = self.local.base_folder
log.info(f"Watching FS modification on {base!r}")
# Filter out all ignored suffixes. It will handle custom ones too.
ignore_patterns = [f"*{suffix}" for suffix in Options.ignored_suffixes]
# The root local folder watcher
self._root_observer = Observer()
self._root_event_handler = DriveFSRootEventHandler(
self, base.name, ignore_patterns=ignore_patterns
)
self._root_observer.schedule(self._root_event_handler, str(base.parent))
# The contents of the root local folder
self._observer = Observer()
self._event_handler = DriveFSEventHandler(self, ignore_patterns=ignore_patterns)
self._observer.schedule(self._event_handler, str(base), recursive=True)
if Options.synchronization_enabled:
self._root_observer.start()
self._observer.start()

    def _stop_watchdog(self) -> None:
        if not Options.synchronization_enabled:
            return

        if self._observer:
            log.info("Stopping the FS Observer thread")
            try:
                self._observer.stop()
                self._observer.join()
            except Exception:
                log.warning("Cannot stop the FS observer")
            finally:
                del self._observer
        else:
            log.info("No existing FS observer reference")

        if self._root_observer:
            log.info("Stopping the FS root Observer thread")
            try:
                self._root_observer.stop()
                self._root_observer.join()
            except Exception:
                log.warning("Cannot stop the FS root observer")
            finally:
                del self._root_observer
        else:
            log.info("No existing FS root observer reference")

    def _handle_watchdog_delete(self, doc_pair: DocPair) -> None:
        self.remove_void_transfers(doc_pair)

        # Ask for deletion confirmation if needed
        abspath = self.local.abspath(doc_pair.local_path)
        if not abspath.parent.exists():
            log.debug(f"Deleted event on a nonexistent file: {abspath!r}")
            return

        log.debug(f"Deleting file: {abspath!r}")
        if self.engine.manager.dao.get_bool("show_deletion_prompt", default=True):
            self.docDeleted.emit(doc_pair.local_path)
        else:
            self.engine.delete_doc(doc_pair.local_path)

    def _handle_delete_on_known_pair(self, doc_pair: DocPair) -> None:
        """Handle a watchdog deleted event on a known doc pair."""
        if WINDOWS:
            # On Windows, delay the delete event
            log.info(f"Add pair to delete events: {doc_pair!r}")
            with self.lock:
                self._delete_events[doc_pair.remote_ref] = (
                    current_milli_time(),
                    doc_pair,
                )
            return

        # Case sensitivity can be an issue here
        if self.local.exists(doc_pair.local_path):
            remote_id = self.local.get_remote_id(doc_pair.local_path)
            if not remote_id or remote_id == doc_pair.remote_ref:
                # This happens on update, don't do anything
                return

        self._handle_watchdog_delete(doc_pair)

    def _handle_move_on_known_pair(
        self, doc_pair: DocPair, evt: FileSystemEvent, rel_path: Path
    ) -> None:
        """Handle a watchdog move event on a known doc pair."""
        # Ignore a move to an Office tmp file
        dest_filename = basename(evt.dest_path)
        ignore, _ = is_generated_tmp_file(dest_filename)
        if ignore:
            log.info(f"Ignoring file: {evt.dest_path!r}")
            return

        dao, client = self.dao, self.local
        src_path = normalize(evt.dest_path)
        rel_path = client.get_path(src_path)

        pair = dao.get_state_from_local(rel_path)
        remote_ref = client.get_remote_id(rel_path)
        if pair and pair.remote_ref == remote_ref:
            local_info = client.try_get_info(rel_path)
            if local_info:
                digest = local_info.get_digest()
                # Drop the event if the digest hasn't changed; this can happen
                # when only the file permissions have been updated
                if not doc_pair.folderish and pair.local_digest == digest:
                    log.debug(
                        f"Dropping watchdog event [{evt.event_type}] as the digest "
                        f"has not changed for {rel_path!r}"
                    )
                    # If both pairs are the same, don't drop it. It can happen
                    # in case of a server-side rename of a document.
                    if doc_pair.id != pair.id:
                        dao.remove_state(doc_pair)
                    return

                pair.local_digest = digest
                pair.local_state = "modified"
                dao.update_local_state(pair, local_info)
                dao.remove_state(doc_pair)
                log.info(
                    f"Substitution file: remove pair({doc_pair!r}) "
                    f"mark({pair!r}) as modified"
                )
                return

        local_info = client.try_get_info(rel_path)
        if not local_info:
            return

        if is_text_edit_tmp_file(local_info.name):
            log.info(
                f"Ignoring move to TextEdit tmp file {local_info.name!r} "
                f"for {doc_pair!r}"
            )
            return

        old_local_path = None
        rel_parent_path = client.get_path(src_path.parent)

        # Ignore inner movement
        versioned = False
        remote_parent_ref = client.get_remote_id(rel_parent_path)
        if (
            doc_pair.remote_name == local_info.name
            and doc_pair.remote_parent_ref == remote_parent_ref
            and rel_parent_path == doc_pair.local_path.parent
        ):
            log.info(
                "The pair was moved but it has been canceled manually, "
                f"setting state to synchronized: {doc_pair!r}"
            )
            doc_pair.local_state = "synchronized"
        else:
            log.info(f"Detected move for {local_info.name!r} ({doc_pair!r})")
            if doc_pair.local_state != "created":
                doc_pair.local_state = "moved"
                old_local_path = doc_pair.local_path
                versioned = True
            self.remove_void_transfers(doc_pair)

        dao.update_local_state(doc_pair, local_info, versioned=versioned)

        # Reflect local path changes of all impacted children in the database
        if doc_pair.folderish:
            if LINUX:
                # This is not done on GNU/Linux, as it would break
                # test_move_and_copy_paste_folder_original_location_from_child_stopped().
                # The call to dao.replace_local_paths() is relevant on macOS and Windows only.
                # See NXDRIVE-1690 for more information.
                return
            dao.replace_local_paths(doc_pair.local_path, local_info.path)

        if (
            WINDOWS
            and old_local_path is not None
            and self._windows_folder_scan_delay > 0
            and old_local_path in self._folder_scan_events
        ):
            with self.lock:
                log.info(
                    "Update folders to scan queue: move "
                    f"from {old_local_path!r} to {rel_path!r}"
                )
                self._folder_scan_events.pop(old_local_path, None)
                t = mktime(local_info.last_modification_time.timetuple())
                self._folder_scan_events[rel_path] = t, doc_pair
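
    # The pair state is acquired before processing so that a Processor thread
    # working on the same pair cannot interfere; if it cannot be acquired, the
    # event is simply dropped (summary of the logic below).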
    def _handle_watchdog_event_on_known_pair(
        self, doc_pair: DocPair, evt: FileSystemEvent, rel_path: Path
    ) -> None:
        log.debug(f"Watchdog event {evt!r} on known pair {doc_pair!r}")
        dao = self.dao

        acquired_pair = None
        try:
            acquired_pair = dao.acquire_state(self.thread_id, doc_pair.id)
            if acquired_pair:
                if evt.event_type == "deleted":
                    self._handle_delete_on_known_pair(doc_pair)
                else:
                    self._handle_watchdog_event_on_known_acquired_pair(
                        acquired_pair, evt, rel_path
                    )
            else:
                log.debug(f"Don't update as in process {doc_pair!r}")
        except sqlite3.OperationalError:
            log.debug(f"Don't update as cannot acquire {doc_pair!r}")
        finally:
            dao.release_state(self.thread_id)

            # TODO: This piece of code is only useful on Windows when creating a file inside a read-only folder.
            # TODO: Remove everything with NXDRIVE-1095.
            if WINDOWS and acquired_pair:
                refreshed_pair = dao.get_state_from_id(acquired_pair.id)
                if refreshed_pair and refreshed_pair.pair_state not in (
                    "synchronized",
                    "unsynchronized",
                ):
                    log.debug(
                        "Re-queuing acquired, released and refreshed "
                        f"state {refreshed_pair!r}"
                    )
                    dao._queue_pair_state(
                        refreshed_pair.id,
                        refreshed_pair.folderish,
                        refreshed_pair.pair_state,
                        pair=refreshed_pair,
                    )

    def _handle_watchdog_event_on_known_acquired_pair(
        self, doc_pair: DocPair, evt: FileSystemEvent, rel_path: Path
    ) -> None:
        client = self.local
        dao = self.dao
        local_info = client.try_get_info(rel_path)
        if not local_info:
            return

        if evt.event_type == "created":
            # Maybe a NXDRIVE-471 case
            remote_ref = client.get_remote_id(rel_path)
            if not remote_ref:
                log.info(
                    "Created event on a known pair with no remote_ref, this should "
                    f"only happen in case of a quick move and copy-paste: {doc_pair!r}"
                )
                if local_info.get_digest() == doc_pair.local_digest:
                    return
                log.info(
                    "Created event on a known pair with no remote_ref "
                    f"but with a different digest: {doc_pair!r}"
                )
            else:
                # NXDRIVE-509
                log.info(
                    f"Created event on a known pair with a remote_ref: {doc_pair!r}"
                )

        if doc_pair.folderish:
            # Unchanged folder, only update last_local_updated
            dao.update_local_modification_time(doc_pair, local_info)
            return

        # We can't allow this branch to be taken for big files
        # because computing their digest would explode everything.
        # This code path is taken _a lot_ when copying big files,
        # so it makes sense to bypass this check.
        if (
            local_info.size < Options.big_file * 1024 * 1024
            and doc_pair.local_state == "synchronized"
        ):
            digest = local_info.get_digest()
            # Unchanged digest, can be the case if only the last
            # modification time or the file permissions have been updated
            if doc_pair.local_digest == digest:
                log.info(
                    f"Digest has not changed for {rel_path!r} (watchdog event "
                    f"[{evt.event_type}]), only update last_local_updated"
                )
                if not local_info.remote_ref and doc_pair.remote_ref:
                    client.set_remote_id(rel_path, doc_pair.remote_ref)
                dao.update_local_modification_time(doc_pair, local_info)
                return

            doc_pair.local_digest = digest
            doc_pair.local_state = "modified"

        if evt.event_type == "modified":
            # Handle files that take some time to be fully copied
            ongoing_copy = False
            if local_info.size != doc_pair.size:
                # Check the pair state as:
                #   - a synced document can be modified and we need to handle it
                #   - a conflicted file can be manually resolved using the local
                #     version and we need to handle it too
                if doc_pair.pair_state not in ("synchronized", "locally_resolved"):
                    log.debug("Size has changed (copy must still be running)")
                    doc_pair.local_digest = UNACCESSIBLE_HASH
                    ongoing_copy = True
            elif doc_pair.local_digest == UNACCESSIBLE_HASH:
                log.debug("Unaccessible hash (copy must still be running)")
                ongoing_copy = True

            if ongoing_copy:
                if not local_info.remote_ref and doc_pair.remote_ref:
                    client.set_remote_id(rel_path, doc_pair.remote_ref)
                    local_info.remote_ref = doc_pair.remote_ref
                self.remove_void_transfers(doc_pair)
                return

        if doc_pair.remote_ref and doc_pair.remote_ref != local_info.remote_ref:
            original_pair = dao.get_normal_state_from_remote(local_info.remote_ref)
            original_info = None
            if original_pair:
                original_info = client.try_get_info(original_pair.local_path)

            if (
                MAC
                and original_info
                and original_info.remote_ref == local_info.remote_ref
            ):
                log.info(
                    "macOS has postponed the overwriting of the xattr, "
                    f"need to reset remote_ref for {doc_pair!r}"
                )
                # We are in a copy/paste situation with the OS overriding
                # the extended attribute
                client.set_remote_id(doc_pair.local_path, doc_pair.remote_ref)

            # This happens on overwrite through Windows Explorer
            if not original_info:
                client.set_remote_id(doc_pair.local_path, doc_pair.remote_ref)

        self.remove_void_transfers(doc_pair)

        # Update the state
        dao.update_local_state(doc_pair, local_info)

    def handle_watchdog_root_event(self, evt: FileSystemEvent) -> None:
        if evt.event_type == "deleted":
            log.warning("Root has been deleted")
            self.rootDeleted.emit()
        elif evt.event_type == "moved":
            dst = normalize(evt.dest_path)
            log.warning(f"Root has been moved to {dst!r}")
            self.rootMoved.emit(dst)
@tooltip("Handle watchdog event")
def handle_watchdog_event(self, evt: FileSystemEvent) -> None:
self._metrics["last_event"] = current_milli_time()
if not evt.src_path:
log.warning(f"Skipping event without a source path: {evt!r}")
return
if WINDOWS and ":" in splitext(evt.src_path)[1]:
# An event on the NTFS stream ("c:\folder\file.ext:nxdrive"), it should not happen.
# The cause is not yet known, need more data to understand how it happens.
log.warning(f"Skipping event on the NTFS stream: {evt!r}")
return
dao, client = self.dao, self.local
dst_path = getattr(evt, "dest_path", "")
evt_log = f"Handling watchdog event [{evt.event_type}] on {evt.src_path!r}"