生产者消费者模式的三种实现

1. 什么是生产者消费者模式

通过一个容器解决生产者消费者强耦合的问题,生产者和消费者之间不直接通信,通过阻塞的方式来实现数据的存取,阻塞的容器就相当于一个缓冲区,平衡生产者消费者的能力。

2. 为什么要使用生产者消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

代码实战

1.首先是生产者和消费者线程

生产者和消费者各是线程,在线程启动时候,开启无限循环,不断的进行添加和删除的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	public class Consumer extends Thread {

private Resources res;

public Consumer(Resources res) {
this.res = res;
}

@Override
public void run() {
super.run();
while(true) {
res.countDown();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
	public class Productor extends Thread {

private Resources res;

public Productor(Resources res) {
this.res = res;
}

@Override
public void run() {
super.run();
while(true) {
res.add();
}
}
}

2. 重点是我们的资源库,一共三种实现方式

1
2
3
4
5
6
	public interface Resources {

public void add();
public void countDown();

}

定义了一个 resource接口,有一个添加和一个消费的接口。

1. 使用synchronize + wait() + notifyAll() 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
	public class ResourcesImpl2 implements Resources {

// 当前资源数量
private int num = 0;
// 资源池中允许存放的资源数目
private int size = 10;

@Override
public synchronized void add() {

System.out.println(" add ........... ");

if (num < size) {
num++;
System.out.println("生产者"+Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个");
// 通知等待的消费者
this.notifyAll();
} else {
// 如果当前资源池中有10件资源
try {
System.out.println("生产者"+Thread.currentThread().getName() + "线程进入等待");
this.wait();// 生产者进入等待状态,并释放锁
} catch (Exception e) {
e.printStackTrace();
}
}

}

@Override
public synchronized void countDown() {

if (num > 0) {
num--;
System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前线程池有" + num + "个");
this.notifyAll();// 通知生产者生产资源
} else {
try {
// 如果没有资源,则消费者进入等待状态
this.wait();
System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
} catch (Exception e) {
e.printStackTrace();
}
}

}

}

1. 使用阻塞队列BlockQueue 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
	public class ResourcesImpl implements Resources {

BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10);

@Override
public void add() {

if (queue.size() < 10) {
queue.add(1);
}
}

@Override
public void countDown() {
try {
if (queue.size() > 0) {
Integer take = queue.take();
System.out.println(take);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

1. 使用可重入锁的方式 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
	public class ResourcesImpl3 implements Resources {

// 当前资源数量
private int num = 0;
// 资源池中允许存放的资源数目
private int size = 10;

private Lock lock = new ReentrantLock();
private Condition con;
private Condition pro;

public ResourcesImpl3() {
con = lock.newCondition();
pro = lock.newCondition();
}

@Override
public void add() {

System.out.println(" add ........... ");
lock.lock();

try {

if (num < size) {
num++;
System.out.println("生产者" + Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个");
// 通知等待的消费者
con.signalAll();
} else {
// 如果当前资源池中有10件资源
try {
System.out.println("生产者" + Thread.currentThread().getName() + "线程进入等待");
pro.await();// 生产者进入等待状态,并释放锁
} catch (Exception e) {
e.printStackTrace();
}
}

} finally {
lock.unlock();
}
}

@Override
public void countDown() {

lock.lock();

try {
if (num > 0) {
num--;
System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前线程池有" + num + "个");
pro.signalAll();// 通知生产者生产资源
} else {
try {
// 如果没有资源,则消费者进入等待状态
con.await();
System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
} catch (Exception e) {
e.printStackTrace();
}
}

} finally {
lock.unlock();
}
}

}