- 任务队列
- 拒绝策略(抛异常、丢弃、阻塞、临时队列等)
- init
- active
- max
成员变量
1 2 3 4 5 6 7 8
| // 工作线程数量 private int size ; //工作队列大小 private int queueSize ; // 任务缓存队列 private final static LinkedList<Runnable> taskQueue = new LinkedList<>(); // 线程集合 private final static List<WorkThread> threadQueue = new ArrayList<>();
|
构造函数进行线程初始化
1 2 3 4 5 6 7 8 9 10 11 12
| private void init() { for (int i = 0; i < size; i++) { createWorkThread(); } }
/**
 * Creates one worker in the pool's thread group, registers it in
 * {@code threadQueue} and starts it. The worker name is built from the
 * current {@code serialNo}, which is then advanced by one.
 */
private void createWorkThread() {
    String workerName = "thread: " + serialNo;
    serialNo++;
    WorkThread worker = new WorkThread(threadGroup, workerName);
    threadQueue.add(worker);
    worker.start();
}
|
工作线程核心代码
只要当前线程未被标记为 DEAD,就不断地从任务队列中取出任务并执行;如果任务队列为空,则进入阻塞状态,等待新任务到来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public void run() { OUTER: while (taskState != TaskState.DEAD) { Runnable runnable ; synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { taskState = TaskState.BLOCKED; taskQueue.wait(); } catch (InterruptedException e) { // e.printStackTrace(); break OUTER; } } runnable = taskQueue.removeFirst(); }
if (runnable != null) { taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; }
} }
|
提交任务
锁定任务队列,检查队列中的任务数是否超过上限(超过则执行拒绝策略),然后添加任务并唤醒等待中的工作线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void submit(Runnable runnable) {
if (isDestoryed) { throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 "); } synchronized (taskQueue) {
if (taskQueue.size() > queueSize) { discardPolicy.discard(); }
taskQueue.addLast(runnable); taskQueue.notifyAll(); } }
|
完整代码

/**
 * A minimal fixed-size thread pool built on {@code synchronized}/{@code wait}/{@code notifyAll}.
 *
 * <p>Tasks are placed in a bounded FIFO queue and executed by a fixed number of worker
 * threads. When the queue is full the configured {@link DiscardPolicy} is invoked and the
 * task is not queued. Every pool owns its own queue and workers, so several pools can be
 * used side by side.
 *
 * <p>{@link #submit(Runnable)} and {@link #shutdown()} may be called from any thread.
 */
public class ThreadPool {

    /** Number of workers used by the no-argument constructor. */
    private static final int DEFAULT_SIZE = 10;

    /** Queue capacity used by the no-argument constructor. */
    private static final int DEFAULT_QUEUE_SIZE = 100;

    /** Default rejection policy: fail the submission with a {@link DiscardException}. */
    private static final DiscardPolicy DEFAULT_POLICY = () -> {
        throw new DiscardException("拒绝了 ... ");
    };

    /** Number of worker threads. */
    private final int size;

    /** Maximum number of tasks allowed to wait in the queue. */
    private final int queueSize;

    /** Rejection policy invoked when the queue is full. */
    private final DiscardPolicy discardPolicy;

    /** Sequence number used to name the workers; only touched during construction. */
    private int serialNo;

    /**
     * Pending tasks. Also the monitor that guards the queue and {@code isDestoryed}.
     * Deliberately per-instance: a static queue would be shared by every pool.
     */
    private final LinkedList<Runnable> taskQueue = new LinkedList<>();

    /** Workers owned by this pool; filled during construction and read-only afterwards. */
    private final List<WorkThread> threadQueue = new ArrayList<>();

    private final ThreadGroup threadGroup = new ThreadGroup("pool_group");

    /** Set once shutdown has begun; guarded by {@code taskQueue}. */
    private boolean isDestoryed = false;

    /** Creates a pool with the default worker count, queue capacity and rejection policy. */
    public ThreadPool() {
        this(DEFAULT_SIZE, DEFAULT_QUEUE_SIZE, DEFAULT_POLICY);
    }

    /**
     * Creates a pool that uses the default (throwing) rejection policy.
     *
     * @param poolSize  number of worker threads, at least 1
     * @param queueSize maximum number of waiting tasks, at least 1
     */
    public ThreadPool(int poolSize, int queueSize) {
        this(poolSize, queueSize, DEFAULT_POLICY);
    }

    /**
     * Creates a pool and immediately starts its worker threads.
     *
     * @param poolSize  number of worker threads, at least 1
     * @param queueSize maximum number of waiting tasks, at least 1
     * @param policy    rejection policy invoked when the queue is full
     * @throws IllegalArgumentException if {@code poolSize} or {@code queueSize} is below 1
     * @throws NullPointerException     if {@code policy} is {@code null}
     */
    public ThreadPool(int poolSize, int queueSize, DiscardPolicy policy) {
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be >= 1 but was " + poolSize);
        }
        if (queueSize < 1) {
            throw new IllegalArgumentException("queueSize must be >= 1 but was " + queueSize);
        }
        if (policy == null) {
            throw new NullPointerException("policy");
        }
        this.size = poolSize;
        this.queueSize = queueSize;
        this.discardPolicy = policy;
        // All fields are assigned before the workers start, so they see a fully set-up pool.
        init();
    }

    /**
     * Queues a task for execution by one of the workers.
     *
     * <p>If the queue already holds {@code queueSize} tasks, the rejection policy is invoked
     * and the task is <em>not</em> queued; whether the caller sees an exception is up to the
     * policy.
     *
     * @param runnable the task to run
     * @throws NullPointerException  if {@code runnable} is {@code null}
     * @throws IllegalStateException if the pool has been shut down
     */
    public void submit(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException("runnable");
        }
        boolean rejected;
        synchronized (taskQueue) {
            // Checked under the lock so a task cannot slip in while shutdown() is starting.
            if (isDestoryed) {
                throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 ");
            }
            rejected = taskQueue.size() >= queueSize;
            if (!rejected) {
                taskQueue.addLast(runnable);
                taskQueue.notifyAll();
            }
        }
        if (rejected) {
            // Run the user-supplied policy outside the lock so it cannot stall the workers.
            discardPolicy.discard();
        }
    }

    /**
     * Shuts the pool down: new submissions are rejected immediately, tasks that are already
     * queued or running are allowed to finish, and the call returns once every worker has
     * exited. Calling it again is harmless.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void shutdown() throws InterruptedException {
        synchronized (taskQueue) {
            isDestoryed = true;
        }
        for (WorkThread worker : threadQueue) {
            worker.close();
        }
        Thread self = Thread.currentThread();
        for (WorkThread worker : threadQueue) {
            // A task that calls shutdown() runs on a worker; joining itself would never return.
            if (worker != self) {
                worker.join();
            }
        }
    }

    /** Creates and starts {@code size} worker threads. */
    private void init() {
        for (int i = 0; i < size; i++) {
            createWorkThread();
        }
    }

    /** Creates one worker in the pool's thread group, registers it and starts it. */
    private void createWorkThread() {
        WorkThread task = new WorkThread(threadGroup, "thread: " + serialNo++);
        threadQueue.add(task);
        task.start();
    }

    /** Thrown by the default rejection policy when the queue is full. */
    public static class DiscardException extends RuntimeException {
        public DiscardException(String message) {
            super(message);
        }
    }

    /** Strategy invoked by {@link #submit(Runnable)} when the queue is full. */
    public interface DiscardPolicy {
        /** Handles a rejected submission, for example by throwing or by doing nothing. */
        void discard();
    }

    /** Life-cycle states reported by a worker. */
    enum TaskState {
        FREE, RUNNING, BLOCKED, DEAD
    }

    /**
     * Worker thread: the core of the pool. It is an inner class so that each worker serves
     * the queue of the pool that created it.
     */
    class WorkThread extends Thread {

        private volatile TaskState taskState = TaskState.FREE;

        /** Set by {@link #close()}; the worker exits once this is set and the queue is empty. */
        private volatile boolean closed;

        public WorkThread(ThreadGroup group, String name) {
            super(group, name);
        }

        public TaskState getTaskState() {
            return taskState;
        }

        /** Runs queued tasks until the worker is closed (and the queue drained) or interrupted. */
        @Override
        public void run() {
            try {
                Runnable runnable;
                while ((runnable = takeTask()) != null) {
                    taskState = TaskState.RUNNING;
                    try {
                        runnable.run();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker: report it and carry on.
                        getUncaughtExceptionHandler().uncaughtException(this, e);
                    } finally {
                        taskState = TaskState.FREE;
                    }
                }
            } finally {
                taskState = TaskState.DEAD;
            }
        }

        /**
         * Blocks until a task is available.
         *
         * @return the next task, or {@code null} when the worker should stop
         */
        private Runnable takeTask() {
            synchronized (taskQueue) {
                // Loop around wait() to cope with spurious wake-ups and lost races.
                while (taskQueue.isEmpty()) {
                    if (closed) {
                        return null;
                    }
                    taskState = TaskState.BLOCKED;
                    try {
                        taskQueue.wait();
                    } catch (InterruptedException e) {
                        // Interruption is treated as a stop request; keep the flag set.
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
                return taskQueue.removeFirst();
            }
        }

        /** Asks the worker to exit once no queued task is left, waking it if it is idle. */
        public void close() {
            synchronized (taskQueue) {
                closed = true;
                taskQueue.notifyAll();
            }
        }
    }

    /**
     * Demo: submits 40 tasks, shuts the pool down, then submits once more. The final
     * submission fails with {@link IllegalStateException} because the pool is closed.
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new ThreadPool();
        for (int i = 0; i < 40; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " start ");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " end ");
            });
        }

        System.out.println(" ----------------------------- ");

        threadPool.shutdown();

        threadPool.submit(() -> {
            System.out.println("hello world");
        });
    }
}
|