从生产者消费者模型实现看信号量和条件变量的区别

本文主要诞生于代码检视的时候,发现实现生产者消费者相关代码使用的信号量,因为通常生产者消费者模型大多数例子都是使用条件变量进行实现的。
通过该例子可以看到使用信号实现生产者与消费者模型与使用条件变量的区别。
本文主要介绍信号量和条件变量的区别,使用信号量和条件变量的优劣势。

信号量和条件变量的定义和区别

条件变量的定义

条件变量是一种允许线程挂起执行并等待某个特定条件的同步原语。它总是与互斥锁(mutex)一起使用,以避免多线程的竞态条件。条件变量使得线程可以睡眠等待某个条件的变化,而不是忙等待(busy-waiting),从而提高了系统资源的利用效率与程序的可扩展性。

条件变量的常见操作:

  • wait: 线程调用wait操作将自己置于等待状态直到某个条件为真,通常这个操作会原子性地释放相关的mutex,并且加入条件的等待队列。

  • signal: 当条件变为真时,另一个线程调用signal告知等待条件变量的线程(通常至少唤醒一个)可以继续执行。

  • broadcast: 类似于signal,但是它会唤醒等待同一个条件变量的所有线程。

信号量的定义

信号量是一个更为通用的同步原语,通常用来保护对共享资源的访问。它包含一个计数器,表示可用资源的数量,并且提供原子操作来增加或减少计数器的值。
《Unix环境高级编程》中提到“信号量与管道、FIFO以及消息队列不同。信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。”

信号量的常见操作:

  • waitP 操作: 如果计数器的值大于零,则将它减一,这通常表示线程占用了一个资源。如果计数器值为零,则线程进入阻塞状态,直到计数器大于零。

  • signalV 操作: 释放资源并将信号量的值加一,如果有线程因计数器值为零而阻塞,它们之中的一个或多个将被唤醒。

基本概念区别

信号量 (Semaphore)

1
2
3
4
5
6
7
8
9
10
11
  sem_t sem;
  sem_init(&sem, 0, 0);  // 初始值为0

  // 等待

  sem_wait(&sem);  // 计数器-1,如果<0则阻塞


  // 唤醒

  sem_post(&sem);  // 计数器+1,如果<=0则唤醒一个等待线程

条件变量 (Condition Variable)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

  pthread_cond_t cond = PTHREAD_COND_INITIALIZER;


  // 等待

  pthread_mutex_lock(&mutex);

  while (condition_not_met) {

      pthread_cond_wait(&cond, &mutex);  // 自动释放锁并等待

  }

  // 执行临界区代码

  pthread_mutex_unlock(&mutex);


  // 唤醒

  pthread_mutex_lock(&mutex);

  condition_met = true;

  pthread_cond_signal(&cond);    // 唤醒一个线程

  // 或者

  pthread_cond_broadcast(&cond); // 唤醒所有线程

  pthread_mutex_unlock(&mutex);

核心区别总结

特性 信号量 条件变量
状态保持 内置计数器状态 无状态,依赖外部条件
使用方式 独立使用 必须配合互斥锁使用
唤醒语义 计数器>0即可唤醒 需要显式检查条件
丢失唤醒 不会丢失(计数器记住) 可能丢失(需要while循环)
复杂度 简单 相对复杂

通常来说, 信号量侧重于资源保护(计数器语义);而条件变量则更侧重线程同步(状态协调语义)。当然有了信号量后,仍然需要使用条件变量,主要原因是条件变量提供了更为细粒度的线程同步机制、允许线程对复杂状态的等待、以及线程之间的更有效通信。信号量主要用于控制对共享资源的访问,是一种更为通用的同步机制,而条件变量则用于线程之间的协调,尤其适用于需要等待特定条件成立时才能继续执行的场景。例如,在生产者-消费者问题中, 使用条件变量可以让消费者线程在缓冲区为空时进入睡眠状态,直到生产者线程向缓冲区添加一个新的项目并通知条件变量,消费者线程才会被唤醒,这有利于减少不必要的轮询以及CPU资源的浪费。

也就是说如果用信号量实现生产者消费者模型,需要不停的轮询,是会比用条件变量耗费CPU资源。

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
bool LogicMDS::Init(const uint32_t local_id) {

local_id_ = local_id;


LAVA_ABORT(sem_init(&sem_, 0, 0) == 0);

LAVA_ABORT(sem_init(&cc_sem_, 0, 0) == 0);

LAVA_ABORT(sem_init(&cp_sem_, 0, 0) == 0);


cc_inflight_cnt_.store(0);


chunkclient_work_thread_ = new std::thread([this]() {

LAVA_LOG_INFO(streamfsmds_logger) << "chunkclient thread:" << std::this_thread::get_id() << lendl;

LAVA_ABORT(ChunkInstTLSInit());


cstor::lava::store::cc::ChunkClient *chunk_inst = ChunkInstTLSGet();


// 到此处后,开始处理

uint64_t last_feed_cc_dog_s = GetTimePointS();

uint64_t cur_feed_cc_dog_s;

CCFeedWatchDog(last_feed_cc_dog_s);

CCWakeupPolling();

std::function<void()> *cc_task = nullptr;

do {

if (cc_req_queue_.pop(cc_task)) {

cc_inflight_cnt_++;

(*cc_task)();

delete cc_task;

chunk_inst->ProcessCompletion(10);

} else {

if (cc_inflight_cnt_) {

// 有任务未回调完成

chunk_inst->ProcessCompletion(10);

} else {

// 无任务休眠

CCSemWait();

}

}


cur_feed_cc_dog_s = GetTimePointS();

if (LAVA_TIME_AFTER64(cur_feed_cc_dog_s, (last_feed_cc_dog_s + 1))) {

last_feed_cc_dog_s = cur_feed_cc_dog_s;

CCFeedWatchDog(cur_feed_cc_dog_s);

}

} while (running_flag_);


ChunkInstTLSDestory();

});


checkpoint_work_thread_ = new std::thread([this]() {

LAVA_LOG_INFO(streamfsmds_logger) << "checkpoint thread:" << std::this_thread::get_id() << lendl;

// 到此处后,开始处理

uint64_t last_feed_cp_dog_s = GetTimePointS();

uint64_t cur_feed_cp_dog_s;

CPFeedWatchDog(last_feed_cp_dog_s);

CPWakeupPolling();

do {

MDSOPTaskInfo *checkpoint_op;

if (cp_req_queue_.pop(checkpoint_op)) {

if (nullptr == checkpoint_op) {

// session_mgr ut析构时发现会pop出nullptr

return;

}

LAVA_LOG_TRACE(streamfsmds_logger) << *checkpoint_op << " get cp task." << lendl;

HandleCPTask(checkpoint_op);

} else {

CPSemWait();

}


cur_feed_cp_dog_s = GetTimePointS();

if (LAVA_TIME_AFTER64(cur_feed_cp_dog_s, (last_feed_cp_dog_s + 1))) {

last_feed_cp_dog_s = cur_feed_cp_dog_s;

CPFeedWatchDog(cur_feed_cp_dog_s);

}

} while (running_flag_);

});


mds_work_thread_ = new std::thread([this]() {

LAVA_LOG_INFO(streamfsmds_logger) << "mds thread:" << std::this_thread::get_id() << lendl;

std::function<void(void)> func_vec[MDS_POLLING_TYPE_CNT];


func_vec[MDS_POLLING_TYPE_TASK] = [this]() {

MDSOPTaskInfo *mds_op = nullptr;

if (req_queue_.pop(mds_op)) {

if (nullptr == mds_op) {

// session_mgr ut析构时发现会pop出nullptr

return;

}

LAVA_LOG_TRACE(streamfsmds_logger) << *mds_op << " get from queue." << lendl;

LogicMDSTaskDo(mds_op);

}

};


func_vec[MDS_POLLING_TYPE_CHUNK] = [this]() {

std::function<void()> *cc_callback = nullptr;

if (cc_callback_queue_.pop(cc_callback)) {

if (nullptr == cc_callback) {

// session_mgr ut析构时发现会pop出nullptr

return;

}

(*cc_callback)();

delete cc_callback;

cc_inflight_cnt_--;

}

};


uint32_t count = 0;


// 到此处后,开始处理

uint64_t last_fetch_dog_s = GetTimePointS();

uint64_t cur_fetch_dog_s;

FeedWatchDog(last_fetch_dog_s);

WakeupPolling();

do {

if (cc_inflight_cnt_ == 0 && MdsTaskCntGet() == 0) {

SemWait();

}


// 轮流pollong,防止饿死任务

func_vec[count & MDS_POLLING_TYPE_MASK]();

count++;


// 没有极致时延要求,可以做一下睡眠

// 极致时延要求下可以去掉

cur_fetch_dog_s = GetTimePointS();

if (LAVA_TIME_AFTER64(cur_fetch_dog_s, (last_fetch_dog_s + 1))) {

last_fetch_dog_s = cur_fetch_dog_s;

FeedWatchDog(cur_fetch_dog_s);

}

} while (running_flag_);

});


uint32_t interval = READ_CONFIG(sfs_mds_checkout_fs_msecs);

MDSTimer::GetInstance()->AddEventAfter(interval, [this]() { CheckOutFS(); });


interval = READ_CONFIG(sfs_mds_timer_report_msecs);

MDSTimer::GetInstance()->AddEventAfter(interval, [this]() { TimerReportCouter(); });


// 后台清理sst key

interval = READ_CONFIG(sfs_mds_free_msecs);

MDSTimer::GetInstance()->AddEventAfter(interval, [this]() { CheckShardsstConf(); });

return true;

}

bool LogicMDS::DispatchCCTask(std::function<void()> *task) {

bool ret = false;

ret = cc_req_queue_.push(task);

if (ret && cc_inflight_cnt_ < 1024) CCSemPost();


return ret;

}

void LogicMDS::CCSemWait(void) {

do {

int32_t ret;

ret = sem_wait(&cc_sem_);

if (0 == ret) {

break;

}

} while (errno != ETIMEDOUT);

}


void LogicMDS::CCSemPost(void) {

sem_post(&cc_sem_);

}

这是实际项目中使用信号量的一段代码,这段代码中使用了信号量(semaphore)来实现线程间的同步和任务调度。具体来说,它使用了三个信号量:sem_cc_sem_cp_sem_,分别用于不同的工作线程(mds线程、chunkclient线程和checkpoint线程)的唤醒。

在分析之前,我们先回顾一下条件变量和信号量的区别:

  • 条件变量(condition variable)通常与互斥锁(mutex)一起使用,用于等待某个条件成立。等待时,条件变量会释放互斥锁,并在被唤醒后重新获取锁。它需要循环检查条件,因为可能有虚假唤醒。

  • 信号量是一个计数器,用于控制对共享资源的访问。它有两个操作:wait(P操作,减少信号量)和post(V操作,增加信号量)。如果信号量为0,则wait会阻塞,直到信号量变为正数。

在代码中,信号量被用于工作线程的唤醒,当有任务加入队列时,通过post操作增加信号量,从而唤醒等待的线程。这类似于条件变量的通知机制。

具体分析:

  1. chunkclient_work_thread_线程(处理chunkclient任务):

    • 使用 cc_sem_信号量。
    • 当任务队列 cc_req_queue_为空且没有正在处理的任务(cc_inflight_cnt_为0)时,线程调用 CCSemWait()等待信号量。
    • 当有新任务加入队列时,DispatchCCTask函数会调用 CCSemPost()来增加信号量,从而唤醒等待的线程。
  2. checkpoint_work_thread_线程(处理checkpoint任务):

    • 使用 cp_sem_信号量(虽然没有在代码片段中直接看到CPSemWaitCPSemPost,但根据命名和对称性,可以推测存在类似的操作)。
    • 当任务队列 cp_req_queue_为空时,线程调用 CPSemWait()等待。
    • 当有新的checkpoint任务加入队列时,应该会调用一个类似 DispatchCPTask的函数,该函数会调用 CPSemPost()
  3. mds_work_thread_线程(处理mds任务):

    • 使用 sem_信号量(通过SemWaitSemPost,虽然代码中没有直接展示SemPost,但可以推测在任务加入队列时会调用)。
    • 当没有任务(mds任务和chunkclient回调任务)时,线程调用SemWait()等待。
    • 当有任务加入队列时,应该会调用一个类似DispatchMDSTask的函数,该函数会调用SemPost()

信号量的使用在这里是合理的,因为每个工作线程都在等待任务到来,而任务到来时通过post操作唤醒一个等待的线程。这实际上是用信号量实现了一种“生产者-消费者”模型,其中生产者将任务放入队列并post信号量,消费者(工作线程)等待信号量然后从队列中取出任务。

但是,这里使用信号量确实实现了条件变量的部分功能,即等待和唤醒。但是,信号量和条件变量在使用上有一些区别:

  • 条件变量通常需要与互斥锁配合使用,以保护共享数据(如任务队列)的访问。而在这段代码中,任务队列是使用无锁队列(例如cc_req_queue_moodycamel::ConcurrentQueue)实现的,因此不需要额外的互斥锁。所以,使用信号量直接唤醒线程是合适的。

  • 信号量的计数特性使得它可以记录尚未处理的通知次数(即信号量的计数值表示有多少个任务加入队列后还没有被处理)。而条件变量没有计数,如果多次通知可能只唤醒一次(但通常条件变量使用时会结合一个条件,比如队列非空,所以不会丢失任务)。

在这个实现中,信号量的使用是合理的,因为:

  1. 任务队列是无锁的,所以不需要在等待时保护队列

  2. 信号量的post操作在任务入队后执行,确保只要有任务,工作线程就会被唤醒。

信号量使用分析

这段代码使用信号量实现了类似条件变量的功能,用于线程间的任务调度和唤醒机制。我们来分析其设计合理性以及是否成功实现了条件变量的功能。

1. 信号量初始化

1
2
3
4
5
LAVA_ABORT(sem_init(&sem_, 0, 0) == 0);
LAVA_ABORT(sem_init(&cc_sem_, 0, 0) == 0);
LAVA_ABORT(sem_init(&cp_sem_, 0, 0) == 0);


  • 初始化三个信号量,初始值都为0
  • 用作线程唤醒机制,而非资源计数

2. 信号量等待模式

1
2
3
4
5
6
7
8
9
10
11
void LogicMDS::CCSemWait(void) {
do {
int32_t ret;
ret = sem_wait(&cc_sem_);
if (0 == ret) {
break;
}
} while (errno != ETIMEDOUT);
}


这段代码有bug,通常sem_wait不会返回ETIMEDOUT,sem_timedwait才会返回ETIMEDOUT,上述代码while (errno != ETIMEDOUT)相当于while (true)。

3. 信号量发布

1
2
3
4
5
void LogicMDS::CCSemPost(void) {
sem_post(&cc_sem_);
}


  • 标准信号量发布操作
  • 用于唤醒等待线程

4. 任务调度中的信号量使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// chunkclient 线程
if (cc_req_queue_.pop(cc_task)) {
// 处理任务
} else {
if (cc_inflight_cnt_) {
// 处理回调
} else {
CCSemWait(); // 等待信号量
}
}

// checkpoint 线程
if (cp_req_queue_.pop(checkpoint_op)) {
// 处理任务
} else {
CPSemWait(); // 等待信号量
}

// mds 线程
if (cc_inflight_cnt_ == 0 && MdsTaskCntGet() == 0) {
SemWait(); // 等待信号量
}

  • 信号量用于线程阻塞等待新任务

  • 当任务队列为空且无待处理任务时,线程进入等待状态

  • 有新任务时通过 SemPost唤醒线程

与条件变量对比

实现的类似功能

  1. ​​线程阻塞等待​​:

    • 信号量等待 (CCSemWait) ≈ cond_var.wait(lock)
  2. ​​线程唤醒​​:

    • 信号量发布 (CCSemPost) ≈ cond_var.notify_one()
  3. ​​任务调度​​:

    • 当队列为空时阻塞 ≈ 条件变量的谓词等待

关键差异

  1. ​​状态管理​​:

    • 信号量:内置计数器,不依赖外部状态
    • 条件变量:需要外部谓词状态(如 !queue.empty()
  2. ​​虚假唤醒​​:

    • 信号量:不会发生虚假唤醒
    • 条件变量:可能发生,需要循环检查
  3. ​​锁机制​​:

    • 信号量:不强制要求与锁配合
    • 条件变量:必须与互斥锁配合使用
  4. ​​唤醒粒度​​:

    • 信号量:不指定唤醒哪个线程
    • 条件变量:可精确唤醒一个或所有线程

信号量 vs 条件变量的效率分析

高负载场景(大量请求)

信号量优势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  // 高负载时,信号量效率更高
  while (true) {

      if (cc_req_queue_.pop(cc_task)) {

          // 直接处理,无需锁竞争

          cc_inflight_cnt_++;

          (*cc_task)();

          delete cc_task;

      }

  }

优势

  1. 无锁队列 + 信号量 = 零锁竞争
  2. sem_post() 只是一个原子操作
  3. sem_wait() 在有任务时立即返回

条件变量劣势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  // 高负载时,条件变量的锁竞争问题

  pthread_mutex_lock(&cc_mutex_);

  if (cc_req_queue_.pop(task)) {

      pthread_mutex_unlock(&cc_mutex_);

      // 处理任务...

  } else {

      pthread_cond_wait(&cc_cond_, &cc_mutex_);  // 涉及锁的释放和获取

      pthread_mutex_unlock(&cc_mutex_);

  }

劣势

  1. 每次操作都需要获取/释放锁
  2. 多线程竞争同一个锁
  3. pthread_cond_wait() 的开销更大

低负载场景(少量请求)

信号量的CPU浪费问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  // LogicMDS中的定时器机制

  void LogicMDS::CCWakeupPolling(void) {

      CCSemPost();  // 定时唤醒,即使没有任务


      uint32_t timer_check_interval = READ_CONFIG(sfs_mds_check_interval_msecs);

      // 比如:每10ms唤醒一次

      MDSTimer::GetInstance()->AddEventAfter(10, [this]() {

          CCWakeupPolling();

      });

  }

问题:

  1. 每10ms唤醒线程,即使队列为空
  2. 线程被唤醒后检查队列,发现为空,再次等待
  3. 产生不必要的CPU上下文切换

条件变量的节能优势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  // 条件变量只在真正有任务时唤醒

  void ChunkClientThread() {

      pthread_mutex_lock(&cc_mutex_);

      while (cc_req_queue_.empty() && running_flag_) {

          pthread_cond_wait(&cc_cond_, &cc_mutex_);  // 真正休眠,零CPU消耗

      }

      pthread_mutex_unlock(&cc_mutex_);


      // 被唤醒时一定有任务

  }

优势:

  1. 没有任务时线程完全休眠
  2. 只在有任务时才唤醒
  3. 无定时器的额外开销

参考资料

有了信号量为什么还要用条件变量
Unix环境高级编程

如果你觉得本文对你有帮助,欢迎打赏