public abstract class WorkQueueFrontier extends AbstractFrontier implements Closeable, org.springframework.context.ApplicationContextAware
Nested classes/interfaces inherited from interface Frontier: Frontier.FrontierGroup, Frontier.State
Modifier and Type | Field and Description |
---|---|
protected ObjectIdentityCache<WorkQueue> | allQueues: All known queues. |
protected org.springframework.context.support.AbstractApplicationContext | appCtx |
protected com.sleepycat.collections.StoredSortedMap<Long,CrawlURI> | futureUris: URIs scheduled to be re-enqueued at a future date. |
protected int | highestPrecedenceWaiting |
protected Set<WorkQueue> | inProcessQueues: All per-class queues from which a URI is outstanding. |
protected TopNSet | largestQueues: Keys of a small number of the largest queues, remembered for reporting. |
protected static int | MAX_SNOOZED_IN_MEMORY |
protected int | maxQueuesPerReportCategory: Truncate reporting of queues at this large but not unbounded number. |
protected int | precedenceFloor: Precedence rank at or below which queues are not crawled. |
protected BlockingQueue<String> | readyClassQueues: All per-class queues whose first item may be handed out. |
protected DelayQueue<org.archive.crawler.frontier.DelayedWorkQueue> | snoozedClassQueues: All per-class queues held in snoozed state, sorted by wake time. |
protected com.sleepycat.collections.StoredSortedMap<Long,org.archive.crawler.frontier.DelayedWorkQueue> | snoozedOverflow |
protected AtomicInteger | snoozedOverflowCount |
protected long | snoozeLongMs: When a snooze target for a queue is longer than this amount, the queue will be "long snoozed" instead of "short snoozed". |
protected UriUniqFilter | uriUniqFilter: The UriUniqFilter to use, tracking those UURIs which are already in process (or processed) and thus should not be rescheduled. |
Fields inherited from class AbstractFrontier: controller, dispositionInProgressLock, dispositionPending, disregardedUriCount, failedFetchCount, futureUriCount, kp, lastReachedState, loggerModule, managerThread, nextOrdinal, outboundLock, preparer, queuedUriCount, recover, scope, seeds, serverCache, sheetOverlaysManager, succeededFetchCount, targetState, totalProcessedBytes
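The snoozedClassQueues, snoozedOverflow, and snoozeLongMs fields describe two places a snoozed queue can wait. The following is a minimal in-memory sketch of that routing, not Heritrix's implementation: a snooze no longer than snoozeLongMs goes into a java.util.concurrent.DelayQueue ordered by wake time, and a longer one goes into a sorted map keyed by wake time, with a TreeMap standing in for the on-disk StoredSortedMap. The classes SnoozedQueue and SnoozeRouter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a snoozed queue: a queue name plus its wake time. */
final class SnoozedQueue implements Delayed {
    final String name;
    final long wakeTimeMs;

    SnoozedQueue(String name, long wakeTimeMs) {
        this.name = name;
        this.wakeTimeMs = wakeTimeMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(wakeTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Earlier wake time sorts first, so the DelayQueue head is the next queue to wake.
        if (other instanceof SnoozedQueue) {
            return Long.compare(wakeTimeMs, ((SnoozedQueue) other).wakeTimeMs);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

/** Hypothetical router choosing between a short (in-memory) and a long (overflow) snooze. */
final class SnoozeRouter {
    final long snoozeLongMs;
    final DelayQueue<SnoozedQueue> shortSnoozed = new DelayQueue<>();
    // Keyed by wake time; lists because two queues may share the same wake time (assumption).
    final TreeMap<Long, List<String>> longSnoozed = new TreeMap<>();

    SnoozeRouter(long snoozeLongMs) {
        this.snoozeLongMs = snoozeLongMs;
    }

    /** Returns true if the queue was "long snoozed" (delay strictly longer than snoozeLongMs). */
    boolean snooze(String queueName, long nowMs, long delayMs) {
        long wake = nowMs + delayMs;
        if (delayMs > snoozeLongMs) {
            longSnoozed.computeIfAbsent(wake, k -> new ArrayList<>()).add(queueName);
            return true;
        }
        shortSnoozed.add(new SnoozedQueue(queueName, wake));
        return false;
    }
}
```

The sketch does not model how entries leave the overflow map as their wake time approaches.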
Constructor and Description |
---|
WorkQueueFrontier(): Constructor. |
Modifier and Type | Method and Description |
---|---|
protected boolean | activateInactiveQueue(): Activate an inactive queue, if any are available. |
void | allNonemptyReportTo(PrintWriter writer): Compact report of all nonempty queues (one queue per line). |
void | allQueuesReportTo(PrintWriter writer): Compact report of all nonempty queues (one queue per line). |
protected void | appendQueueReports(PrintWriter w, String label, Iterator<?> iterator, int total, int max): Append queue report to general Frontier report. |
long | averageDepth(): Average depth of the last URI in all eligible queues. |
protected void | checkFutures(): Check for any future-scheduled URIs now eligible for re-enqueuing. |
void | close(): Release resources only needed when running. |
float | congestionRatio(): Ratio of the number of threads that would theoretically allow maximum crawl progress (if each were as productive as the current threads) to the current number of threads. |
void | considerIncluded(CrawlURI curi): Notify Frontier that it should consider the given UURI as if already scheduled. |
protected abstract Queue<String> | createInactiveQueueForPrecedence(int precedence): Create an inactive queue to hold queue names at the given precedence. |
protected void | deactivateQueue(WorkQueue wq): Put the given queue on the inactiveQueues queue. |
long | deepestUri(): Ordinal position of the 'deepest' URI eligible for crawling. |
void | deleted(CrawlURI curi): Force logging, etc. |
long | deleteURIs(String queueRegex, String uriRegex): Delete any URI that matches the given regular expression from the list of discovered and pending URIs. |
void | destroy() |
long | discoveredUriCount(): See Frontier.discoveredUriCount(). |
protected CrawlURI | findEligibleURI(): Return the next CrawlURI eligible to be processed (and presumably visited/fetched) by a worker thread. |
void | forceWakeQueues(): Utility method for advanced users and experimentation: force-wake all snoozed queues, for example to return a crawl to full activity after connectivity problems have put every queue into a slow-retry snooze. |
protected void | forget(CrawlURI curi): Forget the given CrawlURI. |
int | getBalanceReplenishAmount() |
int | getErrorPenaltyAmount() |
protected abstract SortedMap<Integer,Queue<String>> | getInactiveQueuesByPrecedence(): Return a sorted map, keyed by precedence, of all queues of WorkQueue keys. |
protected Queue<String> | getInactiveQueuesForPrecedence(int precedence): Get the queue of inactive uri-queue names at the given precedence. |
protected int | getInProcessCount(): The number of CrawlURIs 'in process' (passed to the outbound queue and not yet finished by returning through the inbound queue). |
int | getLargestQueuesCount(): Remember this many of the largest queues for reporting's sake; actual tracking can be somewhat approximate when some queues shrink before others' sizes are noted again, or if this count is adjusted mid-crawl. |
protected long | getMaxInWait(): Maximum amount of time to wait for an inbound update event before giving up and rechecking on the ability to further fill the outbound queue. |
int | getMaxQueuesPerReportCategory() |
int | getPrecedenceFloor() |
protected abstract WorkQueue | getQueueFor(String classKey): Return the work queue for the given classKey, or null if no such queue exists. |
QueuePrecedencePolicy | getQueuePrecedencePolicy() |
long | getQueueTotalBudget() |
protected abstract Queue<String> | getRetiredQueues(): Return queue of all retired queue names. |
protected int | getSnoozedCount() |
long | getSnoozeLongMs() |
protected int | getTotalEligibleInactiveQueues(): Total of all URIs in inactive queues at precedences above the floor. |
protected int | getTotalInactiveQueues(): Total of all URIs in inactive queues at all precedences. |
protected int | getTotalIneligibleInactiveQueues(): Total of all URIs in inactive queues at precedences at or below the floor. |
UriUniqFilter | getUriUniqFilter() |
protected void | handleQueue(WorkQueue wq, boolean forceRetire, long now, long delay_ms): Send an active queue to its next state, based on the supplied parameters. |
protected abstract void | initAllQueues(): Initialize the allQueues field in an implementation-appropriate way. |
protected void | initInternalQueues(): Initializes internal queues. |
protected abstract void | initOtherQueues(): Initialize all other internal queues in an implementation-appropriate way. |
boolean | isEmpty(): Return whether the frontier is exhausted: all crawlable URIs are done (none waiting or pending). |
protected void | processFinish(CrawlURI curi): Note that the previously emitted CrawlURI has completed its processing (for now). |
protected void | processScheduleAlways(CrawlURI curi): Accept the given CrawlURI for scheduling, as it has passed the alreadyIncluded filter. |
protected void | processScheduleIfUnique(CrawlURI curi): Arrange for the given CrawlURI to be visited, if it is not already scheduled/completed. |
protected void | readyQueue(WorkQueue wq): Put the given queue on the readyClassQueues queue. |
void | reconsiderRetiredQueues(): Accommodate any changes in retirement-determining settings (such as total-budget or force-retire changes/overlays). |
protected void | reenqueueQueue(WorkQueue wq): Enqueue the given queue to either readyClassQueues or inactiveQueues, as appropriate. |
void | reportTo(PrintWriter writer): Compile a human-readable report on the status of the frontier at the time of the call. |
protected void | retireQueue(WorkQueue wq): Put the given queue on the retiredQueues queue. |
void | schedule(CrawlURI curi): Arrange for the given CrawlURI to be visited, if it is not already enqueued/completed. |
protected void | sendToQueue(CrawlURI curi): Send a CrawlURI to the appropriate subqueue. |
void | setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void | setBalanceReplenishAmount(int replenish) |
void | setErrorPenaltyAmount(int penalty) |
void | setLargestQueuesCount(int count) |
void | setMaxQueuesPerReportCategory(int max) |
void | setPrecedenceFloor(int floor) |
void | setQueuePrecedencePolicy(QueuePrecedencePolicy policy) |
void | setQueueTotalBudget(long budget) |
void | setSnoozeLongMs(long snooze) |
void | setUriUniqFilter(UriUniqFilter uriUniqFilter) |
String | shortReportLegend() |
void | shortReportLineTo(PrintWriter w) |
Map<String,Object> | shortReportMap() |
void | start() |
void | stop() |
protected void | updateHighestWaiting(int startFrom): Recalculate the value of the highest-precedence queue waiting among inactive queues. |
protected void | wakeQueues(): Wake any queues sitting in the snoozed queue whose time has come. |
protected abstract boolean | workQueueDataOnDisk(): Returns true if the WorkQueue implementation of this Frontier stores its workload on disk instead of relying on serialization mechanisms. |
Methods inherited from class AbstractFrontier: addedSeed, beginDisposition, concludedSeedBatch, crawlEnded, decrementQueuedCount, disregardedUriCount, doJournalAdded, doJournalDisregarded, doJournalEmitted, doJournalFinishedFailure, doJournalFinishedSuccess, doJournalReenqueued, doJournalRelocated, endDisposition, failedFetchCount, finalTasks, finished, finishedUriCount, futureUriCount, getClassKey, getCrawlController, getExtract404s, getExtractIndependently, getFrontierJournal, getFrontierPreparer, getKeyedProperties, getLoggerModule, getMaxOutlinks, getMaxRetries, getRecoveryLogEnabled, getRetryDelaySeconds, getScope, getSeeds, getServerCache, getSheetOverlaysManager, importRecoverFormat, importURIs, importURIsSimple, incrementDisregardedUriCount, incrementFailedFetchCount, incrementQueuedUriCount, incrementQueuedUriCount, incrementSucceededFetchCount, isDisregarded, isRunning, log, logNonfatalErrors, managementTasks, needsReenqueuing, next, nonseedLine, noteAboutToEmit, onApplicationEvent, overMaxRetries, pause, prepForFrontier, queuedUriCount, reachedState, receive, requestState, retryDelayFor, run, setCrawlController, setExtract404s, setExtractIndependently, setFrontierPreparer, setLoggerModule, setMaxOutlinks, setMaxRetries, setRecoveryLogEnabled, setRetryDelaySeconds, setScope, setSeeds, setServerCache, setSheetOverlaysManager, shortReportLine, startManagerThread, succeededFetchCount, tally, terminate, unpause
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Frontier: getGroup, getURIsList
protected long snoozeLongMs
protected org.springframework.context.support.AbstractApplicationContext appCtx
protected int precedenceFloor
protected int maxQueuesPerReportCategory
protected ObjectIdentityCache<WorkQueue> allQueues
protected BlockingQueue<String> readyClassQueues
protected Set<WorkQueue> inProcessQueues
protected transient DelayQueue<org.archive.crawler.frontier.DelayedWorkQueue> snoozedClassQueues
protected com.sleepycat.collections.StoredSortedMap<Long,org.archive.crawler.frontier.DelayedWorkQueue> snoozedOverflow
protected AtomicInteger snoozedOverflowCount
protected static int MAX_SNOOZED_IN_MEMORY
protected com.sleepycat.collections.StoredSortedMap<Long,CrawlURI> futureUris
protected transient TopNSet largestQueues
protected int highestPrecedenceWaiting
protected UriUniqFilter uriUniqFilter
public long getSnoozeLongMs()
public void setSnoozeLongMs(long snooze)
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
Specified by: setApplicationContext in interface org.springframework.context.ApplicationContextAware
Throws: org.springframework.beans.BeansException
public int getBalanceReplenishAmount()
public void setBalanceReplenishAmount(int replenish)
public int getErrorPenaltyAmount()
public void setErrorPenaltyAmount(int penalty)
public long getQueueTotalBudget()
public void setQueueTotalBudget(long budget)
public QueuePrecedencePolicy getQueuePrecedencePolicy()
public void setQueuePrecedencePolicy(QueuePrecedencePolicy policy)
public int getPrecedenceFloor()
public void setPrecedenceFloor(int floor)
public int getMaxQueuesPerReportCategory()
public void setMaxQueuesPerReportCategory(int max)
public int getLargestQueuesCount()
public void setLargestQueuesCount(int count)
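getLargestQueuesCount() notes that tracking of the largest queues can be approximate when some queues shrink before others' sizes are noted again. The hypothetical ApproxTopN class below (an illustration, not the TopNSet API) shows why: it keeps only the last noted size per key and, when over capacity, evicts the key with the smallest remembered size, so a queue that shrinks without being re-noted keeps its stale, larger size and its place in the ranking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical approximate top-N tracker keyed by queue name. */
final class ApproxTopN {
    private final int capacity;
    // Sizes are only as fresh as the last update() call for each key.
    private final Map<String, Long> lastNotedSize = new HashMap<>();

    ApproxTopN(int capacity) {
        this.capacity = capacity;
    }

    /** Record the current size of a queue, evicting the smallest tracked key if over capacity. */
    void update(String key, long size) {
        lastNotedSize.put(key, size);
        if (lastNotedSize.size() > capacity) {
            String smallest = null;
            for (Map.Entry<String, Long> e : lastNotedSize.entrySet()) {
                if (smallest == null || e.getValue() < lastNotedSize.get(smallest)) {
                    smallest = e.getKey();
                }
            }
            lastNotedSize.remove(smallest);
        }
    }

    /** Tracked keys ordered by remembered size, largest first. */
    List<String> largest() {
        List<String> keys = new ArrayList<>(lastNotedSize.keySet());
        keys.sort((a, b) -> Long.compare(lastNotedSize.get(b), lastNotedSize.get(a)));
        return keys;
    }
}
```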
public UriUniqFilter getUriUniqFilter()
public void setUriUniqFilter(UriUniqFilter uriUniqFilter)
public void start()
Specified by: start in interface org.springframework.context.Lifecycle
Overrides: start in class AbstractFrontier
protected void initInternalQueues() throws IOException, com.sleepycat.je.DatabaseException
QueueAssignmentPolicy.maximumNumberOfKeys(). Otherwise invokes initAllQueues() to actually set up the queues.
Subclasses should invoke this method with recycle set to "true" in a private readObject method, to restore queues after a checkpoint.
Parameters: recycle
Throws: IOException, com.sleepycat.je.DatabaseException
protected abstract void initAllQueues() throws com.sleepycat.je.DatabaseException
Throws: com.sleepycat.je.DatabaseException
protected abstract void initOtherQueues() throws com.sleepycat.je.DatabaseException
Throws: com.sleepycat.je.DatabaseException
public void stop()
Specified by: stop in interface org.springframework.context.Lifecycle
Overrides: stop in class AbstractFrontier
public void destroy()
public void close()
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
protected void processScheduleAlways(CrawlURI curi)
Overrides: processScheduleAlways in class AbstractFrontier
Parameters: curi - CrawlURI.
public void schedule(CrawlURI curi)
Specified by: schedule in interface Frontier
Overrides: schedule in class AbstractFrontier
Parameters: curi - The URI to schedule.
See Also: Frontier.schedule(org.archive.modules.CrawlURI)
protected void processScheduleIfUnique(CrawlURI curi)
Overrides: processScheduleIfUnique in class AbstractFrontier
Parameters: curi - CrawlURI to schedule.
See Also: Frontier.schedule(org.archive.modules.CrawlURI)
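processScheduleIfUnique, processScheduleAlways, and considerIncluded divide scheduling between a uniqueness check and unconditional enqueueing, with the uriUniqFilter field tracking URIs already in process or processed. A minimal in-memory sketch, assuming a plain HashSet of URI strings in place of a real UriUniqFilter; the class UniqScheduling and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical frontier fragment contrasting schedule-if-unique with schedule-always. */
final class UniqScheduling {
    private final Set<String> alreadyIncluded = new HashSet<>(); // stands in for a UriUniqFilter
    final Deque<String> pending = new ArrayDeque<>();

    /** Enqueue only if never seen before; Set.add returns false for a duplicate. */
    void scheduleIfUnique(String uri) {
        if (alreadyIncluded.add(uri)) {
            scheduleAlways(uri);
        }
    }

    /** Enqueue unconditionally (assumed use: a forced re-fetch), still recording the URI as seen. */
    void scheduleAlways(String uri) {
        alreadyIncluded.add(uri);
        pending.addLast(uri);
    }

    /** Mark as included without enqueueing, in the spirit of considerIncluded. */
    void considerIncluded(String uri) {
        alreadyIncluded.add(uri);
    }
}
```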
protected void sendToQueue(CrawlURI curi)
Parameters: curi
protected void readyQueue(WorkQueue wq)
Parameters: wq
protected void deactivateQueue(WorkQueue wq)
Parameters: wq
protected Queue<String> getInactiveQueuesForPrecedence(int precedence)
Parameters: precedence
protected abstract SortedMap<Integer,Queue<String>> getInactiveQueuesByPrecedence()
protected abstract Queue<String> createInactiveQueueForPrecedence(int precedence)
Parameters: precedence
protected void retireQueue(WorkQueue wq)
Parameters: wq
protected abstract Queue<String> getRetiredQueues()
public void reconsiderRetiredQueues()
protected abstract WorkQueue getQueueFor(String classKey)
Parameters: classKey - key to look for
protected CrawlURI findEligibleURI()
Overrides: findEligibleURI in class AbstractFrontier
See Also: Frontier.next()
protected void checkFutures()
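checkFutures() is summarized as checking for future-scheduled URIs now eligible for re-enqueuing, and futureUris is a sorted map with Long keys. Assuming those keys are the times at which each URI becomes due (an assumption; this page does not say so), the check reduces to draining the head of the map up to the current time. A TreeMap of lists stands in for the on-disk StoredSortedMap; the class FutureUris is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical holder of URIs scheduled for a future time. */
final class FutureUris {
    // Key: assumed due time in ms. Lists allow several URIs due at the same instant.
    private final TreeMap<Long, List<String>> byDueTime = new TreeMap<>();

    void scheduleAt(long dueMs, String uri) {
        byDueTime.computeIfAbsent(dueMs, k -> new ArrayList<>()).add(uri);
    }

    /** Remove and return every URI due at or before nowMs, earliest first. */
    List<String> checkFutures(long nowMs) {
        List<String> due = new ArrayList<>();
        Map.Entry<Long, List<String>> first;
        while ((first = byDueTime.firstEntry()) != null && first.getKey() <= nowMs) {
            due.addAll(byDueTime.pollFirstEntry().getValue());
        }
        return due;
    }

    /** Number of URIs still waiting. */
    int size() {
        int n = 0;
        for (List<String> uris : byDueTime.values()) {
            n += uris.size();
        }
        return n;
    }
}
```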
protected boolean activateInactiveQueue()
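deactivateQueue, activateInactiveQueue, getInactiveQueuesForPrecedence, and getInactiveQueuesByPrecedence together describe inactive queue names held in per-precedence queues inside a sorted map. The sketch below is illustrative only and rests on assumptions not confirmed by this page: lower precedence numbers are served first, per-precedence queues are created lazily on first use, and only precedences numerically below precedenceFloor may be activated. The class InactiveQueues is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

/** Hypothetical inactive-queue bookkeeping keyed by precedence. */
final class InactiveQueues {
    private final TreeMap<Integer, Queue<String>> byPrecedence = new TreeMap<>();
    final Queue<String> ready = new ArrayDeque<>();
    final int precedenceFloor;

    InactiveQueues(int precedenceFloor) {
        this.precedenceFloor = precedenceFloor;
    }

    /** Like getInactiveQueuesForPrecedence: create the per-precedence queue on first use. */
    Queue<String> forPrecedence(int precedence) {
        return byPrecedence.computeIfAbsent(precedence, p -> new ArrayDeque<>());
    }

    /** Like deactivateQueue: park a queue name at its precedence. */
    void deactivate(String queueName, int precedence) {
        forPrecedence(precedence).add(queueName);
    }

    /** Move one queue name to ready, scanning precedences in ascending order below the floor. */
    boolean activateInactiveQueue() {
        // headMap(floor) is exclusive: assumed eligible precedences are those < precedenceFloor.
        for (Map.Entry<Integer, Queue<String>> e : byPrecedence.headMap(precedenceFloor).entrySet()) {
            String name = e.getValue().poll();
            if (name != null) {
                ready.add(name);
                return true;
            }
        }
        return false;
    }
}
```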
protected void updateHighestWaiting(int startFrom)
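Parameters: startFrom - start looking at this precedence value
protected void reenqueueQueue(WorkQueue wq)
Parameters: wq
protected long getMaxInWait()
Description copied from class: AbstractFrontier
Maximum amount of time to wait for an inbound update event before giving up and rechecking on the ability to further fill the outbound queue.
Overrides: getMaxInWait in class AbstractFrontier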
public void forceWakeQueues()
protected void wakeQueues()
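wakeQueues() wakes snoozed queues whose time has come, while forceWakeQueues() wakes all of them regardless. If snoozed queues sit in a java.util.concurrent.DelayQueue, as the snoozedClassQueues field's type suggests, the difference maps directly onto DelayQueue semantics: poll() returns only elements whose delay has expired, whereas the iterator sees expired and unexpired elements alike. A single-threaded sketch with hypothetical names (Snooze, Waker) that ignores the long-snooze overflow:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical snoozed entry: a queue name and the wall-clock time it may wake. */
final class Snooze implements Delayed {
    final String name;
    final long wakeTimeMs;

    Snooze(String name, long wakeTimeMs) {
        this.name = name;
        this.wakeTimeMs = wakeTimeMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(wakeTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

/** Hypothetical waker; not safe against concurrent snoozes. */
final class Waker {
    final DelayQueue<Snooze> snoozed = new DelayQueue<>();
    final Queue<String> ready = new ArrayDeque<>();

    /** Like wakeQueues: poll() hands back only entries whose wake time has passed. */
    int wakeQueues() {
        int woken = 0;
        Snooze s;
        while ((s = snoozed.poll()) != null) {
            ready.add(s.name);
            woken++;
        }
        return woken;
    }

    /** Like forceWakeQueues: the iterator also returns unexpired entries. */
    int forceWakeQueues() {
        int woken = 0;
        for (Snooze s : snoozed) {
            ready.add(s.name);
            woken++;
        }
        snoozed.clear();
        return woken;
    }
}
```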
protected void processFinish(CrawlURI curi)
Overrides: processFinish in class AbstractFrontier
Parameters: curi - CrawlURI to finish.
See Also: Frontier.finished(org.archive.modules.CrawlURI)
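Once a URI has finished, its queue needs a next state. The method summary describes handleQueue(wq, forceRetire, now, delay_ms) as sending an active queue to its next state based on the supplied parameters, and the destinations named elsewhere on this page are retirement (retireQueue), re-enqueueing as ready or inactive (reenqueueQueue), and snoozing. The function below is one hypothetical reading of those parameters, not Heritrix's logic: retire when forced or when an assumed budgetSpent flag is set, snooze when a positive delay remains, otherwise become ready or inactive according to an assumed stillActive flag. It omits the now parameter, which would fix the wake time.

```java
/** Hypothetical next-state decision for a queue whose URI just finished. */
final class QueueRouting {
    enum NextState { RETIRED, SNOOZED, READY, INACTIVE }

    /**
     * @param forceRetire retire regardless of other state
     * @param budgetSpent assumed flag: the queue has exhausted its total budget
     * @param delayMs     politeness delay before the queue may be used again
     * @param stillActive assumed flag: the queue keeps an active slot (ready) or goes inactive
     */
    static NextState handleQueue(boolean forceRetire, boolean budgetSpent,
                                 long delayMs, boolean stillActive) {
        if (forceRetire || budgetSpent) {
            return NextState.RETIRED;
        }
        if (delayMs > 0) {
            return NextState.SNOOZED; // would wake at now + delayMs
        }
        return stillActive ? NextState.READY : NextState.INACTIVE;
    }
}
```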
protected void handleQueue(WorkQueue wq, boolean forceRetire, long now, long delay_ms)
Parameters: wq, forceRetire, now, delay_ms
protected void forget(CrawlURI curi)
Parameters: curi - The CrawlURI to forget.
public long discoveredUriCount()
Specified by: discoveredUriCount in interface Frontier
See Also: Frontier.discoveredUriCount()
public long deleteURIs(String queueRegex, String uriRegex)
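Description copied from interface: Frontier
Delete any URI that matches the given regular expression from the list of discovered and pending URIs.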
Any encountered URI that has not been successfully crawled, terminally failed, disregarded or is currently being processed is considered to be a pending URI.
Warning: It is unsafe to make changes to the frontier while this method is executing. The crawler should be in a paused state before invoking it.
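deleteURIs(queueRegex, uriRegex) removes pending URIs that match the given patterns. A minimal in-memory sketch, assuming queueRegex selects queues by name, uriRegex selects URIs within those queues, and both must match the whole string (Matcher.matches()); the map-of-lists model and the class PendingDeleter are hypothetical. Per the warning above, it also assumes nothing else modifies the map while it runs.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical pending-URI store: queue name -> pending URIs. */
final class PendingDeleter {
    /** Delete matching pending URIs and return how many were removed. Not thread-safe. */
    static long deleteURIs(Map<String, List<String>> pendingByQueue,
                           String queueRegex, String uriRegex) {
        Pattern queuePattern = Pattern.compile(queueRegex);
        Pattern uriPattern = Pattern.compile(uriRegex);
        long deleted = 0;
        for (Map.Entry<String, List<String>> e : pendingByQueue.entrySet()) {
            if (!queuePattern.matcher(e.getKey()).matches()) {
                continue; // queue name does not match: leave its URIs alone
            }
            Iterator<String> it = e.getValue().iterator();
            while (it.hasNext()) {
                if (uriPattern.matcher(it.next()).matches()) {
                    it.remove();
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```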
Specified by: deleteURIs in interface Frontier
Parameters: match - String to match.
public Map<String,Object> shortReportMap()
Specified by: shortReportMap in interface org.archive.util.Reporter
public void shortReportLineTo(PrintWriter w)
Specified by: shortReportLineTo in interface org.archive.util.Reporter
Parameters: w - Where to write to.
protected int getTotalInactiveQueues()
protected int getTotalEligibleInactiveQueues()
protected int getTotalIneligibleInactiveQueues()
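getTotalInactiveQueues, getTotalEligibleInactiveQueues, and getTotalIneligibleInactiveQueues split inactive totals at precedenceFloor. With per-precedence queues kept in a sorted map, that split is a headMap/tailMap pair. The sketch assumes that lower numbers mean higher precedence, so "at or below the floor" means numerically at or above precedenceFloor; this page does not confirm the numeric direction. It sums the sizes of the per-precedence queues of names, and the class InactiveTotals is hypothetical.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.SortedMap;

/** Hypothetical totals over inactive queues, split at a precedence floor. */
final class InactiveTotals {
    /** Sum the sizes of the per-precedence queues in the given view. */
    static int total(Collection<Queue<String>> queues) {
        int n = 0;
        for (Queue<String> q : queues) {
            n += q.size();
        }
        return n;
    }

    /** Precedences numerically below the floor (assumed eligible). */
    static int eligible(SortedMap<Integer, Queue<String>> byPrecedence, int floor) {
        return total(byPrecedence.headMap(floor).values()); // keys < floor
    }

    /** Precedences numerically at or above the floor (assumed ineligible). */
    static int ineligible(SortedMap<Integer, Queue<String>> byPrecedence, int floor) {
        return total(byPrecedence.tailMap(floor).values()); // keys >= floor
    }
}
```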
public String shortReportLegend()
Specified by: shortReportLegend in interface org.archive.util.Reporter
public void reportTo(PrintWriter writer)
Specified by: reportTo in interface org.archive.util.Reporter
Parameters: name - Name of report; writer - Where to write to.
public void allNonemptyReportTo(PrintWriter writer)
Parameters: writer
public void allQueuesReportTo(PrintWriter writer)
Parameters: writer
protected void appendQueueReports(PrintWriter w, String label, Iterator<?> iterator, int total, int max)
Parameters: w - PrintWriter to append to; iterator - An iterator over; total; max
public void deleted(CrawlURI curi)
Specified by: deleted in interface Frontier
Parameters: curi - Deleted CrawlURI.
See Also: Frontier.deleted(org.archive.modules.CrawlURI)
public void considerIncluded(CrawlURI curi)
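Description copied from interface: Frontier
Notify Frontier that it should consider the given UURI as if already scheduled.
Specified by: considerIncluded in interface Frontier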
protected abstract boolean workQueueDataOnDisk()
Returns: true if the WorkQueue implementation of this Frontier stores its workload on disk instead of relying on serialization mechanisms.
TODO: rename (this is a very misleading name) or remove (no implementations that return false are known).
public long averageDepth()
Description copied from interface: Frontier
Average depth of the last URI in all eligible queues.
Specified by: averageDepth in interface Frontier
protected int getSnoozedCount()
public float congestionRatio()
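Description copied from interface: Frontier
Ratio of the number of threads that would theoretically allow maximum crawl progress (if each were as productive as the current threads) to the current number of threads.
Specified by: congestionRatio in interface Frontier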
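congestionRatio() compares an ideal thread count with the current one. One way to estimate it, sketched below under stated assumptions: treat each queue that could be worked right now (in process, ready, or eligible inactive) as able to occupy one thread, and treat in-process queues as the threads currently busy; snoozed queues are left out because they cannot be worked until they wake. This decomposition and the class Congestion are assumptions for illustration; the exact terms WorkQueueFrontier uses are not given on this page.

```java
/** Hypothetical congestion estimate; the choice of terms is an assumption. */
final class Congestion {
    /**
     * @param inProcess        queues currently being worked (approximates busy threads)
     * @param ready            queues ready to hand out a URI immediately
     * @param eligibleInactive inactive queues above the precedence floor
     * @return ideal thread count over current thread count, or NaN if nothing is in process
     */
    static float congestionRatio(int inProcess, int ready, int eligibleInactive) {
        if (inProcess == 0) {
            return Float.NaN; // no current threads to compare against
        }
        return (float) (inProcess + ready + eligibleInactive) / inProcess;
    }
}
```

Under this reading, 10 busy queues with 15 ready and 25 eligible inactive give 50 / 10 = 5.0, suggesting roughly five times the current thread count could be kept busy; a value of 1.0 would mean no workable queue is waiting for a thread.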
public long deepestUri()
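Description copied from interface: Frontier
Ordinal position of the 'deepest' URI eligible for crawling.
Specified by: deepestUri in interface Frontier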
public boolean isEmpty()
Specified by: isEmpty in interface Frontier
Overrides: isEmpty in class AbstractFrontier
See Also: Frontier.isEmpty()
protected int getInProcessCount()
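Description copied from class: AbstractFrontier
The number of CrawlURIs 'in process' (passed to the outbound queue and not yet finished by returning through the inbound queue).
Overrides: getInProcessCount in class AbstractFrontier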
Copyright © 2003-2014 Internet Archive. All Rights Reserved.