生产者消费者问题

The world puts off its mask of vastness to its lover.

世界对着它的爱人,把它浩翰的面具揭下了。  

It becomes small as one song, as one kiss of the eternal.

它变小了,小如一首歌,小如一回永恒的接吻。           

生产者消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

该问题需要注意的几点:

  • 在缓冲区为空时,消费者不能再进行消费
  • 在缓冲区为满时,生产者不能再进行生产
  • 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
  • 注意条件变量与互斥锁的顺序

由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。
在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。

管程法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//测试:生产者消费者模型--->利用缓冲区解决:管程法
//生产者,消费者,产品,缓冲区
public class TestPC {
public static void main(String[] args) {
SynContainer container = new SynContainer();
new Productor(container).start();
new Consumer(container).start();
}
}

//生产者
class Productor extends Thread {
SynContainer container;

public Productor(SynContainer container) {
this.container = container;
}

// 生产
@Override
public void run() {
for (int i = 0; i < 100; i++) {
container.push(new Chicken(i));
System.out.println("生产了" + i + "只鸡");
}
}
}

//消费者
class Consumer extends Thread {

SynContainer container;

public Consumer(SynContainer container) {
this.container = container;
}

// 消费
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("消费了-->" + container.pop().id + "只鸡");
}
}
}

//产品
class Chicken {
int id;// 产品编号
public Chicken(int id) {
this.id = id;
}

}

//缓冲区
class SynContainer {

// 需要一个容器大小
Chicken[] chickens = new Chicken[10];
// 容器计数器
int count = 0;

// 生产者放入产品
public synchronized void push(Chicken chicken) {
// 如果容器满了,就需要等待消费者消费
if (count == chickens.length) {
// 通知消费者消费,生产者等待
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// 如果没有满,我们就放入产品
chickens[count] = chicken;
count++;
//通知消费者消费了
this.notifyAll();
}

// 消费者消费产品
public synchronized Chicken pop() {
// 判断是否能消费
if (count == 0) {
// 等待生产者生产,消费者等待
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

} // 如果可以消费
count--;
Chicken chicken = chickens[count];
// 吃完了,通知生产者生产
this.notifyAll();
return chicken;
}
}

信号灯法

通过标志位解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.test.Thread;
//解决生产者消费者问题方法2:信号灯法

public class TestPC2 {
public static void main(String[] args) {
TV tv =new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}
//生产者:演员
class Player extends Thread{
TV tv;

public Player(TV tv) {
this.tv = tv;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
if(i%2==0){
this.tv.play("快乐大本营");
}else{
this.tv.play("斗鱼");
}

}
}
}
//消费者:观众
class Watcher extends Thread{
TV tv;

public Watcher(TV tv) {
this.tv = tv;
}

@Override
public void run() {
for (int i = 0; i < 20; i++) {
tv.watch();

}
}
}
//产品:电视节目
class TV{
//演员表演,观众等待 T
//观众观看,演员等待 F
String voice;
boolean flag =true;
public synchronized void play(String voice){
//观众观看,演员等待
if (!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//若观众没看,演员表演
System.out.println("演员表演了"+voice);
//通知观众看
this.notifyAll();
this.voice=voice;
this.flag=!this.flag;

}
//观看方法
public synchronized void watch(){
//演员表演观众等待
if(flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("观看了:"+voice);
//通知演员表演
this.notifyAll();
this.flag=!this.flag;


}
}
查看评论