Skip to content
This repository has been archived by the owner on Dec 1, 2017. It is now read-only.

Commit

Permalink
Fix #372 - Conflicted copies should not be generated if the client ha…
Browse files Browse the repository at this point in the history
…s a newer version of the file
  • Loading branch information
gideonthomas committed Oct 20, 2014
1 parent b10bd30 commit 61d03fd
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 107 deletions.
82 changes: 59 additions & 23 deletions client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ var SyncManager = require('./sync-manager.js');
var SyncFileSystem = require('./sync-filesystem.js');
var Filer = require('../../lib/filer.js');
var EventEmitter = require('events').EventEmitter;
var resolvePath = require('../../lib/sync-path-resolver.js').resolveFromArray;

var MakeDrive = {};
module.exports = MakeDrive;
Expand Down Expand Up @@ -117,6 +118,9 @@ function createFS(options) {
var autoSync;
var pathCache;

// Path needed to be upstreamed during a downstream sync
var upstreamPath;

// State of the sync connection
sync.SYNC_DISCONNECTED = "SYNC DISCONNECTED";
sync.SYNC_CONNECTING = "SYNC CONNECTING";
Expand Down Expand Up @@ -151,6 +155,30 @@ function createFS(options) {
manager = null;
}

function requestSync(path) {
// If we're not connected (or are already syncing), ignore this request
if(sync.state === sync.SYNC_DISCONNECTED || sync.state === sync.SYNC_ERROR) {
sync.emit('error', new Error('Invalid state. Expected ' + sync.SYNC_CONNECTED + ', got ' + sync.state));
return;
}

// If there were no changes to the filesystem and
// no path was passed to sync, ignore this request
if(!fs.pathToSync && !path) {
return;
}

// If a path was passed sync using it
if(path) {
return manager.syncPath(path);
}

// Cache the path that needs to be synced for error recovery
pathCache = fs.pathToSync;
fs.pathToSync = null;
manager.syncPath(pathCache);
}

// Turn on auto-syncing if its not already on
sync.auto = function(interval) {
var syncInterval = interval|0 > 0 ? interval|0 : 15 * 1000;
Expand Down Expand Up @@ -180,7 +208,7 @@ function createFS(options) {
sync.onError = function(err) {
// Regress to the path that needed to be synced but failed
// (likely because of a sync LOCK)
fs.pathToSync = pathCache;
fs.pathToSync = upstreamPath || pathCache;
sync.state = sync.SYNC_ERROR;
sync.emit('error', err);
};
Expand All @@ -200,21 +228,9 @@ function createFS(options) {

// Request that a sync begin.
sync.request = function() {
// If we're not connected (or are already syncing), ignore this request
if(sync.state === sync.SYNC_DISCONNECTED || sync.state === sync.SYNC_ERROR) {
sync.emit('error', new Error('Invalid state. Expected ' + sync.SYNC_CONNECTED + ', got ' + sync.state));
return;
}

// If there were no changes to the filesystem, ignore this request
if(!fs.pathToSync) {
return;
}

// Cache the path that needs to be synced for error recovery
pathCache = fs.pathToSync;
fs.pathToSync = null;
manager.syncPath(pathCache);
// We do not expose a prototype that allows
// the path to be specified
requestSync();
};

// Try to connect to the server.
Expand All @@ -234,34 +250,44 @@ function createFS(options) {
// Upgrade connection state to `connecting`
sync.state = sync.SYNC_CONNECTING;

function downstreamSyncCompleted() {
function downstreamSyncCompleted(paths, needUpstream) {
// Re-wire message handler functions for regular syncing
// now that initial downstream sync is completed.
sync.onSyncing = function() {
sync.state = sync.SYNC_SYNCING;
sync.emit('syncing');
};

sync.onCompleted = function(paths) {
sync.onCompleted = function(paths, needUpstream) {
// If changes happened to the files that needed to be synced
// during the sync itself, they will be overwritten
// https://github.com/mozilla/makedrive/issues/129 and
// https://github.com/mozilla/makedrive/issues/3
// https://github.com/mozilla/makedrive/issues/129

function complete() {
sync.state = sync.SYNC_CONNECTED;
sync.emit('completed');
}

if(!paths) {
if(!paths && !needUpstream) {
return complete();
}

// Changes in the client are newer (determined during
// the sync) and need to be upstreamed
if(needUpstream) {
upstreamPath = resolvePath(needUpstream);
complete();
return requestSync(upstreamPath);
}

// If changes happened during a downstream sync
// Change the path that needs to be synced
manager.resetUnsynced(paths, function(err) {
if(err) {
return sync.onError(err);
}

upstreamPath = null;
complete();
});
};
Expand All @@ -285,6 +311,16 @@ function createFS(options) {
}

sync.emit('connected');

// If the downstream was completed and some
// versions of files were not synced as they were
// newer on the client, upstream them
if(needUpstream) {
upstreamPath = resolvePath(needUpstream);
requestSync(upstreamPath);
} else {
upstreamPath = null;
}
}

function connect(token) {
Expand All @@ -302,9 +338,9 @@ function createFS(options) {
sync.onSyncing = function() {
// do nothing, wait for onCompleted()
};
sync.onCompleted = function() {
sync.onCompleted = function(paths, needUpstream) {
// Downstream sync is done, finish connect() setup
downstreamSyncCompleted();
downstreamSyncCompleted(paths, needUpstream);
};
});
}
Expand Down
54 changes: 50 additions & 4 deletions client/src/message-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ var deserializeDiff = require('../../lib/diff').deserialize;
var states = require('./sync-states');
var steps = require('./sync-steps');
var dirname = require('../../lib/filer').Path.dirname;
var async = require('../../lib/async-lite');
var fsUtils = require('../../lib/fs-utils');

function onError(syncManager, err) {
syncManager.session.step = steps.FAILED;
Expand Down Expand Up @@ -124,9 +126,49 @@ function handleResponse(syncManager, data) {
}

function handlePatchAckResponse() {
var syncedPaths = data.content.syncedPaths;
session.state = states.READY;
session.step = steps.SYNCED;
sync.onCompleted(data.content.syncedPaths);

function stampChecksum(path, callback) {
fs.stat(path, function(err, stats) {
if(err) {
if(err.code !== 'ENOENT') {
return callback(err);
}

return callback();
}

if(!stats.isFile()) {
return callback();
}

rsyncUtils.getChecksum(fs, path, function(err, checksum) {
if(err) {
return callback(err);
}

fsUtils.setChecksum(fs, path, checksum, function(err) {
if(err) {
return callback(err);
}

callback();
});
});
});
}

// Store the checksums of the upstream synced files
// as xattrs on the file
async.eachSeries(syncedPaths, stampChecksum, function(err) {
if(err) {
return onError(syncManager, err);
}

sync.onCompleted(data.content.syncedPaths);
});
}

function handlePatchResponse() {
Expand All @@ -150,9 +192,11 @@ function handleResponse(syncManager, data) {
return onError(syncManager, err);
}

var size = rsyncOptions.size || 5;
if(paths.needsUpstream.length) {
session.needsUpstream = paths.needsUpstream;
}

rsyncUtils.generateChecksums(fs, paths.synced, function(err, checksums) {
rsyncUtils.generateChecksums(fs, paths.synced, true, function(err, checksums) {
if(err) {
var message = SyncMessage.response.reset;
syncManager.send(message.stringify());
Expand All @@ -169,7 +213,9 @@ function handleResponse(syncManager, data) {
function handleVerificationResponse() {
session.srcList = null;
session.step = steps.SYNCED;
sync.onCompleted();
var needsUpstream = session.needsUpstream;
delete session.needsUpstream;
sync.onCompleted(null, needsUpstream);
}

function handleUpstreamResetResponse() {
Expand Down
29 changes: 29 additions & 0 deletions lib/rsync/diff.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ module.exports = function diff(fs, path, checksumList, options, callback) {
this.modified = modifiedTime;
}

// Compute the checksum for the file/link and
// append it to the diffNode.
function appendChecksum(diffNode, diffPath, callback) {
rsyncUtils.getChecksum(fs, diffPath, function(err, checksums) {
if(err) {
return callback(err);
}

diffNode.checksum = checksums;
diffList.push(diffNode);

callback(null, diffList);
});
}

function diffsForLink(checksumNode, callback) {
var checksumNodePath = checksumNode.path;
var diffNode = new DiffNode(checksumNodePath, checksumNode.type, checksumNode.modified);
Expand Down Expand Up @@ -52,6 +67,13 @@ module.exports = function diff(fs, path, checksumList, options, callback) {
}

diffNode.diffs = rsyncUtils.rollData(data, checksumNode.checksums, options.size);

// If versions are enabled, add the checksum
// field to the diffNode for version comparison
if(options.versions) {
return appendChecksum(diffNode, linkContents, callback);
}

diffList.push(diffNode);

callback(null, diffList);
Expand All @@ -77,6 +99,13 @@ module.exports = function diff(fs, path, checksumList, options, callback) {
}

diffNode.diffs = rsyncUtils.rollData(data, checksumNode.checksums, options.size);

// If versions are enabled, add the checksum
// field to the diffNode for version comparison
if(options.versions) {
return appendChecksum(diffNode, checksumNodePath, callback);
}

diffList.push(diffNode);

callback(null, diffList);
Expand Down
60 changes: 55 additions & 5 deletions lib/rsync/patch.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ module.exports = function patch(fs, path, diffList, options, callback) {

var paths = {
synced: [],
failed: []
failed: [],
needsUpstream: []
};
var pathsToSync = extractPathsFromDiffs(diffList);

Expand Down Expand Up @@ -152,7 +153,7 @@ module.exports = function patch(fs, path, diffList, options, callback) {
return handleError(err, callback);
}

// Do not generate a conflicted copy for an unsynced file
// Generate a conflicted copy only for an unsynced file
if(!unsynced) {
return callback();
}
Expand Down Expand Up @@ -180,6 +181,41 @@ module.exports = function patch(fs, path, diffList, options, callback) {
var diffLength = diffNode.diffs ? diffNode.diffs.length : 0;
var filePath = diffNode.path;

// Compare the version of the file when it was last
// synced with the version of the diffNode by comparing
// checksums and modified times.
// If they match, the file is not patched and needs to
// be upstreamed
function compareVersions(data) {
fs.stat(filePath, function(err, stats) {
if(err) {
return handleError(err, callback);
}

// If the file was modified before the
// diffNode's modified time, it is outdated
// and needs to be patched
if(stats.mtime <= diffNode.modified) {
return applyPatch(getPatchedData(data));
}

fsUtils.getChecksum(fs, filePath, function(err, checksum) {
if(err) {
return handleError(err, callback);
}

// If the last synced checksum matches the
// diffNode's checksum, ignore the patch
if(checksum === diffNode.checksum) {
paths.needsUpstream.push(filePath);
return callback(null, paths);
}

applyPatch(getPatchedData(data));
});
});
}

function updateModifiedTime() {
fs.utimes(filePath, diffNode.modified, diffNode.modified, function(err) {
if(err) {
Expand Down Expand Up @@ -251,11 +287,25 @@ module.exports = function patch(fs, path, diffList, options, callback) {
}

fs.readFile(filePath, function(err, data) {
if(err && err.code !== 'ENOENT') {
return handleError(err, callback);
if(err) {
if(err.code !== 'ENOENT') {
return handleError(err, callback);
}

// Patch a non-existent file i.e. create it
return applyPatch(getPatchedData(new Buffer(0)));
}

// If version comparing is not enabled, apply
// the patch directly
if(!options.versions) {
return applyPatch(getPatchedData(data));
}

applyPatch(getPatchedData(data || new Buffer(0)));
// Check the last synced checksum with
// the given checksum and don't patch if they
// match
compareVersions(data);
});
}

Expand Down
Loading

0 comments on commit 61d03fd

Please sign in to comment.