Skip to content

Commit

Permalink
[HER-1546] Springify(5): Update checkpointing to work smoothly with spring-configured crawls
Browse files Browse the repository at this point in the history

* BdbFrontier.java
    implement Checkpointable; save state via JSON or BDB; restore queues-of-queues on recovery
* WorkQueueFrontier.java
    make deactivateQueue() and retireQueue() protected
* BdbUriUniqFilter.java
    implement Checkpointable; save state via JSON or BDB; restore
* WorkQueue.java
    make peekItem transient -- should not be held through checkpoint
    make retired protected
* Frontier.java, AbstractFrontier.java
    remove/update obsolete code
  • Loading branch information
gojomo committed Nov 20, 2009
1 parent be17d5e commit 50ee78f
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,6 @@ public interface Frontier extends Lifecycle, MultiReporter {
*/
public long disregardedUriCount();

/**
* Total number of bytes contained in all URIs that have been processed.
*
* @return The total amounts of bytes in all processed URIs.
* @deprecated misnomer; consult StatisticsTracker instead
*/
public long totalBytesWritten();

/**
* Load URIs from a file, for scheduling and/or considered-included
* status (if from a recovery log).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.Collection;
Expand All @@ -55,7 +53,6 @@

import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.URIException;
import org.archive.checkpointing.CheckpointRecovery;
import org.archive.crawler.event.CrawlStateEvent;
import org.archive.crawler.framework.CrawlController;
import org.archive.crawler.framework.Frontier;
Expand All @@ -71,11 +68,9 @@
import org.archive.modules.net.ServerCache;
import org.archive.modules.seeds.SeedListener;
import org.archive.modules.seeds.SeedModule;
import org.archive.spring.ConfigPath;
import org.archive.spring.HasKeyedProperties;
import org.archive.spring.KeyedProperties;
import org.archive.util.ArchiveUtils;
import org.archive.util.IoUtils;
import org.archive.util.iterator.LineReadingIterator;
import org.archive.util.iterator.RegexLineIterator;
import org.json.JSONException;
Expand Down Expand Up @@ -127,14 +122,6 @@ public void setMaxRetries(int maxRetries) {
kp.put("maxRetries",maxRetries);
}

protected ConfigPath recoveryDir = new ConfigPath("recovery subdirectory","logs");
public ConfigPath getRecoveryDir() {
return recoveryDir;
}
public void setRecoveryDir(ConfigPath recoveryDir) {
this.recoveryDir = recoveryDir;
}

/**
* Recover log on or off attribute.
*/
Expand Down Expand Up @@ -275,11 +262,7 @@ public String getClassKey(CrawlURI curi) {
/**
* Used when bandwidth constraint are used.
*/
protected long totalProcessedBytes = 0;

protected long processedBytesAfterLastEmittedURI = 0;

protected int lastMaxBandwidthKB = 0;
protected AtomicLong totalProcessedBytes = new AtomicLong(0);

/**
* Crawl replay logger.
Expand Down Expand Up @@ -829,11 +812,6 @@ public long disregardedUriCount() {
return disregardedUriCount.get();
}

/** @deprecated misnomer; use StatisticsTracker figures instead */
public long totalBytesWritten() {
return totalProcessedBytes;
}

/**
* When notified of a seed via the SeedListener interface,
* schedule it.
Expand Down Expand Up @@ -950,7 +928,7 @@ public long importRecoverFormat(File source, boolean applyScope,
DecideRule scope = (applyScope) ? getScope() : null;
FrontierJournal newJournal = getFrontierJournal();
Matcher m = Pattern.compile(acceptTags).matcher("");
BufferedReader br = IoUtils.getBufferedReader(source);
BufferedReader br = ArchiveUtils.getBufferedReader(source);
String read;
int lineCount = 0;
try {
Expand Down Expand Up @@ -1183,36 +1161,6 @@ public String singleLineReport() {
public void reportTo(PrintWriter writer) {
reportTo(null, writer);
}


private void writeObject(ObjectOutputStream out)
throws IOException {
out.defaultWriteObject();
boolean recoveryLogEnabled = getRecoveryLogEnabled();
out.writeBoolean(recoveryLogEnabled);
if (recoveryLogEnabled) {
out.writeUTF(loggerModule.getPath().getFile().getAbsolutePath());
}
}


private void readObject(ObjectInputStream inp)
throws IOException, ClassNotFoundException {
inp.defaultReadObject();
boolean recoveryLogEnabled = inp.readBoolean();
if (recoveryLogEnabled) {
String path = inp.readUTF();
if (inp instanceof CheckpointRecovery) {
CheckpointRecovery cr = (CheckpointRecovery)inp;
path = cr.translatePath(path);
new File(path).mkdirs();
}
initJournal(path);
}
targetState = State.PAUSE;
outbound = new ArrayBlockingQueue<CrawlURI>(outboundCapacity, true);
inbound = new ArrayBlockingQueue<InEvent>(inboundCapacity, true);
}

/**
* Arrange for the given InEvent to be done by the managerThread, via
Expand Down
116 changes: 80 additions & 36 deletions engine/src/main/java/org/archive/crawler/frontier/BdbFrontier.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
*/
package org.archive.crawler.frontier;

import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -38,14 +35,16 @@
import org.apache.commons.collections.Closure;
import org.archive.bdb.BdbModule;
import org.archive.checkpointing.Checkpointable;
import org.archive.checkpointing.RecoverAction;
import org.archive.crawler.framework.Checkpoint;
import org.archive.modules.CrawlURI;
import org.archive.queue.StoredQueue;
import org.archive.util.ArchiveUtils;
import org.archive.util.Supplier;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired;

import com.sleepycat.collections.StoredIterator;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseException;

Expand All @@ -56,7 +55,7 @@
* @author Gordon Mohr
*/
public class BdbFrontier extends WorkQueueFrontier
implements Serializable, Checkpointable {
implements Serializable, Checkpointable, BeanNameAware {
private static final long serialVersionUID = 1L;

private static final Logger logger =
Expand All @@ -83,6 +82,11 @@ public void setBdbModule(BdbModule bdb) {
this.bdb = bdb;
}

String beanName;
public void setBeanName(String name) {
this.beanName = name;
}

boolean dumpPendingAtClose = false;
public boolean getDumpPendingAtClose() {
return dumpPendingAtClose;
Expand Down Expand Up @@ -114,18 +118,16 @@ Queue<String> getRetiredQueues() {
* @return the created BdbMultipleWorkQueues
* @throws DatabaseException
*/
private BdbMultipleWorkQueues createMultipleWorkQueues(boolean recycle)
protected BdbMultipleWorkQueues createMultipleWorkQueues()
throws DatabaseException {
Database db;
if (recycle) {
db = bdb.getDatabase("pending");
} else {
BdbModule.BdbConfig dbConfig = new BdbModule.BdbConfig();
dbConfig.setAllowCreate(!recycle);
// Make database deferred write: URLs that are added then removed
// before a page-out is required need never cause disk IO.
db = bdb.openManagedDatabase("pending", dbConfig, recycle);
}
boolean recycle = (recoveryCheckpoint != null);

BdbModule.BdbConfig dbConfig = new BdbModule.BdbConfig();
dbConfig.setAllowCreate(!recycle);
// Make database deferred write: URLs that are added then removed
// before a page-out is required need never cause disk IO.
db = bdb.openManagedDatabase("pending", dbConfig, recycle);

return new BdbMultipleWorkQueues(db, bdb.getClassCatalog());
}
Expand Down Expand Up @@ -202,43 +204,85 @@ protected boolean workQueueDataOnDisk() {
return true;
}


/**
* Constructor.
*/
public BdbFrontier() {
super();
}

public void checkpoint(File checkpointDir, List<RecoverAction> actions)
throws IOException {
logger.fine("Started syncing already seen as part "
+ "of checkpoint. Can take some time.");
public void startCheckpoint(Checkpoint checkpointInProgress) {}

public void doCheckpoint(Checkpoint checkpointInProgress) {
// An explicit sync on the any deferred write dbs is needed to make the
// db recoverable. Sync'ing the environment doesn't work.
if (this.pendingUris != null) {
this.pendingUris.sync();
this.pendingUris.sync();
// object caches will be sync()d by BdbModule

JSONObject json = new JSONObject();
try {
json.put("queuedUriCount", queuedUriCount.get());
json.put("succeededFetchCount", succeededFetchCount.get());
json.put("failedFetchCount", failedFetchCount.get());
json.put("disregardedUriCount", disregardedUriCount.get());
json.put("totalProcessedBytes", totalProcessedBytes.get());
checkpointInProgress.saveJson(beanName, json);
} catch (JSONException e) {
// impossible
throw new RuntimeException(e);
}
logger.fine("Finished syncing already seen as part of checkpoint.");
}

public void finishCheckpoint(Checkpoint checkpointInProgress) {}

Checkpoint recoveryCheckpoint;
@Autowired(required=false)
public void setRecoveryCheckpoint(Checkpoint checkpoint) {
this.recoveryCheckpoint = checkpoint;
}

@Override
protected void initAllQueues() throws DatabaseException {
this.allQueues = bdb.getObjectCache("allqueues", false, WorkQueue.class);
if (logger.isLoggable(Level.FINE)) {
Iterator<String> i = this.allQueues.keySet().iterator();
boolean isRecovery = (recoveryCheckpoint != null);
this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class);
if(isRecovery) {
JSONObject json = recoveryCheckpoint.loadJson(beanName);
try {
for (; i.hasNext();) {
logger.fine((String) i.next());
queuedUriCount.set(json.getLong("queuedUriCount"));
succeededFetchCount.set(json.getLong("succeededFetchCount"));
failedFetchCount.set(json.getLong("failedFetchCount"));
disregardedUriCount.set(json.getLong("disregardedUriCount"));
totalProcessedBytes.set(json.getLong("totalProcessedBytes"));
} catch (JSONException e) {
throw new RuntimeException(e);
}
// restore WorkQueues to internal management queues
enqueueOrDo(new Recover());
}
}

/**
* Frontier managerThread action to restore the placement of
* all queues to either the 'retired' collection or one of the
* inactive tiers (from which they will become ready/active as
* necessary).
*/
public class Recover extends InEvent {
@Override
public void process() {
// restore WorkQueues to internal management queues
for (String key : allQueues.keySet()) {
WorkQueue q = allQueues.get(key);
q.getOnInactiveQueues().clear();
q.setSessionBalance(0);
if(q.isRetired()) {
getRetiredQueues().add(key);
} else {
deactivateQueue(q);
}
} finally {
StoredIterator.close(i);
}
}
}

@Override
protected void initOtherQueues(boolean recycle) throws DatabaseException {
protected void initOtherQueues() throws DatabaseException {
// small risk of OutOfMemoryError: if 'hold-queues' is false,
// readyClassQueues may grow in size without bound
readyClassQueues = new LinkedBlockingQueue<String>();
Expand All @@ -257,7 +301,7 @@ protected void initOtherQueues(boolean recycle) throws DatabaseException {
snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();

// initialize master map in which other queues live
this.pendingUris = createMultipleWorkQueues(recycle);
this.pendingUris = createMultipleWorkQueues();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class WorkQueue implements Frontier.FrontierGroup,
private long totalBudget = 0;

/** The next item to be returned */
protected CrawlURI peekItem = null;
transient protected CrawlURI peekItem = null;

/** Last URI enqueued */
private String lastQueued;
Expand All @@ -109,7 +109,7 @@ public abstract class WorkQueue implements Frontier.FrontierGroup,
/** Substats for all CrawlURIs in this group */
protected FetchStats substats = new FetchStats();

private boolean retired;
protected boolean retired;

public WorkQueue(final String pClassKey) {
this.classKey = pClassKey;
Expand Down
Loading

0 comments on commit 50ee78f

Please sign in to comment.