1. 什么是生产者消费者模式
通过一个容器解决生产者消费者强耦合的问题,生产者和消费者之间不直接通信,通过阻塞的方式来实现数据的存取,阻塞的容器就相当于一个缓冲区,平衡生产者消费者的能力。
2. 为什么要使用生产者消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。
代码实战
1.首先是生产者和消费者线程
生产者和消费者各是线程,在线程启动时候,开启无限循环,不断的进行添加和删除的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Consumer extends Thread { private Resources res;
public Consumer(Resources res) { this.res = res; } @Override public void run() { super.run(); while(true) { res.countDown(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Productor extends Thread { private Resources res;
public Productor(Resources res) { this.res = res; } @Override public void run() { super.run(); while(true) { res.add(); } } }
|
2. 重点是我们的资源库,一共三种实现方式
1 2 3 4 5 6
| public interface Resources { public void add(); public void countDown(); }
|
定义了一个 resource接口,有一个添加和一个消费的接口。
1. 使用synchronize + wait() + notifyAll() 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ResourcesImpl2 implements Resources {
private int num = 0; private int size = 10;
@Override public synchronized void add() {
System.out.println(" add ........... "); if (num < size) { num++; System.out.println("生产者"+Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); this.notifyAll(); } else { try { System.out.println("生产者"+Thread.currentThread().getName() + "线程进入等待"); this.wait(); } catch (Exception e) { e.printStackTrace(); } }
}
@Override public synchronized void countDown() {
if (num > 0) { num--; System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前线程池有" + num + "个"); this.notifyAll(); } else { try { this.wait(); System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态"); } catch (Exception e) { e.printStackTrace(); } }
}
}
|
1. 使用阻塞队列BlockQueue 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ResourcesImpl implements Resources {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(10);
@Override public void add() {
if (queue.size() < 10) { queue.add(1); } }
@Override public void countDown() { try { if (queue.size() > 0) { Integer take = queue.take(); System.out.println(take); } } catch (InterruptedException e) { e.printStackTrace(); } }
}
|
1. 使用可重入锁的方式 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| public class ResourcesImpl3 implements Resources {
private int num = 0; private int size = 10;
private Lock lock = new ReentrantLock(); private Condition con; private Condition pro;
public ResourcesImpl3() { con = lock.newCondition(); pro = lock.newCondition(); }
@Override public void add() {
System.out.println(" add ........... "); lock.lock();
try {
if (num < size) { num++; System.out.println("生产者" + Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); con.signalAll(); } else { try { System.out.println("生产者" + Thread.currentThread().getName() + "线程进入等待"); pro.await(); } catch (Exception e) { e.printStackTrace(); } }
} finally { lock.unlock(); } }
@Override public void countDown() {
lock.lock();
try { if (num > 0) { num--; System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前线程池有" + num + "个"); pro.signalAll(); } else { try { con.await(); System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态"); } catch (Exception e) { e.printStackTrace(); } }
} finally { lock.unlock(); } }
}
|