Java Thread Pool
This is one approach to implementing a thread pool in Java.
package com.yogeshchoudhry.example.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

/**
 * Driver thread that splits a list of {@link CustomJob}s into batches and runs
 * each batch as a {@link CustomTaskWorker} on a bounded {@link ThreadPoolExecutor}.
 *
 * <p>The thread starts itself from the constructor and then calls
 * {@link #executeJobs()} from {@link #run()}.
 *
 * @author Yogesh Choudhry on 11/24/12.
 */
public class MyThreadPoolExecutor extends Thread {

    private static final Logger logger = Logger.getLogger(MyThreadPoolExecutor.class);

    /** Maximum number of jobs handed to a single worker task. */
    private static final int BATCH_SIZE = 1;
    /** Core number of pool threads. */
    private static final int CORE_POOL_SIZE = 5;
    /** Upper bound on pool threads. */
    private static final int MAXIMUM_POOL_SIZE = 5;
    /** Idle keep-alive time, in seconds. */
    private static final long KEEP_ALIVE_TIME = 1;
    /** Capacity of the bounded work queue in front of the pool. */
    private static final int BLOCKING_QUEUE_CAPACITY = 1;

    /**
     * Creates the driver thread and starts it immediately.
     *
     * <p>NOTE(review): starting a thread from its own constructor publishes
     * {@code this} before construction completes. It is kept because existing
     * callers only do {@code new MyThreadPoolExecutor()} and rely on the start.
     */
    public MyThreadPoolExecutor() {
        super("MyThreadPoolExecutor Thread");
        this.start();
    }

    /** Thread entry point; delegates to {@link #executeJobs()}. */
    @Override
    public void run() {
        executeJobs();
    }

    /**
     * Builds the job list and distributes it over the pool.
     *
     * <p>The original listing referenced an undefined {@code arrayList} here;
     * the jobs are now generated by {@link #buildSampleJobs(int)}.
     */
    public void executeJobs() {
        int batchCount = 10; // Total job count
        if (batchCount > 0) {
            createNDistributeTasks(batchCount, buildSampleJobs(batchCount));
        }
    }

    /**
     * Creates {@code count} placeholder jobs named {@code Job1 .. JobN}.
     *
     * @param count number of jobs to create
     * @return a new mutable list holding the jobs in creation order
     */
    private static ArrayList<CustomJob> buildSampleJobs(int count) {
        ArrayList<CustomJob> jobs = new ArrayList<CustomJob>(count);
        for (int jobNumber = 1; jobNumber <= count; jobNumber++) {
            jobs.add(new CustomJob("Job" + jobNumber));
        }
        return jobs;
    }

    /**
     * Splits {@code arrayList} into batches of at most {@link #BATCH_SIZE} jobs,
     * submits one {@link CustomTaskWorker} per batch and blocks until
     * {@code invokeAll} returns. The pool is always shut down before returning.
     *
     * @param batchCount total job count; only used for logging
     * @param arrayList  the complete list of jobs to run
     */
    private void createNDistributeTasks(int batchCount, ArrayList<CustomJob> arrayList) {
        logger.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
        logger.info("TX-ID : " + ThreadLocalTxCounter.getThreadSerialNum() + " | Start ");

        // Constructed outside the try block so the catch/finally clauses never
        // see a null executor.
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAXIMUM_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY),
                new RejectedQueueHandlerImpl());
        try {
            int totalChildTask = arrayList.size();
            Collection<Callable<String>> collection = new ArrayList<Callable<String>>();
            logger.info("Batch #" + batchCount);

            for (int i = 0; i < totalChildTask; i += BATCH_SIZE) {
                int batch = Math.min(BATCH_SIZE, totalChildTask - i);
                // CustomTaskWorker treats the end index as exclusive (it
                // decrements it before iterating), so pass [i, i + batch).
                // Passing i + 1 as the start skipped the first job of every batch.
                collection.add(new CustomTaskWorker(i, i + batch, arrayList));
            }

            logger.info("Task initiated:" + collection.size() + " and each task can contain maximum (" + BATCH_SIZE + ") records");
            threadPoolExecutor.invokeAll(collection);
            logger.info(" -------------------------------------------");
            logger.info("Task completed: " + threadPoolExecutor.getCompletedTaskCount());
            logger.info(" -------------------------------------------");
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            logger.error(ex.getMessage(), ex);
            threadPoolExecutor.shutdownNow();
        } catch (Exception ex) {
            logger.error(ex.getMessage(), ex);
            threadPoolExecutor.shutdownNow();
        } finally {
            if (!threadPoolExecutor.isShutdown()) {
                threadPoolExecutor.shutdown();
            }
        }
    }

    /**
     * Handler invoked when the pool cannot accept a task. It first tries to
     * cancel the task; if the cancellation fails, it blocks until the task can
     * be put back on the executor's queue.
     *
     * <p>NOTE(review): a rejected task has normally not started yet, so
     * {@code cancel(false)} is expected to succeed and the task is then dropped
     * instead of being run - confirm that this is the intended policy.
     */
    private static class RejectedQueueHandlerImpl implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            boolean cancelStatus = false;
            try {
                // Guard the cast instead of relying on a ClassCastException.
                if (r instanceof FutureTask) {
                    cancelStatus = ((FutureTask<?>) r).cancel(false);
                }
                logger.info("Task rejected due to limited threads; let me try to cancel this - Status [" + (cancelStatus ? "Cancelled]" : "Still active] - Repushing to queue..."));
                if (!cancelStatus) {
                    executor.getQueue().put(r);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                logger.error(ex.getMessage(), ex);
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        }
    }
}
package com.yogeshchoudhry.example.ThreadPool;

import java.util.ArrayList;
import java.util.concurrent.Callable;

import org.apache.log4j.Logger;

/**
 * Worker task that runs a contiguous slice of a shared job list on the calling
 * pool thread and returns the name it gave that thread.
 *
 * <p>The slice is {@code [startIndex, endIndex)}: the start index is inclusive
 * and the end index is exclusive.
 *
 * @author Yogesh Choudhry on 11/24/12.
 */
public class CustomTaskWorker implements Callable<String> {

    private static final Logger logger = Logger.getLogger(CustomTaskWorker.class);

    private final int startIndex;
    private final int endIndex;
    ArrayList<CustomJob> arrayList;

    /**
     * @param startIndex index of the first job to run (inclusive)
     * @param endIndex   index one past the last job to run (exclusive)
     * @param arrayList  the complete job list; it is read, never modified
     */
    public CustomTaskWorker(int startIndex, int endIndex, ArrayList<CustomJob> arrayList) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.arrayList = arrayList;
    }

    /**
     * Renames the current thread, then runs every job in the slice in order.
     * Any exception raised while running the jobs is logged and the remaining
     * jobs of the slice are skipped; the worker name is returned either way.
     *
     * @return the name assigned to the executing thread
     */
    @Override
    public String call() throws Exception {
        ThreadLocalTxCounter.resetChildTxId();
        String workerName = "Custom Worker Thread[" + ThreadLocalTxCounter.getThreadSerialNum() + "-Worker-" + ThreadLocalTxCounter.getChildSerialNum() + "]";
        Thread.currentThread().setName(workerName);

        // Use a local for the inclusive upper bound rather than decrementing
        // the field, so calling call() again processes the same range.
        int lastIndex = endIndex - 1;
        int processedRecords = 0;
        logger.info("|" + workerName + " started... with (" + ((lastIndex - startIndex) + 1) + ") records [" + startIndex + "-" + lastIndex + "]");
        try {
            for (int i = startIndex; i <= lastIndex; i++) {
                CustomJob job = arrayList.get(i);
                logger.info("\t| " + workerName + " >> Index[" + i + "]" + job.getJobName());
                job.run();
                processedRecords++;
            }
            logger.info("|" + workerName + " finished (" + processedRecords + ") records");
        } catch (Exception ex) {
            logger.error(" Error call", ex);
        }
        return workerName;
    }
}
package com.yogeshchoudhry.example.ThreadPool; /** * @author Yogesh Choudhry on 11/24/12. */ public class ThreadLocalTxCounter { private static ThreadLocalworkerCounter = new ThreadLocal<>(); public static long getThreadSerialNum() { return Thread.currentThread().getId(); } public static Integer getChildSerialNum() { if (workerCounter.get() == null) { workerCounter.set(0); } else { workerCounter.set(workerCounter.get() + 1); } return workerCounter.get(); } public static void resetChildTxId() { workerCounter.set(0); } }
package com.yogeshchoudhry.example.ThreadPool;

import org.apache.log4j.Logger;

/**
 * A named unit of work. {@link #run()} is an empty placeholder in this listing.
 *
 * @author Yogesh Choudhry on 11/24/12.
 */
public class CustomJob implements Runnable {

    Logger logger = Logger.getLogger(CustomJob.class);

    /** Label used to identify this job. */
    private String jobName;

    /**
     * @param jobName label for this job
     */
    public CustomJob(String jobName) {
        this.jobName = jobName;
    }

    /** Does nothing; the job body is left empty. */
    @Override
    public void run() {
    }

    /**
     * @return the label of this job
     */
    public String getJobName() {
        return this.jobName;
    }

    /**
     * @param jobName new label for this job
     */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }
}
log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n
Output
MyThreadPoolExecutor - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx MyThreadPoolExecutor - TX-ID : 9 | Start MyThreadPoolExecutor - Batch #10 MyThreadPoolExecutor - Task initiated:3 and each task can contain maximum (5) records CustomTaskWorker - |Custom Worker Thread[11-Worker-1] started... with (5) records [0-4] CustomTaskWorker - |Custom Worker Thread[12-Worker-1] started... with (5) records [5-9] CustomTaskWorker - | Custom Worker Thread[12-Worker-1] >> Index[5]Job6 CustomTaskWorker - | Custom Worker Thread[12-Worker-1] >> Index[6]Job7 CustomTaskWorker - | Custom Worker Thread[12-Worker-1] >> Index[7]Job8 CustomTaskWorker - | Custom Worker Thread[12-Worker-1] >> Index[8]Job9 CustomTaskWorker - | Custom Worker Thread[12-Worker-1] >> Index[9]Job10 CustomTaskWorker - |Custom Worker Thread[12-Worker-1] finished (5) records CustomTaskWorker - | Custom Worker Thread[11-Worker-1] >> Index[0]Job1 CustomTaskWorker - | Custom Worker Thread[11-Worker-1] >> Index[1]Job2 CustomTaskWorker - |Custom Worker Thread[13-Worker-1] started... with (3) records [10-12] CustomTaskWorker - | Custom Worker Thread[13-Worker-1] >> Index[10]Job11 CustomTaskWorker - | Custom Worker Thread[13-Worker-1] >> Index[11]Job12 CustomTaskWorker - | Custom Worker Thread[13-Worker-1] >> Index[12]Job13 CustomTaskWorker - |Custom Worker Thread[13-Worker-1] finished (3) records CustomTaskWorker - | Custom Worker Thread[11-Worker-1] >> Index[2]Job3 CustomTaskWorker - | Custom Worker Thread[11-Worker-1] >> Index[3]Job4 CustomTaskWorker - | Custom Worker Thread[11-Worker-1] >> Index[4]Job5 CustomTaskWorker - |Custom Worker Thread[11-Worker-1] finished (5) records MyThreadPoolExecutor - ------------------------------------------- MyThreadPoolExecutor - Task completed: 3 MyThreadPoolExecutor - ------------------------------------------- MyThreadPoolExecutor - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Java Thread Pool
Reviewed by Yogesh Choudhry
on
November 24, 2012
Rating:
