From 84c82d0563887e9bb1689b6b2e9159ae47f5c53f Mon Sep 17 00:00:00 2001 From: oakes Date: Sun, 27 Oct 2013 18:23:19 -0400 Subject: [PATCH] Add I2PSnark modifications --- .../apps/org/klomp/snark/I2PSnarkUtil.java | 39 +++++++++-- .../apps/org/klomp/snark/PeerCoordinator.java | 21 ++++-- common/java/apps/org/klomp/snark/PeerID.java | 2 +- common/java/apps/org/klomp/snark/Snark.java | 18 +++++ .../apps/org/klomp/snark/SnarkManager.java | 41 +++++++++--- .../apps/org/klomp/snark/StorageListener.java | 2 +- .../apps/org/klomp/snark/TrackerClient.java | 9 +++ .../klomp/snark/dht/CustomQueryHandler.java | 14 ++++ .../apps/org/klomp/snark/dht/DHTNodes.java | 11 +++- .../java/apps/org/klomp/snark/dht/KRPC.java | 66 +++++++++++++++---- .../apps/org/klomp/snark/dht/NodeInfo.java | 11 +++- 11 files changed, 199 insertions(+), 35 deletions(-) create mode 100644 common/java/apps/org/klomp/snark/dht/CustomQueryHandler.java diff --git a/common/java/apps/org/klomp/snark/I2PSnarkUtil.java b/common/java/apps/org/klomp/snark/I2PSnarkUtil.java index d103853..461bc75 100644 --- a/common/java/apps/org/klomp/snark/I2PSnarkUtil.java +++ b/common/java/apps/org/klomp/snark/I2PSnarkUtil.java @@ -2,6 +2,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -34,6 +35,8 @@ import org.klomp.snark.dht.DHT; import org.klomp.snark.dht.KRPC; +import org.klomp.snark.dht.NodeInfo; +import org.klomp.snark.dht.CustomQueryHandler; /** * I2P specific helpers for I2PSnark @@ -66,6 +69,10 @@ public class I2PSnarkUtil { private boolean _areFilesPublic; private List _openTrackers; private DHT _dht; + private InputStream _myPrivateKeyStream; + private NodeInfo _myNodeInfo; + private CustomQueryHandler _customQueryHandler; + private Runnable _dhtInitCallback; private static final int EEPGET_CONNECT_TIMEOUT = 45*1000; private static final int EEPGET_CONNECT_TIMEOUT_SHORT = 5*1000; @@ -181,6 +188,19 @@ public void setStartupDelay(int minutes) { _configured = true; } + public void setDHTNode(InputStream privateKeyStream, NodeInfo nodeInfo) { + _myPrivateKeyStream = privateKeyStream; + _myNodeInfo = nodeInfo; + } + + public void setDHTCustomQueryHandler(CustomQueryHandler handler) { + _customQueryHandler = handler; + } + + public void setDHTInitCallback(Runnable callback) { + _dhtInitCallback = callback; + } + public String getI2CPHost() { return _i2cpHost; } public int getI2CPPort() { return _i2cpPort; } public Map getI2CPOptions() { return _opts; } @@ -255,11 +275,19 @@ synchronized public boolean connect() { opts.setProperty("i2p.streaming.disableRejectLogging", "true"); if (opts.getProperty("i2p.streaming.answerPings") == null) opts.setProperty("i2p.streaming.answerPings", "false"); - _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); + if (_myPrivateKeyStream != null) { + _manager = I2PSocketManagerFactory.createManager(_myPrivateKeyStream, _i2cpHost, _i2cpPort, opts); + } else { + _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); + } _connecting = false; } - if (_shouldUseDHT && _manager != null && _dht == null) - _dht = new KRPC(_context, _baseName, _manager.getSession()); + if (_shouldUseDHT && _manager != null && _dht == null) { + _dht = new KRPC(_context, _baseName, _manager.getSession(), _myNodeInfo, _customQueryHandler); + if (_dhtInitCallback != null) { + _dhtInitCallback.run(); + } + } return (_manager != null); } @@ -600,7 +628,10 @@ public boolean shouldUseOpenTrackers() { public synchronized void setUseDHT(boolean yes) { _shouldUseDHT = yes; if (yes && _manager != null && _dht == null) { - _dht = new KRPC(_context, _baseName, _manager.getSession()); + _dht = new KRPC(_context, _baseName, _manager.getSession(), _myNodeInfo, _customQueryHandler); + if (_dhtInitCallback != null) { + _dhtInitCallback.run(); + } } else if (!yes && _dht != null) { _dht.stop(); _dht = null; diff --git a/common/java/apps/org/klomp/snark/PeerCoordinator.java b/common/java/apps/org/klomp/snark/PeerCoordinator.java index c436516..80c10b1 100644 --- a/common/java/apps/org/klomp/snark/PeerCoordinator.java +++ b/common/java/apps/org/klomp/snark/PeerCoordinator.java @@ -134,6 +134,9 @@ class PeerCoordinator implements PeerListener private final CoordinatorListener listener; private final I2PSnarkUtil _util; private final Random _random; + + /** Maintain connections with peers even after the torrent has finished. */ + private boolean persistent = false; /** * @param metainfo null if in magnet mode @@ -377,7 +380,7 @@ public boolean needPeers() public boolean needOutboundPeers() { //return wantedBytes != 0 && needPeers(); // minus two to make it a little easier for new peers to get in on large swarms - return wantedBytes != 0 && + return (wantedBytes != 0 || persistent) && !halted && peers.size() < getMaxConnections() - 2 && (storage == null || !storage.isChecking()); @@ -407,6 +410,14 @@ private int getMaxConnections() { //return (max + 2) / 3; } + public void setPersistent(boolean isPersistent) { + persistent = isPersistent; + } + + public boolean getPersistent() { + return persistent; + } + public boolean halted() { return halted; } public void halt() @@ -688,7 +699,7 @@ public boolean gotBitField(Peer peer, BitField bitfield) } } } - return rv; + return rv || (wantedBytes == 0 && persistent); } /** @@ -1031,8 +1042,10 @@ public boolean gotPiece(Peer peer, PartialPiece pp) } if (done) { - for (Peer p : toDisconnect) { - p.disconnect(true); + if (!persistent) { + for (Peer p : toDisconnect) { + p.disconnect(true); + } } // put msg on the console if partial, since Storage won't do it diff --git a/common/java/apps/org/klomp/snark/PeerID.java b/common/java/apps/org/klomp/snark/PeerID.java index 16dc922..d1bfc41 100644 --- a/common/java/apps/org/klomp/snark/PeerID.java +++ b/common/java/apps/org/klomp/snark/PeerID.java @@ -42,7 +42,7 @@ * and the PeerID is not required. * Equality is now determined solely by the dest hash. */ -class PeerID implements Comparable +public class PeerID implements Comparable { private byte[] id; private Destination address; diff --git a/common/java/apps/org/klomp/snark/Snark.java b/common/java/apps/org/klomp/snark/Snark.java index ba4dfc2..3358d0f 100644 --- a/common/java/apps/org/klomp/snark/Snark.java +++ b/common/java/apps/org/klomp/snark/Snark.java @@ -663,6 +663,10 @@ public Storage getStorage() { return storage; } + public String getDataDir() { + return rootDataDir; + } + /** * @since 0.8.4 */ @@ -1199,6 +1203,20 @@ public void storageCompleted(Storage storage) completeListener.torrentComplete(this); } + public void setPersistent(boolean isPersistent) { + if (coordinator != null) { + coordinator.setPersistent(isPersistent); + } + } + + public boolean getPersistent() { + if (coordinator != null) { + return coordinator.getPersistent(); + } + + return false; + } + public void setWantedPieces(Storage storage) { coordinator.setWantedPieces(); diff --git a/common/java/apps/org/klomp/snark/SnarkManager.java b/common/java/apps/org/klomp/snark/SnarkManager.java index 3fe2b2a..80ab89e 100644 --- a/common/java/apps/org/klomp/snark/SnarkManager.java +++ b/common/java/apps/org/klomp/snark/SnarkManager.java @@ -178,11 +178,17 @@ public SnarkManager(I2PAppContext ctx, String ctxPath, String ctxName) { * for i2cp host/port or i2psnark.dir */ public void start() { + start(true); + } + + public void start(boolean runDirMonitor) { _running = true; _peerCoordinatorSet = new PeerCoordinatorSet(); _connectionAcceptor = new ConnectionAcceptor(_util, _peerCoordinatorSet); - _monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true); - _monitor.start(); + if (runDirMonitor) { + _monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true); + _monitor.start(); + } // only if default instance if ("i2psnark".equals(_contextName)) // delay until UpdateManager is there @@ -900,6 +906,10 @@ public Snark getTorrentByInfoHash(byte[] infohash) { * @throws RuntimeException via Snark.fatal() */ private void addTorrent(String filename, boolean dontAutoStart) { + addTorrent(filename, dontAutoStart, this, getDataDir().getPath()); + } + + private void addTorrent(String filename, boolean dontAutoStart, CompleteListener listener, String dataDir) { if ((!dontAutoStart) && !_util.connected()) { addMessage(_("Connecting to I2P")); boolean ok = _util.connect(); @@ -916,7 +926,6 @@ private void addTorrent(String filename, boolean dontAutoStart) { addMessage(_("Error: Could not add the torrent {0}", filename) + ": " + ioe); return; } - File dataDir = getDataDir(); Snark torrent = null; synchronized (_snarks) { torrent = _snarks.get(filename); @@ -980,9 +989,9 @@ private void addTorrent(String filename, boolean dontAutoStart) { } else { // TODO load saved closest DHT nodes and pass to the Snark ? // This may take a LONG time - torrent = new Snark(_util, filename, null, -1, null, null, this, + torrent = new Snark(_util, filename, null, -1, null, null, listener, _peerCoordinatorSet, _connectionAcceptor, - false, dataDir.getPath()); + false, dataDir); loadSavedFilePriorities(torrent); synchronized (_snarks) { _snarks.put(filename, torrent); @@ -1042,15 +1051,21 @@ public void addMagnet(String name, byte[] ih, String trackerURL, boolean updateS * to save it across restarts, in case we don't get * the metadata before shutdown? * @param listener to intercept callbacks, should pass through to this + * @param dataDir directory to store the downloaded files * @return the new Snark or null on failure * @throws RuntimeException via Snark.fatal() * @since 0.9.4 */ public Snark addMagnet(String name, byte[] ih, String trackerURL, boolean updateStatus, boolean autoStart, CompleteListener listener) { + return addMagnet(name, ih, trackerURL, updateStatus, shouldAutoStart(), this, getDataDir().getPath()); + } + + public Snark addMagnet(String name, byte[] ih, String trackerURL, boolean updateStatus, + boolean autoStart, CompleteListener listener, String dataDir) { Snark torrent = new Snark(_util, name, ih, trackerURL, listener, _peerCoordinatorSet, _connectionAcceptor, - false, getDataDir().getPath()); + false, dataDir); synchronized (_snarks) { Snark snark = getTorrentByInfoHash(ih); @@ -1133,6 +1148,10 @@ public void addDownloader(Snark torrent) { * @since 0.8.4 */ public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, boolean dontAutoStart) throws IOException { + addTorrent(metainfo, bitfield, filename, dontAutoStart, this, getDataDir().getPath()); + } + + public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, boolean dontAutoStart, CompleteListener listener, String dataDir) throws IOException { // prevent interference by DirMonitor synchronized (_snarks) { Snark snark = getTorrentByInfoHash(metainfo.getInfoHash()); @@ -1145,7 +1164,7 @@ public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, bo try { locked_writeMetaInfo(metainfo, filename, areFilesPublic()); // hold the lock for a long time - addTorrent(filename, dontAutoStart); + addTorrent(filename, dontAutoStart, listener, dataDir); } catch (IOException ioe) { addMessage(_("Failed to copy torrent file to {0}", filename)); _log.error("Failed to write torrent file", ioe); @@ -1193,7 +1212,7 @@ public void copyAndAddTorrent(File fromfile, String filename) throws IOException private static void locked_writeMetaInfo(MetaInfo metainfo, String filename, boolean areFilesPublic) throws IOException { File file = new File(filename); if (file.exists()) - throw new IOException("Cannot overwrite an existing .torrent file: " + file.getPath()); + return; OutputStream out = null; try { if (areFilesPublic) @@ -1577,6 +1596,10 @@ public void updateStatus(Snark snark) { * @since 0.8.4 */ public String gotMetaInfo(Snark snark) { + return gotMetaInfo(snark, getDataDir().getPath()); + } + + public String gotMetaInfo(Snark snark, String dataDir) { MetaInfo meta = snark.getMetaInfo(); Storage storage = snark.getStorage(); if (meta != null && storage != null) { @@ -1591,7 +1614,7 @@ public String gotMetaInfo(Snark snark) { String name = storage.getBaseName(); try { // _snarks must use canonical - name = (new File(getDataDir(), storage.getBaseName() + ".torrent")).getCanonicalPath(); + name = (new File(dataDir, storage.getBaseName() + ".torrent")).getCanonicalPath(); // put the announce URL in the file String announce = snark.getTrackerURL(); if (announce != null) diff --git a/common/java/apps/org/klomp/snark/StorageListener.java b/common/java/apps/org/klomp/snark/StorageListener.java index d484907..d76a497 100644 --- a/common/java/apps/org/klomp/snark/StorageListener.java +++ b/common/java/apps/org/klomp/snark/StorageListener.java @@ -23,7 +23,7 @@ /** * Callback used when Storage changes. */ -interface StorageListener +public interface StorageListener { /** * Called when the storage creates a new file of a given length. diff --git a/common/java/apps/org/klomp/snark/TrackerClient.java b/common/java/apps/org/klomp/snark/TrackerClient.java index be854a4..c2f9862 100644 --- a/common/java/apps/org/klomp/snark/TrackerClient.java +++ b/common/java/apps/org/klomp/snark/TrackerClient.java @@ -84,6 +84,7 @@ public class TrackerClient implements Runnable { private final static long MIN_TRACKER_ANNOUNCE_INTERVAL = 15*60*1000; private final static long MIN_DHT_ANNOUNCE_INTERVAL = 10*60*1000; public static final int PORT = 6881; + public static final boolean DHT_ONLY = true; private final I2PSnarkUtil _util; private final MetaInfo meta; @@ -448,6 +449,10 @@ else if ((!runStarted) && _runCount < MAX_CONSEC_FAILS) * @return max peers seen */ private int getPeersFromTrackers(List trckrs) { + if (DHT_ONLY) { + return 0; + } + long left = coordinator.getLeft(); // -1 in magnet mode // First time we got a complete download? @@ -588,6 +593,10 @@ private int getPeersFromTrackers(List trckrs) { * @return max peers seen */ private int getPeersFromPEX() { + if (DHT_ONLY) { + return 0; + } + // Get peers from PEX int rv = 0; if (coordinator.needOutboundPeers() && (meta == null || !meta.isPrivate()) && !stop) { diff --git a/common/java/apps/org/klomp/snark/dht/CustomQueryHandler.java b/common/java/apps/org/klomp/snark/dht/CustomQueryHandler.java new file mode 100644 index 0000000..548e711 --- /dev/null +++ b/common/java/apps/org/klomp/snark/dht/CustomQueryHandler.java @@ -0,0 +1,14 @@ +package org.klomp.snark.dht; + +import java.util.Map; + +import org.klomp.snark.bencode.BEValue; + +/** + * Callback used when an unrecognized DHT query arrives. + */ +public interface CustomQueryHandler +{ + Map receiveQuery(String method, Map args); + void receiveResponse(Map args); +} diff --git a/common/java/apps/org/klomp/snark/dht/DHTNodes.java b/common/java/apps/org/klomp/snark/dht/DHTNodes.java index 8207090..c3d553c 100644 --- a/common/java/apps/org/klomp/snark/dht/DHTNodes.java +++ b/common/java/apps/org/klomp/snark/dht/DHTNodes.java @@ -96,8 +96,13 @@ public NodeInfo putIfAbsent(NodeInfo nInfo) { } public NodeInfo remove(NID nid) { - _kad.remove(nid); - return _nodeMap.remove(nid); + NodeInfo ninfo = _nodeMap.get(nid); + if (ninfo != null && !ninfo.getPermanent()) { + _kad.remove(nid); + return _nodeMap.remove(nid); + } + + return null; } public Collection values() { @@ -155,7 +160,7 @@ public void timeReached() { int peerCount = 0; for (Iterator iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) { NodeInfo peer = iter.next(); - if (peer.lastSeen() < now - _expireTime) { + if (peer.lastSeen() < now - _expireTime && !peer.getPermanent()) { iter.remove(); _kad.remove(peer.getNID()); } else { diff --git a/common/java/apps/org/klomp/snark/dht/KRPC.java b/common/java/apps/org/klomp/snark/dht/KRPC.java index 1ea2c99..d4782db 100644 --- a/common/java/apps/org/klomp/snark/dht/KRPC.java +++ b/common/java/apps/org/klomp/snark/dht/KRPC.java @@ -106,6 +106,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private final NID _myNID; /** 20 byte random id + 32 byte Hash + 2 byte port */ private final NodeInfo _myNodeInfo; + /** if not null, run this when receiving an unrecognized query */ + private CustomQueryHandler _customQueryHandler; /** unsigned dgrams */ private final int _rPort; /** signed dgrams */ @@ -160,10 +162,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT { * @param baseName generally "i2psnark" */ public KRPC(I2PAppContext ctx, String baseName, I2PSession session) { + this(ctx, baseName, session, null, null); + } + + public KRPC(I2PAppContext ctx, String baseName, I2PSession session, NodeInfo myNodeInfo, CustomQueryHandler handler) { _context = ctx; _session = session; _log = ctx.logManager().getLog(KRPC.class); _tracker = new DHTTracker(ctx); + _customQueryHandler = handler; _sentQueries = new ConcurrentHashMap(); _outgoingTokens = new ConcurrentHashMap(); @@ -173,17 +180,24 @@ public KRPC(I2PAppContext ctx, String baseName, I2PSession session) { // Construct my NodeInfo // Pick ports over a big range to marginally increase security // If we add a search DHT, adjust to stay out of each other's way - _qPort = TrackerClient.PORT + 10 + ctx.random().nextInt(65535 - 20 - TrackerClient.PORT); - _rPort = _qPort + 1; - if (SECURE_NID) { - _myNID = NodeInfo.generateNID(session.getMyDestination().calculateHash(), _qPort, _context.random()); - _myID = _myNID.getData(); + if (myNodeInfo == null) { + _qPort = TrackerClient.PORT + 10 + ctx.random().nextInt(65535 - 20 - TrackerClient.PORT); + if (SECURE_NID) { + _myNID = NodeInfo.generateNID(session.getMyDestination().calculateHash(), _qPort, _context.random()); + _myID = _myNID.getData(); + } else { + _myID = new byte[NID.HASH_LENGTH]; + ctx.random().nextBytes(_myID); + _myNID = new NID(_myID); + } + _myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort); } else { - _myID = new byte[NID.HASH_LENGTH]; - ctx.random().nextBytes(_myID); - _myNID = new NID(_myID); + _qPort = myNodeInfo.getPort(); + _myNID = myNodeInfo.getNID(); + _myID = _myNID.getData(); + _myNodeInfo = myNodeInfo; } - _myNodeInfo = new NodeInfo(_myNID, session.getMyDestination(), _qPort); + _rPort = _qPort + 1; _dhtFile = new File(ctx.getConfigDir(), baseName + DHT_FILE_SUFFIX); _backupDhtFile = baseName.equals("i2psnark") ? null : new File(ctx.getConfigDir(), "i2psnark" + DHT_FILE_SUFFIX); _knownNodes = new DHTNodes(ctx, _myNID); @@ -214,6 +228,23 @@ public int getRPort() { return _rPort; } + /** + * @return The NodeInfo object + */ + public NodeInfo getNodeInfo(Destination dest) { + if (dest == null) { + return _myNodeInfo; + } + + for (NodeInfo nInfo : _knownNodes.values()) { + if (dest.equals(nInfo.getDestination())) { + return nInfo; + } + } + + return null; + } + /** * Ping. We don't have a NID yet so the node is presumed * to be absent from our DHT. @@ -835,7 +866,7 @@ private boolean sendError(NodeInfo nInfo, MsgID msgID, int err, String msg) { * @param repliable true for all but announce * @return null on error */ - private ReplyWaiter sendQuery(NodeInfo nInfo, Map map, boolean repliable) { + public ReplyWaiter sendQuery(NodeInfo nInfo, Map map, boolean repliable) { if (nInfo.equals(_myNodeInfo)) throw new IllegalArgumentException("wtf don't send to ourselves"); if (_log.shouldLog(Log.DEBUG)) @@ -1122,8 +1153,17 @@ private void receiveQuery(MsgID msgID, Destination dest, int fromPort, String me byte[] token = args.get("token").getBytes(); receiveAnnouncePeer(msgID, ih, token); } else { - if (_log.shouldLog(Log.WARN)) + if (_customQueryHandler != null) { + Map resps = + _customQueryHandler.receiveQuery(method, args); + if (resps != null) { + Map map = new HashMap(); + map.put("r", resps); + sendResponse(nInfo, msgID, map); + } + } else if (_log.shouldLog(Log.WARN)) { _log.warn("Unknown query method rcvd: " + method); + } } } @@ -1162,7 +1202,7 @@ private NodeInfo heardFrom(NodeInfo nInfo) { * Package private for PersistDHT. * @return non-null nodeInfo from DB if present, otherwise the nInfo parameter is returned */ - NodeInfo heardAbout(NodeInfo nInfo) { + public NodeInfo heardAbout(NodeInfo nInfo) { // try to keep ourselves out of the DHT if (nInfo.equals(_myNodeInfo)) return _myNodeInfo; @@ -1327,6 +1367,8 @@ private void receiveResponse(ReplyWaiter waiter, Map response) List peers = values.getList(); List rlist = receivePeers(nInfo, peers); waiter.gotReply(REPLY_PEERS, rlist); + } else if (_customQueryHandler != null && response.size() > 1) { + _customQueryHandler.receiveResponse(response); } else { // a ping response or an announce peer response byte[] nid = response.get("id").getBytes(); diff --git a/common/java/apps/org/klomp/snark/dht/NodeInfo.java b/common/java/apps/org/klomp/snark/dht/NodeInfo.java index c6fce8f..0aa3782 100644 --- a/common/java/apps/org/klomp/snark/dht/NodeInfo.java +++ b/common/java/apps/org/klomp/snark/dht/NodeInfo.java @@ -24,12 +24,13 @@ * @author zzz */ -class NodeInfo extends SimpleDataStructure { +public class NodeInfo extends SimpleDataStructure { private final NID nID; private final Hash hash; private Destination dest; private final int port; + private boolean permanent = false; public static final int LENGTH = NID.HASH_LENGTH + Hash.HASH_LENGTH + 2; @@ -206,6 +207,14 @@ public void setDestination(Destination dest) throws IllegalArgumentException { this.dest = dest; } + public void setPermanent(boolean isPermanent) { + permanent = isPermanent; + } + + public boolean getPermanent() { + return permanent; + } + public int getPort() { return this.port; }