Skip to content
This repository has been archived by the owner on Nov 13, 2017. It is now read-only.

Commit

Permalink
Add I2PSnark modifications
Browse files Browse the repository at this point in the history
  • Loading branch information
oakes committed Feb 26, 2014
1 parent 677a2bb commit 84c82d0
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 35 deletions.
39 changes: 35 additions & 4 deletions common/java/apps/org/klomp/snark/I2PSnarkUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -34,6 +35,8 @@

import org.klomp.snark.dht.DHT;
import org.klomp.snark.dht.KRPC;
import org.klomp.snark.dht.NodeInfo;
import org.klomp.snark.dht.CustomQueryHandler;

/**
* I2P specific helpers for I2PSnark
Expand Down Expand Up @@ -66,6 +69,10 @@ public class I2PSnarkUtil {
private boolean _areFilesPublic;
private List<String> _openTrackers;
private DHT _dht;
private InputStream _myPrivateKeyStream;
private NodeInfo _myNodeInfo;
private CustomQueryHandler _customQueryHandler;
private Runnable _dhtInitCallback;

private static final int EEPGET_CONNECT_TIMEOUT = 45*1000;
private static final int EEPGET_CONNECT_TIMEOUT_SHORT = 5*1000;
Expand Down Expand Up @@ -181,6 +188,19 @@ public void setStartupDelay(int minutes) {
_configured = true;
}

public void setDHTNode(InputStream privateKeyStream, NodeInfo nodeInfo) {
_myPrivateKeyStream = privateKeyStream;
_myNodeInfo = nodeInfo;
}

public void setDHTCustomQueryHandler(CustomQueryHandler handler) {
_customQueryHandler = handler;
}

public void setDHTInitCallback(Runnable callback) {
_dhtInitCallback = callback;
}

public String getI2CPHost() { return _i2cpHost; }
public int getI2CPPort() { return _i2cpPort; }
public Map<String, String> getI2CPOptions() { return _opts; }
Expand Down Expand Up @@ -255,11 +275,19 @@ synchronized public boolean connect() {
opts.setProperty("i2p.streaming.disableRejectLogging", "true");
if (opts.getProperty("i2p.streaming.answerPings") == null)
opts.setProperty("i2p.streaming.answerPings", "false");
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
if (_myPrivateKeyStream != null) {
_manager = I2PSocketManagerFactory.createManager(_myPrivateKeyStream, _i2cpHost, _i2cpPort, opts);
} else {
_manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts);
}
_connecting = false;
}
if (_shouldUseDHT && _manager != null && _dht == null)
_dht = new KRPC(_context, _baseName, _manager.getSession());
if (_shouldUseDHT && _manager != null && _dht == null) {
_dht = new KRPC(_context, _baseName, _manager.getSession(), _myNodeInfo, _customQueryHandler);
if (_dhtInitCallback != null) {
_dhtInitCallback.run();
}
}
return (_manager != null);
}

Expand Down Expand Up @@ -600,7 +628,10 @@ public boolean shouldUseOpenTrackers() {
public synchronized void setUseDHT(boolean yes) {
_shouldUseDHT = yes;
if (yes && _manager != null && _dht == null) {
_dht = new KRPC(_context, _baseName, _manager.getSession());
_dht = new KRPC(_context, _baseName, _manager.getSession(), _myNodeInfo, _customQueryHandler);
if (_dhtInitCallback != null) {
_dhtInitCallback.run();
}
} else if (!yes && _dht != null) {
_dht.stop();
_dht = null;
Expand Down
21 changes: 17 additions & 4 deletions common/java/apps/org/klomp/snark/PeerCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ class PeerCoordinator implements PeerListener
private final CoordinatorListener listener;
private final I2PSnarkUtil _util;
private final Random _random;

/** Maintain connections with peers even after the torrent has finished. */
private boolean persistent = false;

/**
* @param metainfo null if in magnet mode
Expand Down Expand Up @@ -377,7 +380,7 @@ public boolean needPeers()
public boolean needOutboundPeers() {
//return wantedBytes != 0 && needPeers();
// minus two to make it a little easier for new peers to get in on large swarms
return wantedBytes != 0 &&
return (wantedBytes != 0 || persistent) &&
!halted &&
peers.size() < getMaxConnections() - 2 &&
(storage == null || !storage.isChecking());
Expand Down Expand Up @@ -407,6 +410,14 @@ private int getMaxConnections() {
//return (max + 2) / 3;
}

public void setPersistent(boolean isPersistent) {
persistent = isPersistent;
}

public boolean getPersistent() {
return persistent;
}

public boolean halted() { return halted; }

public void halt()
Expand Down Expand Up @@ -688,7 +699,7 @@ public boolean gotBitField(Peer peer, BitField bitfield)
}
}
}
return rv;
return rv || (wantedBytes == 0 && persistent);
}

/**
Expand Down Expand Up @@ -1031,8 +1042,10 @@ public boolean gotPiece(Peer peer, PartialPiece pp)
}

if (done) {
for (Peer p : toDisconnect) {
p.disconnect(true);
if (!persistent) {
for (Peer p : toDisconnect) {
p.disconnect(true);
}
}

// put msg on the console if partial, since Storage won't do it
Expand Down
2 changes: 1 addition & 1 deletion common/java/apps/org/klomp/snark/PeerID.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* and the PeerID is not required.
* Equality is now determined solely by the dest hash.
*/
class PeerID implements Comparable<PeerID>
public class PeerID implements Comparable<PeerID>
{
private byte[] id;
private Destination address;
Expand Down
18 changes: 18 additions & 0 deletions common/java/apps/org/klomp/snark/Snark.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,10 @@ public Storage getStorage() {
return storage;
}

public String getDataDir() {
return rootDataDir;
}

/**
* @since 0.8.4
*/
Expand Down Expand Up @@ -1199,6 +1203,20 @@ public void storageCompleted(Storage storage)
completeListener.torrentComplete(this);
}

public void setPersistent(boolean isPersistent) {
if (coordinator != null) {
coordinator.setPersistent(isPersistent);
}
}

public boolean getPersistent() {
if (coordinator != null) {
return coordinator.getPersistent();
}

return false;
}

public void setWantedPieces(Storage storage)
{
coordinator.setWantedPieces();
Expand Down
41 changes: 32 additions & 9 deletions common/java/apps/org/klomp/snark/SnarkManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,17 @@ public SnarkManager(I2PAppContext ctx, String ctxPath, String ctxName) {
* for i2cp host/port or i2psnark.dir
*/
public void start() {
start(true);
}

public void start(boolean runDirMonitor) {
_running = true;
_peerCoordinatorSet = new PeerCoordinatorSet();
_connectionAcceptor = new ConnectionAcceptor(_util, _peerCoordinatorSet);
_monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true);
_monitor.start();
if (runDirMonitor) {
_monitor = new I2PAppThread(new DirMonitor(), "Snark DirMonitor", true);
_monitor.start();
}
// only if default instance
if ("i2psnark".equals(_contextName))
// delay until UpdateManager is there
Expand Down Expand Up @@ -900,6 +906,10 @@ public Snark getTorrentByInfoHash(byte[] infohash) {
* @throws RuntimeException via Snark.fatal()
*/
private void addTorrent(String filename, boolean dontAutoStart) {
addTorrent(filename, dontAutoStart, this, getDataDir().getPath());
}

private void addTorrent(String filename, boolean dontAutoStart, CompleteListener listener, String dataDir) {
if ((!dontAutoStart) && !_util.connected()) {
addMessage(_("Connecting to I2P"));
boolean ok = _util.connect();
Expand All @@ -916,7 +926,6 @@ private void addTorrent(String filename, boolean dontAutoStart) {
addMessage(_("Error: Could not add the torrent {0}", filename) + ": " + ioe);
return;
}
File dataDir = getDataDir();
Snark torrent = null;
synchronized (_snarks) {
torrent = _snarks.get(filename);
Expand Down Expand Up @@ -980,9 +989,9 @@ private void addTorrent(String filename, boolean dontAutoStart) {
} else {
// TODO load saved closest DHT nodes and pass to the Snark ?
// This may take a LONG time
torrent = new Snark(_util, filename, null, -1, null, null, this,
torrent = new Snark(_util, filename, null, -1, null, null, listener,
_peerCoordinatorSet, _connectionAcceptor,
false, dataDir.getPath());
false, dataDir);
loadSavedFilePriorities(torrent);
synchronized (_snarks) {
_snarks.put(filename, torrent);
Expand Down Expand Up @@ -1042,15 +1051,21 @@ public void addMagnet(String name, byte[] ih, String trackerURL, boolean updateS
* to save it across restarts, in case we don't get
* the metadata before shutdown?
* @param listener to intercept callbacks, should pass through to this
* @param dataDir directory to store the downloaded files
* @return the new Snark or null on failure
* @throws RuntimeException via Snark.fatal()
* @since 0.9.4
*/
public Snark addMagnet(String name, byte[] ih, String trackerURL, boolean updateStatus,
boolean autoStart, CompleteListener listener) {
return addMagnet(name, ih, trackerURL, updateStatus, shouldAutoStart(), this, getDataDir().getPath());
}

public Snark addMagnet(String name, byte[] ih, String trackerURL, boolean updateStatus,
boolean autoStart, CompleteListener listener, String dataDir) {
Snark torrent = new Snark(_util, name, ih, trackerURL, listener,
_peerCoordinatorSet, _connectionAcceptor,
false, getDataDir().getPath());
false, dataDir);

synchronized (_snarks) {
Snark snark = getTorrentByInfoHash(ih);
Expand Down Expand Up @@ -1133,6 +1148,10 @@ public void addDownloader(Snark torrent) {
* @since 0.8.4
*/
public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, boolean dontAutoStart) throws IOException {
addTorrent(metainfo, bitfield, filename, dontAutoStart, this, getDataDir().getPath());
}

public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, boolean dontAutoStart, CompleteListener listener, String dataDir) throws IOException {
// prevent interference by DirMonitor
synchronized (_snarks) {
Snark snark = getTorrentByInfoHash(metainfo.getInfoHash());
Expand All @@ -1145,7 +1164,7 @@ public void addTorrent(MetaInfo metainfo, BitField bitfield, String filename, bo
try {
locked_writeMetaInfo(metainfo, filename, areFilesPublic());
// hold the lock for a long time
addTorrent(filename, dontAutoStart);
addTorrent(filename, dontAutoStart, listener, dataDir);
} catch (IOException ioe) {
addMessage(_("Failed to copy torrent file to {0}", filename));
_log.error("Failed to write torrent file", ioe);
Expand Down Expand Up @@ -1193,7 +1212,7 @@ public void copyAndAddTorrent(File fromfile, String filename) throws IOException
private static void locked_writeMetaInfo(MetaInfo metainfo, String filename, boolean areFilesPublic) throws IOException {
File file = new File(filename);
if (file.exists())
throw new IOException("Cannot overwrite an existing .torrent file: " + file.getPath());
return;
OutputStream out = null;
try {
if (areFilesPublic)
Expand Down Expand Up @@ -1577,6 +1596,10 @@ public void updateStatus(Snark snark) {
* @since 0.8.4
*/
public String gotMetaInfo(Snark snark) {
return gotMetaInfo(snark, getDataDir().getPath());
}

public String gotMetaInfo(Snark snark, String dataDir) {
MetaInfo meta = snark.getMetaInfo();
Storage storage = snark.getStorage();
if (meta != null && storage != null) {
Expand All @@ -1591,7 +1614,7 @@ public String gotMetaInfo(Snark snark) {
String name = storage.getBaseName();
try {
// _snarks must use canonical
name = (new File(getDataDir(), storage.getBaseName() + ".torrent")).getCanonicalPath();
name = (new File(dataDir, storage.getBaseName() + ".torrent")).getCanonicalPath();
// put the announce URL in the file
String announce = snark.getTrackerURL();
if (announce != null)
Expand Down
2 changes: 1 addition & 1 deletion common/java/apps/org/klomp/snark/StorageListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Callback used when Storage changes.
*/
interface StorageListener
public interface StorageListener
{
/**
* Called when the storage creates a new file of a given length.
Expand Down
9 changes: 9 additions & 0 deletions common/java/apps/org/klomp/snark/TrackerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class TrackerClient implements Runnable {
private final static long MIN_TRACKER_ANNOUNCE_INTERVAL = 15*60*1000;
private final static long MIN_DHT_ANNOUNCE_INTERVAL = 10*60*1000;
public static final int PORT = 6881;
public static final boolean DHT_ONLY = true;

private final I2PSnarkUtil _util;
private final MetaInfo meta;
Expand Down Expand Up @@ -448,6 +449,10 @@ else if ((!runStarted) && _runCount < MAX_CONSEC_FAILS)
* @return max peers seen
*/
private int getPeersFromTrackers(List<TCTracker> trckrs) {
if (DHT_ONLY) {
return 0;
}

long left = coordinator.getLeft(); // -1 in magnet mode

// First time we got a complete download?
Expand Down Expand Up @@ -588,6 +593,10 @@ private int getPeersFromTrackers(List<TCTracker> trckrs) {
* @return max peers seen
*/
private int getPeersFromPEX() {
if (DHT_ONLY) {
return 0;
}

// Get peers from PEX
int rv = 0;
if (coordinator.needOutboundPeers() && (meta == null || !meta.isPrivate()) && !stop) {
Expand Down
14 changes: 14 additions & 0 deletions common/java/apps/org/klomp/snark/dht/CustomQueryHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.klomp.snark.dht;

import java.util.Map;

import org.klomp.snark.bencode.BEValue;

/**
* Callback used when an unrecognized DHT query arrives.
*/
public interface CustomQueryHandler
{
Map<String, Object> receiveQuery(String method, Map<String, BEValue> args);
void receiveResponse(Map<String, BEValue> args);
}
11 changes: 8 additions & 3 deletions common/java/apps/org/klomp/snark/dht/DHTNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,13 @@ public NodeInfo putIfAbsent(NodeInfo nInfo) {
}

public NodeInfo remove(NID nid) {
_kad.remove(nid);
return _nodeMap.remove(nid);
NodeInfo ninfo = _nodeMap.get(nid);
if (ninfo != null && !ninfo.getPermanent()) {
_kad.remove(nid);
return _nodeMap.remove(nid);
}

return null;
}

public Collection<NodeInfo> values() {
Expand Down Expand Up @@ -155,7 +160,7 @@ public void timeReached() {
int peerCount = 0;
for (Iterator<NodeInfo> iter = DHTNodes.this.values().iterator(); iter.hasNext(); ) {
NodeInfo peer = iter.next();
if (peer.lastSeen() < now - _expireTime) {
if (peer.lastSeen() < now - _expireTime && !peer.getPermanent()) {
iter.remove();
_kad.remove(peer.getNID());
} else {
Expand Down
Loading

0 comments on commit 84c82d0

Please sign in to comment.