互斥量

一个例子讲清楚线程间同步、互斥量、条件变量、队列、内存池

前段时间有朋友想要了解一下多线程编程,正好有个项目上有这么个例子可以抽出来讲一讲。只要搞清楚这个例子,就一下子掌握了线程间同步、互斥量、条件变量、队列、内存池的概念和使用。 首先,线程间同步的概念。 比如,学过数字电路的人都知道,两个时钟域的信号如果没有经过同步直接接到一起的话,会引起亚稳态。原因是如果恰好输入信号在时钟边沿附近变化的话(不满足建立保持时间的情况下),信号可能处于一个中间电平,这样会导致触发器处于一个振荡状态,引起整块数字电路的不稳定。这就是数字电路中异步的概念,两个时钟都是各自free running,彼此没有关系。 再比如单片机程序中,各个不同的中断程序或者跟主程序间是异步的,因为主程序在执行的过程中随时可能被进来的中断打断,如果中断和主程序之间要通过一个共享的变量传递数据,你就要注意这个共享的变量的保护。假如主程序只读取了一半的数据而被中断打断,然后中断程序中又更新了整个变量,这样的回到主程序继续执行时读到的数据就有一半是上一次的,一半是更新过的。这样的结果显然不是我们想要的。这里只是举了一个很明显的例子。更多的情况可以搜索一下“原子操作”。 所以在多线程环境下,我们就要注意线程间共享变量的保护,这块敏感区域叫临界区(Critical area)。在单片机中,我们用中断开关来保护共享变量读写操作的完整性。在操作系统中,我们用的是互斥锁(mutex)来占有这个变量,防止它被多个线程同时访问。当一个线程访问当前已经被另一个线程占有的变量时,就会进入阻塞态,直到另一个线程完成解锁操作后,这个线程将得到继续执行。 互斥锁(mutex)是多线程编程时最重要的一个工具,用来解决多线程竞争同个资源的问题。其最底层的实现都是一个原子操作来界定lock or unlock。 接下来的例子创建了两个线程,一个是producer, 另一个是cusumer, 它们两个是异步的,中间通过一个队列来通讯。producer 向队列中发送数据,cusumer读取数据。模拟了一个场景:producer 以较快的速度向队列写数据,cusumer 处理数据较慢。这在图像帧处理时经常会碰到CPU处理和发送数据较慢,而外设采集速度较快的情况,这样多余的帧将被丢弃。队列节点使用自己写的一个内存池来分配,在malloc_node 从内存池(free_queue)里取出node; release_node 时把节点放回资源池。当对free_queue 进行操作的时候都要加锁,因为malloc_node 和release_node 可能被不同的线程调用,必须对free_queue 进行保护。这样的函数称之为是线程安全的。同理对enqueue,dequeue的操作也要对队列进行保护。 然后使用条件变量来通知consumer 队列有新数据到来。条件变量同样是被多个线程调用,也是需要带一个mutex 来进行保护的。当条件不满足时,线程会解锁mutex 进入block状态等待消息,这样才不会一直占有CPU。当条件满足或者超时时,才继续执行下面的程序。 例子中使用了pthread(POSIX thread) 的实现。其实各大操作系统都有自己的实现,FreeRTOS, Linux kernel等等,都可以拿代码过来看看学习。 请看这个多线程的例子,可以在online gdb 中运行调试: #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <pthread.h> #include <stdbool.h> #include <assert.h> #include <sys/time.h> #include <errno.h> /\*\*\*\*\*\*\*\*\*\*\*\* queue manage \*\*\*\*\*\*\*\*\*\*\*/ typedef struct Node { void \*data; struct Node \*next; }queue\_node\_t; #define BUFFER\_POOL\_SIZE (640 \* 480) #define BUFFER\_POOL\_NUM 5 struct pbuf{ uint32\_t len; uint8\_t payload\[BUFFER\_POOL\_SIZE\]; }; typedef struct QueueList { int sizeOfQueue; uint16\_t memSize; queue\_node\_t \*head; queue\_node\_t \*tail; }queue\_t; /\* to inform consumer\_thread \*/ static pthread\_cond\_t cap\_cond; static pthread\_mutex\_t cap\_mutex; /\* stream queue for communicate between two threads \*/ static queue\_t strm\_queue; static pthread\_mutex\_t strmq\_mutex; int strm\_queue\_init(){ queue\_t \*q = &strm\_queue; q->sizeOfQueue = 0; q->memSize = 0; q->head = q->tail = NULL; if (pthread\_cond\_init(&cap\_cond, NULL) !