开始之前

这篇文章主要是为了帮助大家熟悉 POSIX 线程库以及在实际开发中使用它的特性。我们会具体讲解如何利用这个线程库定义的不同工具 来解决编程中的问题。当然这里隐含了一个假设,就是读者已经了解过并行编程(或者多进程)的相关概念,如果没有这些背景知识 的话,读者可能会感觉到很难理解。不过也没关系,我的另一篇教程里边有专门为只具备线性编程思维的读者提供了对并行编程理论 和相关术语的讲解。

同样的,我假设聪明的你已经熟悉了异步编程模型,那些经常使用桌面环境的人会更容易去接受多线程编程的理念。

当我们谈到 POSIX 线程的时候,肯定会有人心生疑惑:“我们应该使用哪个标准下的 POSIX 线程?”。由于 POSIX 线程标准已经修订了好 多年,人们会发现,依据不同标准的实现有不同的函数集,不同的默认值和不同的细微差别。所以在此说明的是,本教程使用的是 v0.5 版的 Linux 内核中的线程库,使用其他操作系统和使用其他版本的读者,需要阅读一下你们对应的系统文档来同本文中的实例进行对应。同时,有些示例代码中使用到了阻塞式的系统调用,它们不能再用户级的线程库中很好的工作(参考另一篇文章:parallel programming theory tutorial 来获取详细信息)。好了,说了那么多,主要是为了能保证文章中的示例代码能够在其他系统中正常使用,从而提高跨平台性。

什么是线程?为什么要使用线程?

线程是一个迷你版的进程,它们拥有自己的栈,能够执行给定的一段代码。但是不同于进程的是,线程通常与其他线程共享记忆体(而 每个进程都拥有一个独立的记忆体区域)。一个线程组就是一个执行相同代码的线程的集合,他们共用记忆体,可以访问相同的全局变 量,拥有同样的文件描述符等等,他们以并行的方式执行(可能是时间片的方式,或者对于多核心系统,他们会真正平行执行)。

使用线程组而不是普通顺序执行程序的好处是多个操作可以同时进行,当一些事件产生的时候,他们能立马被处理(例如:如果我们有 一个线程处理用户接口,另一个线程处理数据库查询,那么我们可以在处理很多用户查询的同时,依然能够响应用户的输入)。

使用线程组而不是进程组的好处是线程间的上下文切换要比进程间的上下文切换要快很多(上下文切换是指系统从一个正在运行的线程或进程切换到去执行另一个线程或进程)。此外,线程间的通信也远远比进程间通信要高效很多。

线程编程有利也有弊,由于线程组共享记忆体,如果一个线程破坏了记忆体,那么其他线程也要受到牵连。但是进程就不同了,操作系 统会将进程之间隔离开,如果一个进程破坏了它的记忆体,那么其他进程不会受到影响。使用进程的另一个好处是,不同的进程可以运 行在不同的机器上,但是线程必须运行在同一台机器上(至少通常情况下是这样的)。

创建和销毁线程

当一个多线程程序启动执行到main()函数的时候,就会有一个线程运行,这是一个全程线程(full-fledged thread,或者叫主线程),如果想创建一个新的线程,程序中需要使用pthread_create()函数

 1#include <stdio.h>
 2#include <pthread.h>
 3
 4void *do_loop(void *data)
 5{
 6    int i; // counter, to print numbers
 7    int j; // counter, for delay
 8    int me = *((int *)data);
 9    for (i = 0; i < 10; i++) {
10        for (j = 0; j < 50000; j++) // delay loop
11            ;
12        printf("'%d' - Got '%d'\n", me, i);
13    }
14
15    // terminate the thread
16    pthread_exit(NULL);
17}
18
19int main(int argc, char *argv[])
20{
21    int       thr_id;    // thread ID for the newly created thread
22    pthread_t p_thread;  // thread's structure
23    int a = 1;           // thread 1 identifying number
24    int b = 2;           // thread 2 identifying number
25
26    // create a new thread that will execute 'do_loop()'
27    thr_id = pthread_create(&p_thread, NULL, do_loop, (void *)&a);
28
29    // run 'do_loop' in the main thread as well
30    do_loop((void *)&b);
31
32    return 0;
33}

上述这段代码需要特殊说明的是:

  1. 因为执行 main 函数的也是一个线程,所以它在执行do_loop()函数的时候跟它创建的线程是并行的;
  2. pthread_create()函数接收 4 个参数。第一个是提供该线程的相关信息,第二个是用来定义线程的属性,在上面的示例代码中,我 们使用 NULL 来表示使用默认的值,第三个参数是传递一个函数来给线程执行,第四个参数是传递给要执行函数的参数。
  3. 在函数内部使用一个空的循环只是为了演示线程的并行执行。如果你的 CPU 足够快,导致你看到一个线程输出前都是同一个线程的输出,那就把空循环的次数改的更大一些。
  4. 调用pthread_exit()会使当前线程退出,同时释放当前线程占有的私有资源。在线程执行函数的末尾其实是没有必要显示调用这个函数的,因为当函数返回的时候,这个线程自动就会退出。当我们想要退出一个执行中的线程的时候就会非常有用。

使用 gcc 编译多线程程序的时候,我们需要链接 pthread 库。确定你的系统中已经安装了线程库,下面是如何编译我们的第一个示例程序:

1gcc pthread_create.c -o pthread_create -lpthread

用互斥同步线程

使用多线程遇到的基本问题是确保他们不会相互踩到脚,这里指的是在不同的线程中使用同一个数据结构。

例如:假设有两个线程想要修改两个变量。一个线程想将他们都改为 0,另一个想改为 1.如果两个线程同时执行,我们可能会得到这样 的结果:一个变量是 1,而另一个是 0.造成这种结果的原因是上下文切换可能会发生在第一个线程将第一个变量设置为 0 之后,此时第二个线程将两个变量都设置为 1,然后第一个线程恢复执行,将第二个变量设置为 0,这样就会导致第一个变量为 1,第二个变量为 0 的结果。

什么是互斥?

线程库中针对上述问题提供的一个基本的解决方案就是互斥。

创建和初始化一个互斥锁

要创建一个互斥锁,我们首先要申明一个pthread_mutex_t类型的变量,然后再对其进行初始化。最简单的方法就是将它赋值为PTHREAD_MUTEX_INITIALIZER 常量。形如以下代码:

1pthread_mutex_t a_mutex = PTHREAD_MUTEX_INITIALIZER;

这里需要说明的是:用这种方式创建的互斥锁称为“快速互斥锁”,也就是说,如果一个线程已经获得了一个互斥锁的前提下尝试重新获得 这个锁,那么他就会卡死,形成死锁。

还有另一种被称为“可重入锁”的互斥锁,它能够允许线程多次获取锁而不会发生阻塞。如果这个线程释放了锁,那么它将继续持有锁,直 到它做了通获取锁相同次数的释放锁操作为止。这种锁可以使用常量PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP来初始化。

获取和释放互斥锁

想要获取互斥锁,我们需要使用pthread_mutex_lock()函数,该函数尝试获取一把锁,或者当该锁已经被其他线程持有的时候阻塞当 前线程,这种情况下,当前面的线程释放锁的时候,改函数会返回锁给当前的线程。下面是如何获取锁的操作(假设我们已经事先初始化过了):

1int rc = pthread_mutex_lock(&a_mutex);
2if (rc) {
3    /* an error has occurred */
4    perror("pthread_mutex_lock");
5    pthread_exit(NULL);
6}
7
8/* mutex is now locked - do your stuff. */

当线程完成了相关操作(修改变量或数据结构,处理文件或其他的事情),它需要释放锁,此时需要使用pthread_mutex_unlock()函数 ,就像下面这样:

1rc = pthread_mutex_unlock(&a_mutex);
2if (rc) {
3    perror("pthread_mutex_unlock");
4    pthread_exit(NULL);
5}

销毁一个互斥锁

当我们使用完一把锁之后,我们需要对他进行销毁操作。完成使用是指再没有线程需要使用它了,如果只有一个线程不再使用了,而其它 线程还需要使用,那么就不能销毁。销毁互斥锁需要使用到pthread_mutex_destroy()函数:

1rc = pthread_mutex_destroy(&a_mutex);

销毁完之后,变量a_mutex就不能再被用于锁操作了,除非它再次被初始化。也就是说,如果销毁太早,后续还有线程尝试获取或释放 锁,那么该线程会从调用 lock 或者 unlock 的函数得到一个EINVAL的错误。

使用互斥锁——一个完整的例子

我们已经知道了一个互斥锁的完整的生命周期,那么让我们来看一个使用互斥锁的完整程序。程序会产生两个员工来竞争"employee of the day"的荣耀。我们将会创建 3 个线程,一个用来让 Danny 获得"employee of the day"的荣誉,另一个让 Moshe 获得,第三个负责确保只有一个人获得。这里提供两份代码,一个使用了互斥锁,另一个没有使用。运行这些程序,观察他们的区别,你会发现互斥锁对于多线程环境 编程是非常重要的。

  1#include <stdio.h>
  2#include <pthread.h>
  3
  4#define num_employees 2
  5
  6/* global mutex for out program. assignment initializes it */
  7pthread_mutex_t a_mutex = pthread_mutex_initializer;
  8
  9struct employee {
 10    int number;
 11    int id;
 12    char first_name[20];
 13    char last_name[20];
 14    char department[30];
 15    int room_number;
 16};
 17
 18/* global variable - our employee array, with 2 employees */
 19struct employee employees[] = {
 20    { 1, 12345678, "danny", "cohen", "accounting", 101 },
 21    { 2, 87654321, "moshe", "levy", "programmers", 202 },
 22};
 23
 24/* global variable - employee of the day. */
 25struct employee employee_of_the_day;
 26
 27void copy_employee(struct employee *from, struct employee *to)
 28{
 29    int rc; /* contain mutex lock/unlock results */
 30
 31    /* lock the mutex, to assure exclusive access to 'a' and 'b'. */
 32    rc = pthread_mutex_lock(&a_mutex);
 33
 34    to->number = from->number;
 35    to->id = from->id;
 36    strcpy(to->first_name, from->first_name);
 37    strcpy(to->last_name, from->last_name);
 38    strcpy(to->department, from->department);
 39    to->room_number = from->room_number;
 40
 41    /* unlock mutex */
 42    rc = pthread_mutex_unlock(&a_mutex);
 43}
 44
 45
 46void *do_loop(void *data)
 47{
 48    int my_num = *((int *)data);
 49    while (1) {
 50        /* set employee of the day to be the one with number 'my_num'. */
 51        copy_employee(&employees[my_num - 1], &employee_of_the_day);
 52    }
 53}
 54
 55int main(int argc, char *argv[])
 56{
 57    int       i;                /* loop counter */
 58    int       thr_id1;          /* thread id for the first new thread */
 59    int       thr_id2;          /* thread id for the second new thread */
 60    pthread_t p_thread1;        /* first thread's structure */
 61    pthread_t p_thread2;        /* second thread's structure */
 62    int       num1 = 1;         /* thread 1 employee number */
 63    int       num2 = 2;         /* thread 2 employee number */
 64    struct    employee eotd;    /* local copy of 'employee of the day' */
 65    struct    employee *worker; /* pointer to currently checked employee */
 66
 67    /* initialize employee of the day to first 1. */
 68    copy_employee(&employees[0], &employee_of_the_day);
 69
 70    /* create a new thread that will execute 'do_loop' with 1 */
 71    thr_id1 = pthread_create(&p_thread1, null, do_loop, (void *)&num1);
 72
 73    /* create a new thread that will execute 'do_loop' with 2 */
 74    thr_id2 = pthread_create(&p_thread2, null, do_loop, (void *)&num2);
 75
 76    /* run a loop that verfies integrity of 'employee of the day' many */
 77    for (i = 0; i < 6000; i++) {
 78        copy_employee(&employee_of_the_day, &eotd);
 79        worker = &employees[eotd.number - 1];
 80
 81        if (eotd.id != worker->id) {
 82            printf("mismatching 'id', %d != %d (loop '%d')\n", eotd.id, worker->id, i);
 83            exit(0);
 84        }
 85
 86        if (strcmp(eotd.first_name, worker->first_name) != 0) {
 87            printf("mismatching 'first_name', %s != %s (loop '%d')\n", eotd.first_name, worker->first_name, i);
 88            exit(0);
 89        }
 90
 91        if (strcmp(eotd.last_name, worker->last_name) != 0) {
 92            printf("mismatching 'last_name', %s != %s (loop '%d')\n", eotd.last_name, worker->last_name, i);
 93            exit(0);
 94        }
 95
 96        if (strcmp(eotd.department, worker->department) !=0) {
 97            printf("mismatching 'department', %s != %s (loop '%d')\n", eotd.department, worker->department, i);
 98            exit(0);
 99        }
100
101        if (eotd.room_number != worker->room_number) {
102            printf("mismatching 'room_number', %d != %d (loop '%d')\n", eotd.room_number, worker->room_number, i);
103            exit(0);
104        }
105    }
106
107    printf("glory, employees contents was always consistent\n");
108    return 0;
109}

饥饿和死锁

特别需要我们注意的是,由于pthread_mutex_lock()持有锁的时间不确定,当一个线程长时间持有一把锁的时候,其他线程就会产生“饥饿”——因为这些线程一直无法获取资源。然而线程库无法帮助我们避免这种情况的发生,需要我们自己来保证。

线程库也可能会产生“死锁”。所谓的死锁就是多个线程相互等待其他线程释放锁,以获取资源。通常情况下,如果所有线程都阻塞在等待其他线程释放锁,那么它们将不会自己恢复到执行状态。幸运的是,线程库会检测这种情况,当发生死锁的时候,线程库会在最后一个线程调用pthread_mutex_lock()的时候返回一个EDEADLK的错误。我们在开发的时候需要对这种错误采取措施,以避免死锁的发生。

精简同步——条件变量

通过上面对互斥锁的描述我们了解到,互斥能够实现一些简单的线程交互——互斥的访问资源。然而,我们经常需要在线程间使用真正的异步。

  • 在一个服务器程序中,一个线程从客户端接收请求,然后分发给其他线程去处理请求。这些处理请求的线程需要在数据到达的时候能够被唤醒,否的它们需要处于等待状态,而且不会占用 CPU 时间
  • 在一个 GUI 应用中,有一个线程用于读取用户的输入,另一个线程处理图形界面的渲染,第三个线程用于发送请求到服务端并且处理服务端的响应。处理服务端响应的线程在接收到服务端响应的时候需要能够通知图形渲染线程立即将服务端响应的结果渲染出来呈现给用户。读取用户输入的线程需要实时对用户的操作做出响应,例如允许用户取消服务端处理线程的一个长时间操作。

以上例子都需要线程间必须具备相互通信的能力,这就需要使用到条件变量。

什么是条件变量

条件变量是一种允许线程等待某些事件发生的机制。多个线程可能会等待一个条件变量,直到其他线程给这个条件变量发送信号。此时,会有一个线程会被唤醒,然后来处理这个事件。也可以通过广播的形式唤醒所有等待该条件变量的线程。

由于条件变量不提供锁,也就是说我们需要对条件变量单独使用互斥锁,来提供必要的访问控制。

创建和初始化条件变量

创建一个条件变量需要定义一个类型为pthread_cond_t的变量,然后正确的对其进行初始化。初始化条件变量可以简单的用一个叫做PTHREAD_COND_INITIALIZER的宏,也可以通过调用pthread_cond_init()函数。我们这里使用宏来初始化:

1pthread_cond_t got_request = PTHREAD_COND_INITIALIZER;

上述代码就定义并初始化了一个名为got_request的条件变量。

注意:由于PTHREAD_COND_INITIALIZER实际上是一个结构体,只能在条件变量被定义的时候用它来初始化。如果想在运行时对条件变量进行初始化,就只能使用pthread_cond_init()函数了。

给条件变量发送信号

我们可以使用pthread_cond_signal()函数来给条件变量发送信号,以唤其中一个等待该条件变量的线程,或者使用pthread_cond_broadcast()函数来唤醒所有等待该条件变量的线程。下面是使用示例:

1int rc = pthread_cond_signal(&got_request);
2
3// or
4
5int rc = pthread_cond_broadcast(&got_request);

当函数返回后,‘rc’等于 0 表示执行成功,否则失败。返回值代表了错误的类型,EINVAL表示传递的参数不是一个条件变量,ENOMEM表示系统内存溢出了。

注意:成功发送信号并不代表一定有等待的线程被唤醒,因为有可能该条件变量压根就没有被任何线程等待,这种情况下,发送信号就不会做任何事情。如果一个线程在一次信号发送后开始等待该条件变量,那么需要再次发送信号才能唤醒该线程。

等待一个条件变量

如果一个线程发送信号给条件变量,而其他线程可能想要等待该条件变量,那么就需要使用到pthread_cond_wait()或者pthread_cond_timedwait()中的其中一个。这两个函数都接收一个条件变量和一个互斥锁作为参数,当 wait 函数执行的时候会释放锁,然后等待条件变量,并将线程挂起。如果有信号唤醒了这个线程,wait 函数会自动重新获取锁,然后返回。

这两个函数的唯一区别是pthread_cond_timedwait()允许我们指定一个等待的超时时间,当超过这个时间,函数就会立即返回一个ETIMEDOUT的错误,来告诉我们程序在超时时间内没有,条件变量没有获取到信号。而pthread_cond_wait()在条件变量获取到信号前会一直等待。

下面是代码示例,我们假设got_request是一个经过正确初始化的条件变量,request_mutex是一个经过正确初始化的锁,我们来尝试使用pthread_cond_wait()函数:

 1/* first, lock the mutex */
 2int rc = pthread_mutex_lock(&request_mutex);
 3if (rc) { /* an error has occurred */
 4    perror("pthread_mutex_lock");
 5    pthread_exit(NULL);
 6}
 7
 8/* mutex is now locked - wait on the condition variable */
 9/* During the execution of pthread_cond_wait, the mutex is unlocked */
10rc = pthread_cond_wait(&got_request, &request_mutex);
11if (rc == 0) { /* we were awakened due to the cond. variable being signaled */
12               /* The mutex is now locked again by pthread_cond_wait() */
13    /* do your stuff... */
14}
15
16/* finally, unlock the mutex */
17pthread_mutex_unlock(&request_mutex);

下面我们来使用pthread_cond_timedwait()函数:

 1#include <sys/time.h> /* struct timeval definition */
 2#include <unistd.h>   /* declaration of gettimeofday() */
 3
 4struct timeval  now;     /* time when we started waiting. */
 5struct timespec timeout; /* timeout value for the wait function */
 6int    done;             /* are we done waiting? */
 7
 8/* first, lock the mutex */
 9int rc = pthread_mutex_lock(&request_mutex);
10if (rc) { /* an error has occurred. */
11    perror("pthread_mutex_lock");
12    pthread_exit(NULL);
13}
14
15/* mutex is now locked */
16
17/* get current time */
18gettimeofday(&now);
19/* prepare timeout value */
20timeout.tv_sec = now.tv_sec + 5;
21timeout.tv_nsec = now.tv_usec * 1000; /* timeval uses microseconds. */
22                                      /* timespec uses nanoseconds. */
23                                      /* 1 nanosecond = 1000 micro seconds. */
24
25/* wait on the condition variable. */
26/* we use a loop, since a Unix signal might stop the wait before the timeout. */
27done = 0;
28while (!done) {
29    /* remember that pthread_cond_timedwait() unlocks the mutex on entrance */
30    rc = pthread_cond_timedwait(&got_request, &request_mutex, &timeout);
31    switch (rc) {
32        case 0: /* we were awakened due to the cond. variable being signaled */
33                /* the mutex was now locked again by pthread_cond_timedwait. */
34            /* do your stuff here... */
35            done = 0;
36            break;
37        case ETIMEDOUT: /* our time is up */
38            done = 0;
39            break;
40        default:   /* some error occurred (e.g. we got a Unix signal) */
41            break; /* break this switch, but re-do the while loop. */
42    }
43}
44
45/* finally, unlock the mutex */
46pthread_mutex_unlock(&request_mutex);

正如你所看到的,使用 timed wait 的方式更加复杂一些,最好能够将其封装成一个函数,而不是在必要的地方一直写重复的代码。

销毁一个条件变量

当我们用完一个条件变量后,我们需要将它销毁,以释放它占用的系统资源。完成这个操作就需要使用pthread_cond_destroy()函数。 而在销毁之前,一定要确保没有线程等待该条件变量:

1int rc = pthread_cond_destroy(&got_request);
2if (rc == EBUSY) { /* some thread is still waiting on this condition variable */
3    /* handle this case here ... */
4}

当还有线程正在等待该条件变量的时候该怎么办呢?这个要视情况而定,它可能意味着我们对这个条件变量的使用存在缺陷,或者缺少合适的 清理线程的代码。这对程序开发者来说是一个很好的提示,至少在程序调试的时候。它可能不能说明任何问题,但是却很重要。

对条件变量的条件检查

在这里需要对条件变量特别说明的是——如果没有结合一些实际的条件检查,它们的存在通常是毫无意义的。为了解释清楚,我们拿前面说的那个服务端程序的例子来说明。假设我们使用got_request这个条件变量来发送请求到来时需要被处理的信号,然后将其放入请求队列里边。如果有线程正在等待这个条件变量,那么我们可以肯定至少有一个线程会被唤醒来处理这个请求。

然而,假如当一个新的请求到来时,所有线程恰好都正在处理之前的请求呢?此时发送给这个条件变量的信号不会做任何操作(因为所有的线程都在忙于处理之前的请求,没有线程等待该条件变量),当这些线程完成手头上的工作后,就会回来继续等待,而该条件变量不会再次发送信号(例如,没有新的请求到来),此时,至少有一个请求一直处于等待状态,而所有的处理线程都阻塞在等待信号上而无法处理。

为了解决上面的问题,我们需要设置一些整数变量来表示正在等待处理的请求数,然后在每个线程等待条件变量之前检查一下这个变量的值,如果是正数,说明有一些请求正等待处理,然后该线程就需要去先处理完请求,然后再进入休眠,此外,一个线程处理完一个请求后,需要将这个数值减 1,一定要确保数值的准确性。 下面我们来用代码实现以下:

 1/* number of pending requests, initially none. */
 2int num_requests = 0;
 3
 4/* first, lock the mutex */
 5int rc = pthread_mutex_lock(&a_mutex);
 6if (rc) { /* an error has occurred */
 7    perror("pthread_mutex_lock");
 8    pthread_exit(NULL);
 9}
10
11/* mutex is now locked - wait on the condition variable */
12/* if there are no requests to be handled. */
13rc = 0;
14if (num_requests == 0) {
15    rc = pthread_cond_wait(&got_request, &a_mutex);
16}
17
18if (num_requests > 0 && rc == 0) { /* we have a request pending. */
19    /* do your stuff... */
20
21    /* decrease count of pending requests */
22    num_requests--;
23}
24
25/* finally, unlock the mutex */
26pthread_mutex_unlock(&a_mutex);

使用条件变量的完整示例

为了能够更好的说明条件变量的用法,我们用前面描述的服务端程序的例子来做一个完整示例。

  1#include <stdio.h>
  2#define __USE_GNU
  3#include <pthread.h>
  4#include <stdlib.h>
  5
  6#define NUM_HANDLER_THREADS 3
  7
  8/* global mutex for our program. assignment initializes it. */
  9/* note that we use a RECURSIVE mutex, since a handler thread might try to lock
 10 * it twice consecutively. */
 11pthread_mutex_t request_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 12
 13/* global condition variable for our program. assignment initializes it. */
 14pthread_cond_t got_request = PTHREAD_COND_INITIALIZER;
 15
 16/* number for pending requests, initially none. */
 17int num_requests = 0;
 18
 19/* format of a single request. */
 20struct request {
 21    int number;           /* number of the request */
 22    struct request *next; /* pointer to next request, NULL if none. */
 23};
 24
 25/* head of linked list of requests. */
 26struct request *requests = NULL;
 27/* pointer to last request */
 28struct request *last_request = NULL;
 29
 30void add_request(int request_num, pthread_mutex_t *p_mutex,
 31                 pthread_cond_t *p_cond_var) {
 32    /* return code of pthreads functions. */
 33    int rc;
 34
 35    /* pointer to newly added request */
 36    struct request *a_request;
 37
 38    a_request = (struct request *)malloc(sizeof(struct request));
 39    if (!a_request) {
 40        fprintf(stderr, "add_request: out of memory\n");
 41        exit(1);
 42    }
 43
 44    a_request->number = request_num;
 45    a_request->next = NULL;
 46
 47    /* lock the mutex, to assure exclusive access to the list */
 48    rc = pthread_mutex_lock(p_mutex);
 49
 50    /* add new request to the end of the list, updating list */
 51    /* pointers as required */
 52    if (num_requests == 0) {
 53        requests = a_request;
 54        last_request = a_request;
 55    } else {
 56        last_request->next = a_request;
 57        last_request = a_request;
 58    }
 59
 60    num_requests++;
 61
 62#ifdef DEBUG
 63    printf("add_request: added request with id '%d'\n", a_request->number);
 64    fflush(stdout);
 65#endif
 66
 67    /* unlock mutex */
 68    rc = pthread_mutex_unlock(p_mutex);
 69
 70    /* signal the condition variable - there's a new request to handle */
 71    rc = pthread_cond_signal(p_cond_var);
 72}
 73
 74struct request *get_request(pthread_mutex_t *p_mutex) {
 75    int rc;
 76    struct request *a_request;
 77
 78    rc = pthread_mutex_lock(p_mutex);
 79
 80    if (num_requests > 0) {
 81        a_request = requests;
 82        requests = a_request->next;
 83
 84        if (requests == NULL) {
 85            last_request = NULL;
 86        }
 87        num_requests--;
 88    } else {
 89        a_request = NULL;
 90    }
 91
 92    /* unlock mutex */
 93    rc = pthread_mutex_unlock(p_mutex);
 94
 95    return a_request;
 96}
 97
 98void handle_request(struct request *a_request, int thread_id) {
 99    if (a_request) {
100        printf("Thread '%d' handled request '%d'\n", thread_id,
101               a_request->number);
102        fflush(stdout);
103    }
104}
105
106void handle_requests_loop(void *data) {
107    int rc;
108    struct request *a_request;
109    int thread_id = *((int *)data);
110
111#ifdef DEBUG
112    printf("Starting thread '%d'\n", thread_id);
113    fflush(stdout);
114#endif
115
116    /* lock the mutex, to access the requests list exclusively. */
117    rc = pthread_mutex_lock(&request_mutex);
118
119#ifdef DEBUG
120    printf("thread '%d' after pthread_mutex_lock\n", thread_id);
121    fflush(stdout);
122#endif
123
124    /* do forever... */
125    while (1) {
126#ifdef DEBUG
127        printf("thread '%d', number_requests = %d\n", thread_id, num_requests);
128        fflush(stdout);
129#endif
130        if (num_requests > 0) {
131            a_request = get_request(&request_mutex);
132            if (a_request) {
133                handle_request(a_request, thread_id);
134                free(a_request);
135            }
136        } else {
137#ifdef DEBUG
138            printf("thread '%d' before pthread_cond_wait\n", thread_id);
139            fflush(stdout);
140#endif
141            rc = pthread_cond_wait(&got_request, &request_mutex);
142
143#ifdef DEBUG
144            printf("thread '%d' after pthread_cond_wait\n", thread_id);
145            fflush(stdout);
146#endif
147        }
148    }
149}
150
151int main(int argc, char *argv[]) {
152    int i;
153    int thr_id[NUM_HANDLER_THREADS];
154    pthread_t p_threads[NUM_HANDLER_THREADS];
155    struct timespec delay;
156
157    /* create the request-handling threads */
158    for (i = 0; i < NUM_HANDLER_THREADS; i++) {
159        thr_id[i] = i;
160        pthread_create(&p_threads[i], NULL, handle_requests_loop,
161                       (void *)&thr_id[i]);
162    }
163
164    sleep(3);
165
166    /* run a loop that generates requests */
167    for (i = 0; i < 600; i++) {
168        add_request(i, &request_mutex, &got_request);
169        if (rand() > 3 * (RAND_MAX / 4)) {
170            delay.tv_sec = 0;
171            delay.tv_nsec = 10;
172            nanosleep(&delay, NULL);
173        }
174    }
175
176    sleep(5);
177    printf("Glory, we are done.\n");
178
179    return 0;
180}