自定义线程池实现

  1. 任务队列
  2. 拒绝策略(抛异常,丢弃,阻塞 , 临时队列 等 )
  3. init
  4. active
  5. max

成员变量

1
2
3
4
5
6
7
8
// 工作线程数量
private int size ;
//工作队列大小
private int queueSize ;
// 任务缓存队列
private final static LinkedList<Runnable> taskQueue = new LinkedList<>();
// 线程集合
private final static List<WorkThread> threadQueue = new ArrayList<>();

构造函数进行线程初始化

1
2
3
4
5
6
7
8
9
10
11
12
private void init() {
for (int i = 0; i < size; i++) {
createWorkThread();
}
}

private void createWorkThread() {
WorkThread task = new WorkThread(threadGroup , "thread: "+serialNo++);
threadQueue.add(task);
task.start();
}

工作线程核心代码

如果当前线程状态存活,就不断的从任务队列中取,如果任务队列为空就进入阻塞状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void run() {
OUTER:
while (taskState != TaskState.DEAD) {
Runnable runnable ;
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskState = TaskState.BLOCKED;
taskQueue.wait();
} catch (InterruptedException e) {
// e.printStackTrace();
break OUTER;
}
}
runnable = taskQueue.removeFirst();
}

if (runnable != null) {
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}

}
}

提交任务

锁定任务队列,计算缓存大小,添加任务并唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void submit(Runnable runnable) {

if (isDestoryed) {
throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 ");
}
synchronized (taskQueue) {

if (taskQueue.size() > queueSize) {
discardPolicy.discard();
}

taskQueue.addLast(runnable);
taskQueue.notifyAll();
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194

public class ThreadPool {

// 工作线程数量
private int size ;

//工作队列大小
private int queueSize ;

//拒绝策略
private DiscardPolicy discardPolicy;


private int serialNo ;

private final static int DEFAULT_SIZE = 10 ;

private final static int DEFAULT_QUEUE_SIZE = 100 ;

// 任务缓存队列
private final static LinkedList<Runnable> taskQueue = new LinkedList<>();

// 线程集合
private final static List<WorkThread> threadQueue = new ArrayList<>();

private final ThreadGroup threadGroup = new ThreadGroup("pool_group");

private static DiscardPolicy DEFAULT_POLICY = () -> {
throw new DiscardException("拒绝了 ... ");
};

private boolean isDestoryed = false;


public ThreadPool() {
this(DEFAULT_SIZE , DEFAULT_QUEUE_SIZE , DEFAULT_POLICY);
}

public ThreadPool(int poolSize, int queueSize ) {
this(poolSize , queueSize ,DEFAULT_POLICY);
}

public ThreadPool(int poolSize, int queueSize , DiscardPolicy policy) {
this.size = poolSize;
this.queueSize = queueSize ;
this.discardPolicy = policy;
init();
}

public void submit(Runnable runnable) {

if (isDestoryed) {
throw new IllegalStateException("线程池已经关闭 , 不允许提交任务 ");
}
synchronized (taskQueue) {

if (taskQueue.size() > queueSize) {
discardPolicy.discard();
}

taskQueue.addLast(runnable);
taskQueue.notifyAll();
}
}

public void shutdown() throws InterruptedException {

while (!taskQueue.isEmpty()) {
Thread.sleep(50);
}

int initval = threadQueue.size();

while (initval > 0) {
for (WorkThread task : threadQueue) {
if (task.getTaskState() == TaskState.BLOCKED) {
task.interrupt();
task.close();
initval--;
} else {
Thread.sleep(10);
}
}
}

isDestoryed = true;
}


private void init() {
for (int i = 0; i < size; i++) {
createWorkThread();
}
}

private void createWorkThread() {
WorkThread task = new WorkThread(threadGroup , "thread: "+serialNo++);
threadQueue.add(task);
task.start();
}


public static class DiscardException extends RuntimeException {
public DiscardException(String message) {
super(message);
}
}

public interface DiscardPolicy {
void discard();
}

enum TaskState {
FREE , RUNNING , BLOCKED , DEAD
}


/**
* 工作线程,核心代码
*/
static class WorkThread extends Thread {

private volatile TaskState taskState = TaskState.FREE;


public WorkThread(ThreadGroup group , String name) {
super(group,name);
}

public TaskState getTaskState() {
return taskState;
}

@Override
public void run() {
OUTER:
while (taskState != TaskState.DEAD) {
Runnable runnable ;
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskState = TaskState.BLOCKED;
taskQueue.wait();
} catch (InterruptedException e) {
// e.printStackTrace();
break OUTER;
}
}
runnable = taskQueue.removeFirst();
}

if (runnable != null) {
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}

}
}

public void close() {
this.taskState = TaskState.DEAD;
}
}


public static void main(String[] args) throws InterruptedException {

ThreadPool threadPool = new ThreadPool();
for (int i = 0; i < 40; i++) {
int finalI = i;
threadPool.submit(()-> {
System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " start ");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("the runnable is : " + finalI + " thread: " + Thread.currentThread() + " end ");
});
}

System.out.println(" ----------------------------- ");

threadPool.shutdown();

threadPool.submit(() -> {
System.out.println("hello world");
});

}

}