Earn Online (2) English (10) Java (7) Money (1) Money Online (1) Script (2) Trending (7)

Java Thread Pool





This is an approach to implement Java Thread Pool.


package com.yogeshchoudhry.example.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

/**
 * @author Yogesh Choudhry on 11/24/12.
 */
public class MyThreadPoolExecutor extends Thread {

    Logger logger = Logger.getLogger(MyThreadPoolExecutor.class);

    private static int BATCH_SIZE = 1;
    private static int CORE_POOL_SIZE = 5;
    private static int MAXIMUM_POOL_SIZE = 5;
    private static long KEEP_ALIVE_TIME = 1;
    private static int BLOCKING_QUEUE_CAPACITY = 1;


    public MyThreadPoolExecutor() {
        super("MyThreadPoolExecutor Thread");
        this.start();
    }

    public void run() {
        executeJobs();
    }

    public void executeJobs() {
        int batchCount = 10; // Total job count


        if (batchCount > 0)
            createNDistributeTasks(batchCount, arrayList);

        // arrayList, param that has complete job list objects

    }

    private void createNDistributeTasks(int batchCount,
                                        ArrayList arrayList) {

        logger.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
        Date start = new Date();
        logger.info("TX-ID : " + ThreadLocalTxCounter.getThreadSerialNum()
                + " | Start ");
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                    MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                    new LinkedBlockingQueue(BLOCKING_QUEUE_CAPACITY),
                    new RejectedQueueHandlerImpl());
            int totalChildTask = arrayList.size();

            Callable<string> task = null;
            int batch = 0;

            Collection<callable> collection = new ArrayList<callable>();

            logger.info("Batch #" + batchCount);

            for (int i = 0; i < totalChildTask; i += BATCH_SIZE) {
                batch = (totalChildTask - i) > BATCH_SIZE ? BATCH_SIZE
                        : (totalChildTask - i);
                task = new Custom_TaskWorker((i + 1), (i + batch), arrayList);
                collection.add(task);
            }
            try {
                logger.info("Task initiated:" + collection.size()
                        + " and each task can contain maximum (" + BATCH_SIZE
                        + ") records");
                threadPoolExecutor.invokeAll(collection);

                logger.info(" -------------------------------------------");
                logger.info("Task completed: "
                        + threadPoolExecutor.getCompletedTaskCount());
                logger.info(" -------------------------------------------");

            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
                throw ex;
            }

        } catch (Exception ex) {
            logger.error(ex.getMessage(), ex);
            threadPoolExecutor.shutdownNow();
        } finally {
            try {
                if (threadPoolExecutor != null
                        && !threadPoolExecutor.isShutdown())
                    threadPoolExecutor.shutdown();
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        }
    }
}


private class RejectedQueueHandlerImpl implements
            RejectedExecutionHandler {
        @SuppressWarnings("unchecked")
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            boolean cancelStatus = false;
            try {
                cancelStatus = ((FutureTask) r).cancel(false);
                logger.info("Task rejected due to limited threads; let me try to cancel this - Status ["
                        + (cancelStatus ? "Cancelled]"
                        : "Still active] - Repushing to queue..."));
                if (!cancelStatus) {
                    executor.getQueue().put(r);
                }
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        }
    }

package com.yogeshchoudhry.example.ThreadPool;

import java.util.ArrayList;
import java.util.concurrent.Callable;

import org.apache.log4j.Logger;

/**
 * @author Yogesh Choudhry on 11/24/12.
 */
public class CustomTaskWorker implements Callable<String> {

    Logger logger = Logger.getLogger(CustomTaskWorker.class);

    private int startIndex;
    private int endIndex;
    ArrayList arrayList;

    public CustomTaskWorker(int startIndex, int endIndex,
                            ArrayList arrayList) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.arrayList = arrayList;
    }

    public String call() throws Exception {
        ThreadLocalTxCounter.resetChildTxId();
        String workerName = "Custom Worker Thread["
                + ThreadLocalTxCounter.getThreadSerialNum() + "-Worker-"
                + ThreadLocalTxCounter.getChildSerialNum() + "]";
        Thread.currentThread().setName(workerName);
        int processedRecords = 0;
        logger.info("|" + workerName + " started... with ("
                + (((--endIndex) - startIndex) + 1) + ") records [" + startIndex + "-" + endIndex + "]");
        try {
            for (int i = startIndex; i <= endIndex; i++) {
                logger.info("\t| " + workerName + " >> Index[" + i + "]" + arrayList.get(i).getJobName());
                arrayList.get(i).run();
                processedRecords++;
            }
            logger.info("|" + workerName + " finished (" + processedRecords
                    + ") records");
        } catch (Exception ex) {
            logger.error(" Error call", ex);
        }
        return workerName;
    }
}

package com.yogeshchoudhry.example.ThreadPool;

/**
 * @author Yogesh Choudhry on 11/24/12.
 */
public class ThreadLocalTxCounter {

    private static ThreadLocal workerCounter = new ThreadLocal<>();

    public static long getThreadSerialNum() {
        return Thread.currentThread().getId();
    }

    public static Integer getChildSerialNum() {
        if (workerCounter.get() == null) {
            workerCounter.set(0);
        } else {
            workerCounter.set(workerCounter.get() + 1);
        }
        return workerCounter.get();
    }

    public static void resetChildTxId() {
        workerCounter.set(0);
    }

}

package com.yogeshchoudhry.example.ThreadPool;

import org.apache.log4j.Logger;

/**
 * @author Yogesh Choudhry on 11/24/12.
 */
public class CustomJob implements Runnable {

    Logger logger = Logger.getLogger(CustomJob.class);

    private String jobName;

    public CustomJob(String jobName) {
        this.jobName = jobName;
    }

    public void run() {

    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }
}

log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n

Output
MyThreadPoolExecutor - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
MyThreadPoolExecutor - TX-ID : 9 | Start 
MyThreadPoolExecutor - Batch #10
MyThreadPoolExecutor - Task initiated:3 and each task can contain maximum (5) records
CustomTaskWorker - |Custom Worker Thread[11-Worker-1] started... with (5) records [0-4]
CustomTaskWorker - |Custom Worker Thread[12-Worker-1] started... with (5) records [5-9]
CustomTaskWorker -  | Custom Worker Thread[12-Worker-1] >> Index[5]Job6
CustomTaskWorker -  | Custom Worker Thread[12-Worker-1] >> Index[6]Job7
CustomTaskWorker -  | Custom Worker Thread[12-Worker-1] >> Index[7]Job8
CustomTaskWorker -  | Custom Worker Thread[12-Worker-1] >> Index[8]Job9
CustomTaskWorker -  | Custom Worker Thread[12-Worker-1] >> Index[9]Job10
CustomTaskWorker - |Custom Worker Thread[12-Worker-1] finished (5) records
CustomTaskWorker -  | Custom Worker Thread[11-Worker-1] >> Index[0]Job1
CustomTaskWorker -  | Custom Worker Thread[11-Worker-1] >> Index[1]Job2
CustomTaskWorker - |Custom Worker Thread[13-Worker-1] started... with (3) records [10-12]
CustomTaskWorker -  | Custom Worker Thread[13-Worker-1] >> Index[10]Job11
CustomTaskWorker -  | Custom Worker Thread[13-Worker-1] >> Index[11]Job12
CustomTaskWorker -  | Custom Worker Thread[13-Worker-1] >> Index[12]Job13
CustomTaskWorker - |Custom Worker Thread[13-Worker-1] finished (3) records
CustomTaskWorker -  | Custom Worker Thread[11-Worker-1] >> Index[2]Job3
CustomTaskWorker -  | Custom Worker Thread[11-Worker-1] >> Index[3]Job4
CustomTaskWorker -  | Custom Worker Thread[11-Worker-1] >> Index[4]Job5
CustomTaskWorker - |Custom Worker Thread[11-Worker-1] finished (5) records
MyThreadPoolExecutor -  -------------------------------------------
MyThreadPoolExecutor - Task completed: 3
MyThreadPoolExecutor -  -------------------------------------------
MyThreadPoolExecutor - xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx



Java Thread Pool Java Thread Pool Reviewed by Yogesh Choudhry on November 24, 2012 Rating: 5

Popular Posts