- 任务队列
- 拒绝策略(抛异常、丢弃、阻塞、临时队列等)
- init
- active
- max
成员变量
1 2 3 4 5 6 7 8
| // 工作线程数量 private int size ; //工作队列大小 private int queueSize ; // 任务缓存队列 private final static LinkedList<Runnable> taskQueue = new LinkedList<>(); // 线程集合 private final static List<WorkThread> threadQueue = new ArrayList<>();
|
构造函数进行线程初始化
1 2 3 4 5 6 7 8 9 10 11 12
| private void init() { for (int i = 0; i < size; i++) { createWorkThread(); } }
/**
 * Creates one worker thread inside the pool's thread group, registers it in
 * {@code threadQueue} and starts it. Each worker gets a name built from a
 * running serial number.
 */
private void createWorkThread() {
    String workerName = "thread: " + serialNo++;
    WorkThread worker = new WorkThread(threadGroup, workerName);
    threadQueue.add(worker);
    worker.start();
}
|
工作线程核心代码
只要当前工作线程仍处于存活状态,就不断地从任务队列中取出任务并执行;如果任务队列为空,则进入阻塞状态等待新任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public void run() { OUTER: while (taskState != TaskState.DEAD) { Runnable runnable ; synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { taskState = TaskState.BLOCKED; taskQueue.wait(); } catch (InterruptedException e) { // e.printStackTrace(); break OUTER; } } runnable = taskQueue.removeFirst(); }
if (runnable != null) { taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; }
} }
|
提交任务
锁定任务队列,检查队列是否已满(已满则执行拒绝策略),然后添加任务并唤醒等待中的工作线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void submit(Runnable runnable) {
if (isDestoryed) { throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 "); } synchronized (taskQueue) {
if (taskQueue.size() > queueSize) { discardPolicy.discard(); }
taskQueue.addLast(runnable); taskQueue.notifyAll(); } }
|
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
/**
 * A minimal fixed-size thread pool built on {@code synchronized}/{@code wait}/{@code notifyAll}.
 *
 * <p>Tasks are buffered in a bounded FIFO queue and executed by a fixed number of
 * worker threads. When the queue is full the configured {@link DiscardPolicy} is
 * invoked and the task is not enqueued.
 *
 * <p>Thread-safety: {@link #submit(Runnable)} and {@link #shutdown()} may be called
 * from any thread. The task queue and the shutdown flag are guarded by the monitor
 * of the queue object. Each pool instance owns its own queue and workers.
 */
public class ThreadPool {

    private static final int DEFAULT_SIZE = 10;

    private static final int DEFAULT_QUEUE_SIZE = 100;

    /** Default rejection policy: fail the submission with a {@link DiscardException}. */
    private static final DiscardPolicy DEFAULT_POLICY = () -> {
        throw new DiscardException("拒绝了 ... ");
    };

    /** Number of worker threads. */
    private final int size;

    /** Maximum number of tasks that may wait in the queue. */
    private final int queueSize;

    /** Policy invoked when a task is submitted while the queue is full. */
    private final DiscardPolicy discardPolicy;

    /** Running counter used to build worker thread names. */
    private int serialNo;

    /** Task buffer; its monitor also guards {@link #isDestoryed}. */
    private final LinkedList<Runnable> taskQueue = new LinkedList<>();

    /** Worker threads; only modified while the constructor runs. */
    private final List<WorkThread> threadQueue = new ArrayList<>();

    private final ThreadGroup threadGroup = new ThreadGroup("pool_group");

    /** Set once shutdown has been requested; guarded by the {@code taskQueue} monitor. */
    private boolean isDestoryed = false;

    /** Creates a pool with the default worker count, queue capacity and rejection policy. */
    public ThreadPool() {
        this(DEFAULT_SIZE, DEFAULT_QUEUE_SIZE, DEFAULT_POLICY);
    }

    /**
     * Creates a pool with the default (throwing) rejection policy.
     *
     * @param poolSize  number of worker threads
     * @param queueSize maximum number of queued tasks
     */
    public ThreadPool(int poolSize, int queueSize) {
        this(poolSize, queueSize, DEFAULT_POLICY);
    }

    /**
     * Creates a pool and immediately starts its worker threads.
     *
     * @param poolSize  number of worker threads
     * @param queueSize maximum number of queued tasks
     * @param policy    callback invoked when a task arrives while the queue is full
     */
    public ThreadPool(int poolSize, int queueSize, DiscardPolicy policy) {
        this.size = poolSize;
        this.queueSize = queueSize;
        this.discardPolicy = policy;
        init();
    }

    /**
     * Submits a task for execution by one of the worker threads.
     *
     * <p>If the queue already holds {@code queueSize} tasks, the discard policy is
     * invoked and the task is not enqueued: a throwing policy propagates its
     * exception, a policy that returns normally silently drops the task.
     *
     * @param runnable the task to run
     * @throws IllegalStateException if {@link #shutdown()} has been called
     */
    public void submit(Runnable runnable) {
        synchronized (taskQueue) {
            // Checked under the lock so no task can slip in after shutdown started.
            if (isDestoryed) {
                throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 ");
            }
            // ">=" so the queue never grows beyond queueSize entries.
            if (taskQueue.size() >= queueSize) {
                discardPolicy.discard();
                return;
            }
            taskQueue.addLast(runnable);
            taskQueue.notifyAll();
        }
    }

    /**
     * Shuts the pool down: rejects further submissions, lets the workers finish
     * every task that is already queued, and waits until all workers have exited.
     *
     * <p>Workers are never interrupted, so running tasks are not disturbed.
     * Calling this method again after shutdown was requested returns immediately.
     * With zero workers, queued tasks are left unexecuted.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the workers to exit
     */
    public void shutdown() throws InterruptedException {
        synchronized (taskQueue) {
            if (isDestoryed) {
                return;
            }
            isDestoryed = true;
        }
        for (WorkThread worker : threadQueue) {
            worker.close();
        }
        for (WorkThread worker : threadQueue) {
            // A task that calls shutdown() must not wait for its own thread.
            if (worker != Thread.currentThread()) {
                worker.join();
            }
        }
    }

    /** Starts the configured number of worker threads. */
    private void init() {
        for (int i = 0; i < size; i++) {
            createWorkThread();
        }
    }

    /** Creates, registers and starts a single worker thread. */
    private void createWorkThread() {
        WorkThread task = new WorkThread(threadGroup, "thread: " + serialNo++, taskQueue);
        threadQueue.add(task);
        task.start();
    }

    /** Thrown by the default rejection policy when the queue is full. */
    public static class DiscardException extends RuntimeException {
        public DiscardException(String message) {
            super(message);
        }
    }

    /** Callback invoked when a task is submitted while the queue is full. */
    @FunctionalInterface
    public interface DiscardPolicy {
        /** Handles a rejected submission; may throw to signal the rejection to the submitter. */
        void discard();
    }

    /** Observable state of a worker thread. */
    enum TaskState {
        /** Not running a task and not waiting on the queue. */
        FREE,
        /** Currently executing a task. */
        RUNNING,
        /** Waiting for a task to arrive. */
        BLOCKED,
        /** The worker loop has ended. */
        DEAD
    }

    /**
     * Worker thread: repeatedly takes a task from the queue and runs it.
     */
    static class WorkThread extends Thread {

        /** Queue shared with the owning pool; its monitor is used for wait/notify. */
        private final LinkedList<Runnable> queue;

        private volatile TaskState taskState = TaskState.FREE;

        /** Set by {@link #close()}; the worker exits once this is set and the queue is empty. */
        private volatile boolean closed;

        public WorkThread(ThreadGroup group, String name, LinkedList<Runnable> queue) {
            super(group, name);
            this.queue = queue;
        }

        public TaskState getTaskState() {
            return taskState;
        }

        /**
         * Worker loop. Waits on the queue monitor while the queue is empty, and exits
         * when the worker has been closed and no task is left, or when it is
         * interrupted while waiting.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Runnable runnable;
                    synchronized (queue) {
                        while (queue.isEmpty()) {
                            if (closed) {
                                return;
                            }
                            taskState = TaskState.BLOCKED;
                            try {
                                queue.wait();
                            } catch (InterruptedException e) {
                                // An interrupt while idle is treated as a request to stop.
                                return;
                            }
                        }
                        runnable = queue.removeFirst();
                    }
                    if (runnable != null) {
                        runTask(runnable);
                    }
                }
            } finally {
                taskState = TaskState.DEAD;
            }
        }

        /** Runs one task; a failing task is reported but does not kill the worker. */
        private void runTask(Runnable runnable) {
            taskState = TaskState.RUNNING;
            try {
                runnable.run();
            } catch (RuntimeException e) {
                // Report exactly like an uncaught exception, then keep serving tasks.
                getUncaughtExceptionHandler().uncaughtException(this, e);
            } finally {
                taskState = TaskState.FREE;
            }
        }

        /**
         * Asks this worker to stop once the queue is empty and wakes it up if it is
         * currently waiting. The state becomes {@code DEAD} when the loop has ended.
         */
        public void close() {
            closed = true;
            synchronized (queue) {
                queue.notifyAll();
            }
        }
    }

    /**
     * Demo: submits 40 tasks, shuts the pool down, then submits once more. The last
     * submission is expected to fail with an IllegalStateException because the pool
     * is closed.
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPool threadPool = new ThreadPool();
        for (int i = 0; i < 40; i++) {
            int finalI = i;
            threadPool.submit(() -> {
                System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " start ");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " end ");
            });
        }

        System.out.println(" ----------------------------- ");

        threadPool.shutdown();

        threadPool.submit(() -> {
            System.out.println("hello world");
        });
    }
}
|