Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.3 - Thread Logging #30

Merged
merged 1 commit into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/apoc/ApocKernelExtensionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public IndexUpdateTransactionEventHandler.LifeCycle getIndexUpdateLifeCycle() {
public void start() throws Throwable {
ApocConfiguration.initialize(db);
Pools.NEO4J_SCHEDULER = dependencies.scheduler();
ThreadPoolExecutorLogger.LOG = log.getUserLog( ThreadPoolExecutorLogger.class );
registerCustomProcedures();
ttlLifeCycle = new TTLLifeCycle(Pools.NEO4J_SCHEDULER, db, log.getUserLog(TTLLifeCycle.class));
ttlLifeCycle.start();
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/apoc/Pools.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class Pools {
static final String CONFIG_JOBS_SCHEDULED_NUM_THREADS = "jobs.scheduled.num_threads";
static final String CONFIG_JOBS_POOL_NUM_THREADS = "jobs.pool.num_threads";
static final String CONFIG_BROKERS_NUM_THREADS = "brokers.num_threads";
static final String CONFIG_DEBUG_LOG_THREADS = "jobs.debug.logs";

public final static int DEFAULT_SCHEDULED_THREADS = Runtime.getRuntime().availableProcessors() / 4;
public final static int DEFAULT_POOL_THREADS = Runtime.getRuntime().availableProcessors() * 2;
Expand Down Expand Up @@ -46,9 +47,8 @@ private Pools() {
public static ExecutorService createDefaultPool() {
int threads = getNoThreadsInDefaultPool();
int queueSize = threads * 25;
return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy());
// new ThreadPoolExecutor.CallerRunsPolicy());
return new ThreadPoolExecutorLogger(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy(), "DEFAULT", threadPoolDebug());
}
static class CallerBlocksPolicy implements RejectedExecutionHandler {
@Override
Expand Down Expand Up @@ -80,7 +80,9 @@ public static int getNoThreadsInBrokerPool() {
}

private static ExecutorService createSinglePool() {
return Executors.newSingleThreadExecutor();
return new ThreadPoolExecutorLogger(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), "SINGLE", threadPoolDebug() );
}

private static ScheduledExecutorService createScheduledPool() {
Expand All @@ -90,8 +92,8 @@ private static ScheduledExecutorService createScheduledPool() {
private static ExecutorService createBrokerPool() {
int threads = getNoThreadsInBrokerPool();
int queueSize = threads * 25;
return new ThreadPoolExecutor(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy());
return new ThreadPoolExecutorLogger(threads / 2, threads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize),
new CallerBlocksPolicy(), "BROKER", threadPoolDebug() );
}

public static <T> Future<Void> processBatch(List<T> batch, GraphDatabaseService db, Consumer<T> action) {
Expand All @@ -114,4 +116,9 @@ public static <T> T force(Future<T> future) throws ExecutionException {
}
}
}

public static Boolean threadPoolDebug()
{
return Boolean.valueOf( ApocConfiguration.get( CONFIG_DEBUG_LOG_THREADS, "false" ) );
}
}
74 changes: 74 additions & 0 deletions src/main/java/apoc/ThreadPoolExecutorLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package apoc;

import org.neo4j.logging.Log;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorLogger extends ThreadPoolExecutor
{
public static Log LOG;
private Boolean debugLog;
private String poolName;

public ThreadPoolExecutorLogger( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
String poolName, Boolean threadPoolDebug )
{
super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue );
this.poolName = poolName;
this.debugLog = threadPoolDebug;
}

public ThreadPoolExecutorLogger( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler, String poolName, Boolean threadPoolDebug )
{
super( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler );
this.poolName = poolName;
this.debugLog = threadPoolDebug;
}

@Override
protected void beforeExecute( Thread t, Runnable r )
{
if ( LOG != null && debugLog )
{
LOG.debug( "BeforeExecute Logging:\n" +
"Pool: " + this.poolName + "\n" +
"Active Thread Count: " + this.getActiveCount() + "\n" +
"Thread Name: " + t.getName() + "\n" +
"Thread Id: " + t.getId() + "\n" +
"Thread Priority: " + t.getPriority() + "\n"
);
}
super.beforeExecute( t, r );
}

public Log getLog()
{
return LOG;
}

public void setLog( Log log )
{
this.LOG = log;
}

public Map<String,Object> getInfo()
{
Map<String,Object> loggingResult = new HashMap<>( );
loggingResult.put( "poolName", poolName );
loggingResult.put( "activeCount", this.getActiveCount() );
loggingResult.put( "corePoolSize", this.getCorePoolSize() );
loggingResult.put( "poolSize", this.getPoolSize() );
loggingResult.put( "largestPoolSize", this.getLargestPoolSize() );
loggingResult.put( "maximumPoolSize", this.getMaximumPoolSize() );
loggingResult.put( "taskCount", this.getTaskCount() );
loggingResult.put( "completedTaskCount", this.getCompletedTaskCount() );

return loggingResult;
}
}
15 changes: 15 additions & 0 deletions src/main/java/apoc/log/Logging.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package apoc.log;

import apoc.Pools;
import apoc.ThreadPoolExecutorLogger;
import apoc.result.MapResult;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.util.Map;
import java.util.stream.Stream;

/**
* @author bradnussbaum
Expand Down Expand Up @@ -49,4 +54,14 @@ public void debug( @Name( "message" ) String message,
log.debug( message, params );
}

@Procedure
@Description( "apoc.log.threadPools() - logs threading info." )
public Stream<MapResult> threadPools()
{
Map<String,Object> singleInfo = ((ThreadPoolExecutorLogger) Pools.SINGLE).getInfo();
Map<String,Object> defaultInfo = ((ThreadPoolExecutorLogger) Pools.DEFAULT).getInfo();
Map<String,Object> brokerInfo = ((ThreadPoolExecutorLogger) Pools.BROKER).getInfo();

return Stream.of( new MapResult( singleInfo ), new MapResult( defaultInfo ), new MapResult( brokerInfo ) );
}
}