longchute

about

Lock-Free Multiprocess Shared Memory Counter (Atomic CAS) in C

27 Dec 2013

This C code forks a number of processes (children) which each contribute a per_child number of increments to a globally shared counter. The parent process does not wait for the children to exit. Each child sleeps for a random number of nanoseconds between reading the shared counter and attemping to compare-and-swap, in order to generate failed swaps for illustration purposes.

To compile and run: gcc -Wall -march=native -o counter -lrt counter.c && ./counter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include <sys/mman.h>   /*  `shm_open`, `shm_unlink`, `mmap`                */
#include <sys/stat.h>   /*  `S_IRUSR`, `S_IWUSR`                            */
#include <fcntl.h>      /*  `O_RDWR`, `O_CREAT`                             */
#include <string.h>     /*  `strerror`                                      */
#include <unistd.h>     /*  `ftruncate`, `getpid`, `pid_t`, `fork`, `close` */
#include <stdlib.h>     /*  `rand_r`                                        */
#include <errno.h>      /*  `errno`                                         */
#include <stdlib.h>     /*  `abort`                                         */
#include <time.h>       /*  `struct timespec`, `nanosleep`                  */
#include <stdio.h>      /*  `printf`                                        */

#define false 0
#define true ~false

int shm_init(void **shm, int *shm_fd, char *shm_handle);
int child_task(int a_child, void *shm);
int random_sleep(void);

int shm_init(void **shm, int *shm_fd, char *shm_handle) {
    /*  Open a shared memory space  */
    if (!(*shm_fd = shm_open(shm_handle, (O_RDWR | O_CREAT), (S_IRUSR | S_IWUSR)))) {
        printf("%s\n", strerror(errno));
        abort();
    }

    /*  Truncate shared memory to the size of an `int`. */
    if (ftruncate(*shm_fd, sizeof(int)) == -1) {
        printf("ftruncate: %s\n", strerror(errno));
        abort();
    }

    /*  Map the shared space    */
    if (!(*shm = mmap(NULL, sizeof(int), (PROT_READ | PROT_WRITE), (MAP_SHARED | MAP_LOCKED), *shm_fd, 0))) {
        printf("%s\n", strerror(errno));
        abort();
    }

    /*  Unlink so file goes away on last close  */
    if (shm_unlink(shm_handle) == -1) {
        printf("%s\n", strerror(errno));
        abort();
    }

    return true;
}

int child_task(int a_child, void *shm) {
    int per_child       = 5;    /*  Number of increments each worker contributes                    */
    int an_iteration    = 0;    /*  Current increment to contribute                                 */
    int val             = 0;    /*  The next proposed increment to the shared counter               */
    int val_old         = 0;    /*  The last known value of the shared counter                      */
    int val_last        = 0;    /*  The shared counter value when attemping to compare-and-swap     */

    printf("child %d PID: %d\n", a_child, getpid());

    /*  Do `per_child` iterations (increments to the shared counter)    */
    for (; an_iteration < per_child; an_iteration++) {
        while (1) {
            val     = *((int *) shm);           /*  Read shared counter     */
            val_old = val;                      /*  Save old counter        */
            val++;                              /*  Increment local counter */

            /*  Create swap failures for demonstration purposes */
            random_sleep();

            /*  Swap counter if old counter == shared counter   */
            val_last = __sync_val_compare_and_swap((int *) shm, val_old, val);  

            /*  If `val_old` != `val_last`, another child incremented,  */
            if (val_old != val_last) {                                          
                printf("child %d: no swap (%d), val_old: %d, val_last: %d\n", a_child, an_iteration, val_old, val_last);
                continue;
            /*  Otherwise, this child's increment was successful.   */
            } else {                                                            
                printf("child %d: swap (%d), val_old: %d, val_last: %d\n", a_child, an_iteration, val_old, val_last);
            }

            /*  Print the shared counter value contributed by the child.    */
            printf("child %d: %d\n", a_child, val);
            break;
        }
    }

    return true;
}

int random_sleep(void) {
    static unsigned int seed    = 0;    /*  Re-entrant seed for rand_r()                                    */
    struct timespec sleepspec   = {     /*  Time to sleep between reading shared counter and incrementing   */
        .tv_sec                 = 0,
        .tv_nsec                = 0
    };

    sleepspec.tv_nsec = rand_r(&seed);
    nanosleep(&sleepspec, NULL);

    return true;
}

int main(int argc, char *argv[]) {
    char *shm_handle    = "/cas_test_shm";  /*  Handle for shared memory space to hold shared counter           */
    int shm_fd          = 0;                /*  File descriptor for shared memory                               */
    void *shm           = NULL;             /*  Pointer to mmap()'d shared memory                               */ 
    int children        = 4;                /*  Number of worker processes to fork()                            */
    pid_t child         = 0;                /*  Worker PID                                                      */
    int a_child         = 0;                /*  Current worker                                                  */ 

    printf("parent PID: %d\n", getpid());

    /*  Create shared memory space for a counter    */
    shm_init(&shm, &shm_fd, shm_handle);

    /*  Fork the process `children` times   */
    for (; a_child < children; a_child++) {
        if ((child = fork()) < 0)
            abort();

        /*  If we're the child process, else parent process continues the loop  */
        if (child == 0) {
            child_task(a_child, shm);

            /*  Child has contributed `per_child` increments.   */
            printf("child %d: going home\n", a_child);
            if (close(shm_fd) == -1) {
                printf("%s\n", strerror(errno));
                abort();
            }
            
            return true;
        }
    }

    /*  Parent does not wait for children.  */
    printf("parent: going home\n");

    if (close(shm_fd) == -1) {
        printf("%s\n", strerror(errno));
        abort();
    }

    return true;
}