一个例子讲清楚线程间同步、互斥量、条件变量、队列、内存池

前段时间有朋友想要了解一下多线程编程,正好有个项目上有这么个例子可以抽出来讲一讲。只要搞清楚这个例子,就一下子掌握了线程间同步、互斥量、条件变量、队列、内存池的概念和使用。

首先,线程间同步的概念。

比如,学过数字电路的人都知道,两个时钟域的信号如果没有经过同步直接接到一起的话,会引起亚稳态。原因是如果恰好输入信号在时钟边沿附近变化的话(不满足建立保持时间的情况下),信号可能处于一个中间电平,这样会导致触发器处于一个振荡状态,引起整块数字电路的不稳定。这就是数字电路中异步的概念,两个时钟都是各自free running,彼此没有关系。

再比如单片机程序中,各个不同的中断程序或者跟主程序间是异步的,因为主程序在执行的过程中随时可能被进来的中断打断,如果中断和主程序之间要通过一个共享的变量传递数据,你就要注意这个共享的变量的保护。假如主程序只读取了一半的数据而被中断打断,然后中断程序中又更新了整个变量,这样的回到主程序继续执行时读到的数据就有一半是上一次的,一半是更新过的。这样的结果显然不是我们想要的。这里只是举了一个很明显的例子。更多的情况可以搜索一下“原子操作”。

所以在多线程环境下,我们就要注意线程间共享变量的保护,这块敏感区域叫临界区(Critical area)。在单片机中,我们用中断开关来保护共享变量读写操作的完整性。在操作系统中,我们用的是互斥锁(mutex)来占有这个变量,防止它被多个线程同时访问。当一个线程访问当前已经被另一个线程占有的变量时,就会进入阻塞态,直到另一个线程完成解锁操作后,这个线程将得到继续执行。

互斥锁(mutex)是多线程编程时最重要的一个工具,用来解决多线程竞争同个资源的问题。其最底层的实现都是一个原子操作来界定lock or unlock。

接下来的例子创建了两个线程,一个是producer, 另一个是cusumer, 它们两个是异步的,中间通过一个队列来通讯。producer 向队列中发送数据,cusumer读取数据。模拟了一个场景:producer 以较快的速度向队列写数据,cusumer 处理数据较慢。这在图像帧处理时经常会碰到CPU处理和发送数据较慢,而外设采集速度较快的情况,这样多余的帧将被丢弃。队列节点使用自己写的一个内存池来分配,在malloc_node 从内存池(free_queue)里取出node; release_node 时把节点放回资源池。当对free_queue 进行操作的时候都要加锁,因为malloc_node 和release_node 可能被不同的线程调用,必须对free_queue 进行保护。这样的函数称之为是线程安全的。同理对enqueue,dequeue的操作也要对队列进行保护。

然后使用条件变量来通知consumer 队列有新数据到来。条件变量同样是被多个线程调用,也是需要带一个mutex 来进行保护的。当条件不满足时,线程会解锁mutex 进入block状态等待消息,这样才不会一直占有CPU。当条件满足或者超时时,才继续执行下面的程序。

例子中使用了pthread(POSIX thread) 的实现。其实各大操作系统都有自己的实现,FreeRTOS, Linux kernel等等,都可以拿代码过来看看学习。

请看这个多线程的例子,可以在online gdb 中运行调试:

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>
#include <sys/time.h>
#include <errno.h>

/************ queue manage ***********/
typedef struct Node
{
    void *data;
    struct Node *next;
}queue_node_t;

#define BUFFER_POOL_SIZE  (640 * 480)
#define BUFFER_POOL_NUM   5

struct pbuf{
    uint32_t len;
    uint8_t payload[BUFFER_POOL_SIZE];
};

typedef struct QueueList
{
    int sizeOfQueue;
    uint16_t memSize;
    queue_node_t *head;
    queue_node_t *tail;
}queue_t;

/* to inform consumer_thread */
static pthread_cond_t cap_cond;
static pthread_mutex_t cap_mutex;

/* stream queue for communicate between two threads */
static queue_t strm_queue;
static pthread_mutex_t strmq_mutex;

int strm_queue_init(){
    queue_t *q = &strm_queue;
    q->sizeOfQueue = 0;
    q->memSize = 0;
    q->head = q->tail = NULL;

    if (pthread_cond_init(&cap_cond, NULL) != 0) {
        printf("pthread_cond_init failed\n");
        exit(-1);
    }
    if (pthread_mutex_init(&cap_mutex, NULL) != 0) {
        printf("pthread_mutex_init failed\n");
        pthread_cond_destroy(&cap_cond);
        exit(-1);
    }

    if (pthread_mutex_init(&strmq_mutex, NULL) != 0) {
        printf("pthread_mutex_init failed\n");
        exit(-1);
    }

    return 0;
}

int malloc_node(queue_node_t **newNode);

int frame_enqueue(void *data, uint32_t len){
    queue_t *q = &strm_queue;
    queue_node_t *newNode = NULL;
    struct pbuf *p;
	assert(len < BUFFER_POOL_SIZE);
	if (malloc_node(&newNode) != true) {
		//printf("no node available!\n");
		return -1;
	}
	p = (struct pbuf *)newNode->data;
	memcpy(p->payload, data, len);
	p->len = len;

	pthread_mutex_lock(&strmq_mutex);
	if (q->sizeOfQueue == 0) {
		q->head = q->tail = newNode;
	} else {
		q->tail->next = newNode;
		q->tail = newNode;
	}
	
	q->sizeOfQueue++;
	pthread_mutex_unlock(&strmq_mutex);
	
	pthread_mutex_lock(&cap_mutex);
	pthread_cond_signal(&cap_cond);
	pthread_mutex_unlock(&cap_mutex);

	return 0;
}

int frame_dequeue(queue_node_t **newNode){
    queue_t *q = &strm_queue;
    bool ret = false;
    pthread_mutex_lock(&strmq_mutex);
    if (q->sizeOfQueue <= 0) {
        pthread_mutex_unlock(&strmq_mutex);
        *newNode = NULL;
        ret = false;
    }else{
        *newNode = q->head;
        if (q->sizeOfQueue > 1) {
            q->head = q->head->next;
        } else {
            q->head = NULL;
            q->tail = NULL;
        }
        q->sizeOfQueue--;
        ret = true;
    }
    pthread_mutex_unlock(&strmq_mutex);

    return ret;
}

/***** node pool manage *****/
static struct pbuf buffer_pool[BUFFER_POOL_NUM];
static queue_node_t queue_node[BUFFER_POOL_NUM];
/* mutex for free_queue when malloc and release node */
pthread_mutex_t freeq_mutex;
queue_t free_queue;

int node_pool_init(){
    int i;
    queue_t *q = &free_queue;
    queue_node_t *newNode; 

    newNode = &queue_node[0];
    newNode->data = (void *)&buffer_pool[0];
    q->head = q->tail = newNode;

    for (i = 1; i < BUFFER_POOL_NUM; i++) {   
       newNode = &queue_node[i];
       newNode->data = (void *)&buffer_pool[i];      
       q->tail->next = newNode;
       q->tail = newNode;
    }
    q->sizeOfQueue = i;

    if (pthread_mutex_init(&freeq_mutex, NULL) != 0) {
        printf("pthread_mutex_init failed\n");
        exit(-1);
    }

    return 0;
}

/* dequeue from free queue */
int malloc_node(queue_node_t **newNode){

    queue_t *q = &free_queue;
    bool ret = false;
    pthread_mutex_lock(&freeq_mutex);
    if (q->sizeOfQueue <= 0) {
        pthread_mutex_unlock(&freeq_mutex);
        *newNode = NULL;
        ret = false;
    }else{
        *newNode = q->head;
        if (q->sizeOfQueue > 1) {
            q->head = q->head->next;
        } else {
            q->head = NULL;
            q->tail = NULL;
        }
        q->sizeOfQueue--;
        ret = true;
    }
    pthread_mutex_unlock(&freeq_mutex);

    return ret;
}

/* tailed to free queue */
int release_node(queue_node_t *newNode ){

    queue_t *q = &free_queue;
    newNode->next = NULL;
    pthread_mutex_lock(&freeq_mutex);
    if (q->sizeOfQueue == 0) {
        q->head = q->tail = newNode;
    } else {
        q->tail->next = newNode;
        q->tail = newNode;
    }

    q->sizeOfQueue++;
    pthread_mutex_unlock(&freeq_mutex);
    return 0;
}

/**************** test for communication between threads *************/
void* producer_thread(void *argv[]){
    char message[100];
    printf("producer start!\n");
    for(int i = 0; i < 100; i++){
        usleep(100000);
        sprintf(message, "Hello %d", i);
        if(0 != frame_enqueue((void *)message, strlen(message))){
            printf("drop message: %s!\n", message);
        };
    }
    printf("producer stop!\n");
}

void* consumer_thread(void *argv[]){
    queue_node_t *newNode;
    int i = 0;
    struct timeval cur_tv;
    struct timespec to;
    int timeout_msec = 5000;
    int ret;
    
    while(1){
        
        gettimeofday(&cur_tv, NULL);
        to.tv_sec = cur_tv.tv_sec + timeout_msec / 1000;
        to.tv_nsec = cur_tv.tv_usec * 1000 + (timeout_msec % 1000) * 1000000;
        
        printf("consumer wait!\n");        
        pthread_mutex_lock(&cap_mutex);
        ret = pthread_cond_timedwait(&cap_cond, &cap_mutex, &to);
        pthread_mutex_unlock(&cap_mutex);
        
        if((ret == ETIMEDOUT)){
            printf("condition wait timeout!\n");
            break;
        }
        while(frame_dequeue(&newNode)){
            /* do some process */
            printf("%s\n", ((struct pbuf *)(newNode->data))->payload);
            usleep(500000);
            release_node(newNode);
            i++;
        }
    }
    printf("consumer get %d message!\n", i);    
    printf("consumer exit!\n");
}

int main(int argc, char *argv[])
{
    int i;
    pthread_t pid1, pid2;
    printf("\n--- test for communication between threads ---\n");
    node_pool_init();    
    strm_queue_init();

    if (pthread_create(&pid2, NULL, consumer_thread, NULL)){
        printf("create consumer failed!\n");
    }

    if (pthread_create(&pid1, NULL, producer_thread, NULL)) {
        printf("create producer failed!\n");
        return -1;
    }
    
    pthread_join(pid1, NULL);
    pthread_join(pid2, NULL);
    printf("\n--- test exit ---!\n");
    return 0;
}


例子中模拟了 producer 的生产速度是cosumer 的5倍,所以当队列满了的时候很多数据就会被丢弃。

若要看其它更简单的使用例子,可以参看libpthread 源码 /tests目录下的小例子。OK!

About: Tagore


One thought on “一个例子讲清楚线程间同步、互斥量、条件变量、队列、内存池”

Leave a Reply

Your email address will not be published. Required fields are marked *