Skip to content

Commit

Permalink
Merge pull request #30 from graphfoundation/story-threadLogging-3.3
Browse files Browse the repository at this point in the history
3.3 - Thread Logging
  • Loading branch information
bradnussbaum authored Feb 14, 2020
2 parents 4733081 + 05fd5da commit 10e4f70
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/main/java/apoc/ApocKernelExtensionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public IndexUpdateTransactionEventHandler.LifeCycle getIndexUpdateLifeCycle() {
public void start() throws Throwable {
ApocConfiguration.initialize(db);
Pools.NEO4J_SCHEDULER = dependencies.scheduler();
ThreadPoolExecutorLogger.LOG = log.getUserLog( ThreadPoolExecutorLogger.class );
registerCustomProcedures();
ttlLifeCycle = new TTLLifeCycle(Pools.NEO4J_SCHEDULER, db, log.getUserLog(TTLLifeCycle.class));
ttlLifeCycle.start();
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/apoc/Pools.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Pools {
static final String CONFIG_JOBS_SCHEDULED_NUM_THREADS = "jobs.scheduled.num_threads";
static final String CONFIG_JOBS_POOL_NUM_THREADS = "jobs.pool.num_threads";
static final String CONFIG_BROKERS_NUM_THREADS = "brokers.num_threads";
static final String CONFIG_DEBUG_LOG_THREADS = "jobs.debug.logs";

public final static int DEFAULT_SCHEDULED_THREADS = Runtime.getRuntime().availableProcessors() / 4;
public final static int DEFAULT_POOL_THREADS = Runtime.getRuntime().availableProcessors() * 2;
Expand Down Expand Up @@ -46,9 +47,8 @@ private Pools() {
public static ExecutorService createDefaultPool() {
int threads = getNoThreadsInDefaultPool();
int queueSize = threads * 25;
return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy());
// new ThreadPoolExecutor.CallerRunsPolicy());
return new ThreadPoolExecutorLogger(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy(), "DEFAULT", threadPoolDebug());
}
static class CallerBlocksPolicy implements RejectedExecutionHandler {
@Override
Expand Down Expand Up @@ -80,7 +80,9 @@ public static int getNoThreadsInBrokerPool() {
}

private static ExecutorService createSinglePool() {
return Executors.newSingleThreadExecutor();
return new ThreadPoolExecutorLogger(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), "SINGLE", threadPoolDebug() );
}

private static ScheduledExecutorService createScheduledPool() {
Expand All @@ -90,8 +92,8 @@ private static ScheduledExecutorService createScheduledPool() {
private static ExecutorService createBrokerPool() {
int threads = getNoThreadsInBrokerPool();
int queueSize = threads * 25;
return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy());
return new ThreadPoolExecutorLogger(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy(), "BROKER", threadPoolDebug() );
}

public static <T> Future<Void> processBatch(List<T> batch, GraphDatabaseService db, Consumer<T> action) {
Expand All @@ -114,4 +116,9 @@ public static <T> T force(Future<T> future) throws ExecutionException {
}
}
}

public static Boolean threadPoolDebug()
{
return Boolean.valueOf( ApocConfiguration.get( CONFIG_DEBUG_LOG_THREADS, "false" ) );
}
}
74 changes: 74 additions & 0 deletions src/main/java/apoc/ThreadPoolExecutorLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package apoc;

import org.neo4j.logging.Log;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorLogger extends ThreadPoolExecutor
{
public static Log LOG;
private Boolean debugLog;
private String poolName;

public ThreadPoolExecutorLogger( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
String poolName, Boolean threadPoolDebug )
{
super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
this.poolName = poolName;
this.debugLog = threadPoolDebug;
}

public ThreadPoolExecutorLogger( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler, String poolName, Boolean threadPoolDebug )
{
super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler );
this.poolName = poolName;
this.debugLog = threadPoolDebug;
}

@Override
protected void beforeExecute( Thread t, Runnable r )
{
if ( LOG != null && debugLog )
{
LOG.debug( "BeforeExecute Logging:\n" +
"Pool: " + this.poolName + "\n" +
"Active Thread Count: " + this.getActiveCount() + "\n" +
"Thread Name: " + t.getName() + "\n" +
"Thread Id: " + t.getId() + "\n" +
"Thread Priority: " + t.getPriority() + "\n"
);
}
super.beforeExecute( t, r );
}

public Log getLog()
{
return LOG;
}

public void setLog( Log log )
{
this.LOG = log;
}

public Map<String,Object> getInfo()
{
Map<String,Object> loggingResult = new HashMap<>( );
loggingResult.put( "poolName", poolName );
loggingResult.put( "activeCount", this.getActiveCount() );
loggingResult.put( "corePoolSize", this.getCorePoolSize() );
loggingResult.put( "poolSize", this.getPoolSize() );
loggingResult.put( "largestPoolSize", this.getLargestPoolSize() );
loggingResult.put( "maximumPoolSize", this.getMaximumPoolSize() );
loggingResult.put( "taskCount", this.getTaskCount() );
loggingResult.put( "completedTaskCount", this.getCompletedTaskCount() );

return loggingResult;
}
}
15 changes: 15 additions & 0 deletions src/main/java/apoc/log/Logging.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package apoc.log;

import apoc.Pools;
import apoc.ThreadPoolExecutorLogger;
import apoc.result.MapResult;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.util.Map;
import java.util.stream.Stream;

/**
* @author bradnussbaum
Expand Down Expand Up @@ -49,4 +54,14 @@ public void debug( @Name( "message" ) String message,
log.debug( message, params );
}

@Procedure
@Description( "apoc.log.threadPools() - logs threading info." )
public Stream<MapResult> threadPools()
{
Map<String,Object> singleInfo = ((ThreadPoolExecutorLogger) Pools.SINGLE).getInfo();
Map<String,Object> defaultInfo = ((ThreadPoolExecutorLogger) Pools.DEFAULT).getInfo();
Map<String,Object> brokerInfo = ((ThreadPoolExecutorLogger) Pools.BROKER).getInfo();

return Stream.of( new MapResult( singleInfo ), new MapResult( defaultInfo ), new MapResult( brokerInfo ) );
}
}

0 comments on commit 10e4f70

Please sign in to comment.