/**
 * A simple little concurrent work queue module.
 */
// #define _POSIX_C_SOURCE 200809L
// #define _ISOC11_SOURCE

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/queue.h>

/*
 * NOTIFY_ERROR()/NOTIFY_DEBUG() are expected to arrive by way of the
 * project's logging header via workqueue.h, along with a BSD-style
 * sys/queue.h providing the *_FOREACH_SAFE variants.
 */
#include "workqueue.h"
#ifdef DEBUG_SILLY
# define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__)
#else
# define D(fmt, ...)
#endif /* DEBUG_SILLY */
#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE)
#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE)
#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) (((__flags__) & (WQF_TERMINATE|WQF_NOCOMPLETE)) == (WQF_TERMINATE|WQF_NOCOMPLETE))
#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE)
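
/*
 * Note that FLAG_IS_TERMINATE_NOCOMPLETE() only matches when both the
 * terminate and no-complete bits are set; callers below OR together the
 * queue-wide and per-worker flag words before testing, so either scope
 * can request shutdown.
 */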
#if defined(__GNUC__) || defined(__clang__)
# define ALWAYS_INLINE __attribute__((always_inline))
#else
# define ALWAYS_INLINE
#endif
/**
 * Shorthand for locking with error report.
 */
inline static int ALWAYS_INLINE
lock_(pthread_mutex_t *mutex)
{
    int r;

    assert(mutex != NULL);

    if ( (r = pthread_mutex_lock(mutex)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
        if (r == EOWNERDEAD) {
            /* sanity check call here */
            if ( (r = pthread_mutex_consistent(mutex)) ) {
                NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r));
            }
        }
    }

    return r;
}
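
/*
 * The EOWNERDEAD handling above pairs with the robust mutex attribute set
 * in workqueue_init(): if a thread dies while holding a lock, the next
 * locker is told so, and pthread_mutex_consistent() makes the mutex usable
 * again. When recovery succeeds, r ends up 0 and the lock is held as usual.
 */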
/**
 * Shorthand for unlocking with error report.
 */
inline static int ALWAYS_INLINE
unlock_(pthread_mutex_t *mutex)
{
    int r;

    assert(mutex != NULL);

    if ( (r = pthread_mutex_unlock(mutex)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
    }

    return r;
}
/**
 * Release a work struct, either saving it to the free list or freeing the allocation.
 */
inline static void ALWAYS_INLINE
work_free_(struct workqueue *wq, struct work *work)
{
    assert(wq != NULL);

    if (FLAG_IS_NORECYCLE(wq->flags)) {
        D("freeing work %p", work);
        free(work);
        return;
    }

    if (lock_(&(wq->free_mutex))) {
        D("freeing work %p", work);
        free(work);
        return;
    }

    STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq);
    wq->n_free += 1;

    (void)unlock_(&(wq->free_mutex));
    D("recycling work %p, %zu available", work, wq->n_free);
}
/**
 * Allocate a work struct, either new or from free list.
 */
inline static struct work * ALWAYS_INLINE
work_alloc_(struct workqueue *wq)
{
    struct work *work = NULL;

    assert(wq != NULL);

    if (!FLAG_IS_NORECYCLE(wq->flags)) {
        if (lock_(&(wq->free_mutex))) {
            return NULL;
        }
        work = STAILQ_FIRST(&(wq->free_head));
        if (work) {
            STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
            wq->n_free -= 1;
        }
        (void)unlock_(&(wq->free_mutex));
    }

    if (work == NULL) {
        work = calloc(1, sizeof *work);
        if (work == NULL) {
            NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
            return NULL;
        }
        D("new work %p, %zu", work, wq->n_free);
    } else {
        D("recycled work %p, %zu available", work, wq->n_free);
    }

    return work;
}
/**
 * Trim free list down to #retain items.
 */
ssize_t
workqueue_release_work(struct workqueue *wq, size_t retain) {
    struct work *work;
    ssize_t released = 0;

    assert(wq != NULL);

    if (lock_(&(wq->free_mutex))) {
        return -1;
    }

    while (wq->n_free > retain) {
        work = STAILQ_FIRST(&(wq->free_head));
        STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
        wq->n_free -= 1;
        released += 1;
        D("freeing work %p", work);
        free(work);
    }

    if (unlock_(&(wq->free_mutex))) {
        return -1;
    }

    return released;
}
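
/*
 * Example: workqueue_release_work(&wq, 0) drains the entire free list,
 * while a larger retain value keeps that many recycled structs warm for
 * future workqueue_add() calls.
 */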
/**
 * The worker thread: service the private priority queue, then the shared
 * queue, until told to terminate.
 */
static void *
worker_(void *data)
{
    assert(data != NULL);
    struct worker * const worker = data;
    struct workqueue * const wq = worker->workqueue;
    struct work *work = NULL, *priority_work = NULL;
    void *retval = NULL;
    int r = 0;

    D("[%zu] started", worker->id);

    while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
        if (lock_(&(wq->work_mutex))) {
            retval = (void *)-1;
            break;
        }

        D("[%zu] looking for work", worker->id);
        while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL
        &&     (work = STAILQ_FIRST(&(wq->work_head))) == NULL) {
            if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) {
                D("[%zu] no more work", worker->id);
                if (unlock_(&(wq->work_mutex))) {
                    retval = (void *)-1;
                }
                goto done;
            }
            if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) {
                NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r));
                break;
            }
            D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags);
        }

        if (r
        ||  FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
            if (unlock_(&(wq->work_mutex))) {
                retval = (void *)-1;
            }
            break;
        }

        if (priority_work) {
            D("[%zu] got priority work %p", worker->id, priority_work);
            STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq);
            worker->n_priority_work -= 1;
            work = priority_work;
        } else {
            D("[%zu] got work %p", worker->id, work);
            STAILQ_REMOVE_HEAD(&(wq->work_head), stailq);
            wq->n_work -= 1;
        }

        /* if more shared work remains, wake another worker before unlocking */
        if (STAILQ_FIRST(&(wq->work_head))) {
            if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
                NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
            }
        }

        if (unlock_(&(wq->work_mutex)) || r) {
            retval = (void *)-1;
            break;
        }

        work->fn(work->data, worker->ctx, worker->id);
        work_free_(wq, work);
    }

done:
    D("[%zu] done (%p)", worker->id, retval);
    pthread_exit(retval);
}
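
/*
 * Each pass prefers the worker's private priority queue over the shared
 * queue, and a worker that dequeues shared work signals the condition
 * again while more items remain, so sleeping peers are chained awake one
 * at a time rather than thundering on every insert.
 */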
/**
 * Initialize a workqueue.
 */
int
workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags)
{
    pthread_mutexattr_t mutexattr;
    int r;

    assert(wq != NULL);

    wq->flags = flags;
    wq->worker_ctx_free_fn = worker_ctx_free_fn;

    wq->workers_next_id = 0;

    wq->n_work = 0;
    wq->n_work_highwater = 0;
    wq->n_free = 0;

    STAILQ_INIT(&(wq->work_head));
    STAILQ_INIT(&(wq->free_head));
    LIST_INIT(&(wq->workers_head));

    if ( (r = pthread_mutexattr_init(&mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
        return -1;
    }

    if ( (r = pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r));
        goto err_destroy_mutexattr;
    }

    if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
        goto err_destroy_mutexattr;
    }

    if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
        goto err_destroy_freemutex;
    }

    if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
        goto err_destroy_workersmutex;
    }

    if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r));
        goto err_destroy_workmutex;
    }

    if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
        goto err_destroy_workcond;
    }

    return 0;

err_destroy_workcond:
    if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
    }
err_destroy_workmutex:
    if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }
err_destroy_workersmutex:
    if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }
err_destroy_freemutex:
    if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }
err_destroy_mutexattr:
    if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
    }
    return -1;
}
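
/*
 * A minimal usage sketch (comment only), assuming the work_fn_t signature
 * implied by the dispatch call in worker_, fn(data, ctx, worker_id), and a
 * NULL per-worker context:
 *
 *     struct workqueue wq;
 *     if (workqueue_init(&wq, NULL, 0) < 0) { ... }
 *     if (workqueue_worker_add(&wq, NULL, 0) < 0) { ... }
 *     if (workqueue_add(&wq, my_work_fn, my_data) < 0) { ... }
 *     workqueue_fini(&wq, false); // false: drain queued work before exit
 */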
/**
 * Shut down a workqueue: flag the workers, join them, destroy the sync
 * primitives, and free all remaining work structs.
 */
void
workqueue_fini(struct workqueue *wq, bool flush_unfinished)
{
    struct worker *worker, *worker_tmp;
    struct work *w1, *w2;
    void *res;
    int r;

    assert(wq != NULL);

    workqueue_flags_t new_flags = WQF_TERMINATE;
    if (flush_unfinished == true) {
        new_flags |= WQF_NOCOMPLETE;
    }

    wq->flags |= new_flags;

    if (flush_unfinished == true) {
        (void)lock_(&(wq->work_mutex));
        w1 = STAILQ_FIRST(&(wq->work_head));
        while (w1 != NULL) {
            D("flushing unfinished work %p", w1);
            w2 = STAILQ_NEXT(w1, stailq);
            free(w1);
            w1 = w2;
        }
        STAILQ_INIT(&(wq->work_head));
        (void)unlock_(&(wq->work_mutex));
    }

    (void)lock_(&(wq->workers_mutex));

    LIST_FOREACH(worker, &(wq->workers_head), list) {
        worker->flags |= new_flags;
    }

    if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
    }

    LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) {
        LIST_REMOVE(worker, list);

        D("joining worker %zu", worker->id);
        if ( (r = pthread_join(worker->thread, &res)) ) {
            NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
        }

        w1 = STAILQ_FIRST(&(worker->priority_work_head));
        while (w1 != NULL) {
            D("flushing unfinished priority work %p", w1);
            w2 = STAILQ_NEXT(w1, stailq);
            free(w1);
            w1 = w2;
        }
        STAILQ_INIT(&(worker->priority_work_head));

        if (wq->worker_ctx_free_fn) {
            wq->worker_ctx_free_fn(worker->ctx);
        }

        free(worker);
    }

    (void)unlock_(&(wq->workers_mutex));

    if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
    }

    if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }

    if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }

    if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
    }

    w1 = STAILQ_FIRST(&(wq->free_head));
    while (w1 != NULL) {
        D("freeing work %p", w1);
        w2 = STAILQ_NEXT(w1, stailq);
        free(w1);
        w1 = w2;
    }
    STAILQ_INIT(&(wq->free_head));
}
/**
 * Add a worker thread to the pool.
 */
int
workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags)
{
    struct worker *worker;
    sigset_t set, oldset;
    void *res;
    int r;

    assert(wq != NULL);

    if (sigfillset(&set) < 0) {
        NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno));
        return -1;
    }

    worker = calloc(1, sizeof *worker);
    if (worker == NULL) {
        NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
        return -1;
    }

    worker->workqueue = wq;
    worker->ctx = ctx;
    worker->flags = flags;

    STAILQ_INIT(&(worker->priority_work_head));
    worker->n_priority_work = 0;

    /* block all signals around thread creation, restoring the mask after */
    if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
        goto err_free_worker;
    }

    if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r));
        if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
            NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
        }
        goto err_free_worker;
    }

    if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
    }

    if (lock_(&(wq->workers_mutex))) {
        goto err_terminate_worker;
    }

    worker->id = wq->workers_next_id;
    wq->workers_next_id += 1;

    LIST_INSERT_HEAD(&(wq->workers_head), worker, list);

    if (unlock_(&(wq->workers_mutex))) {
        return -1;
    }

    return 0;

err_terminate_worker:
    worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE;
    if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
    }
    if ( (r = pthread_join(worker->thread, &res)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
    }
err_free_worker:
    free(worker);
    return -1;
}
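
/*
 * Design note: a new thread inherits the signal mask of its creator, so
 * filling the mask before pthread_create() keeps the worker threads from
 * ever receiving process signals; signal handling stays with the threads
 * the application controls, and the caller's own mask is restored either way.
 */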
/**
 * Remove a worker thread from the pool, discarding its queued priority work.
 */
int
workqueue_worker_remove(struct workqueue *wq)
{
    struct worker *worker;
    struct work *w1, *w2;
    void *res;
    int r;

    assert(wq != NULL);

    if (lock_(&(wq->workers_mutex))) {
        return -1;
    }

    worker = LIST_FIRST(&(wq->workers_head));
    if (worker) {
        LIST_REMOVE(worker, list);
    }

    if (unlock_(&(wq->workers_mutex))) {
        return -1;
    }

    if (worker == NULL) {
        NOTIFY_ERROR("no workers to remove");
        return -1;
    }

    worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE;

    if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
    }

    if ( (r = pthread_join(worker->thread, &res)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
    }

    w1 = STAILQ_FIRST(&(worker->priority_work_head));
    while (w1 != NULL) {
        D("freeing work %p", w1);
        w2 = STAILQ_NEXT(w1, stailq);
        free(w1);
        w1 = w2;
    }
    STAILQ_INIT(&(worker->priority_work_head));

    if (wq->worker_ctx_free_fn) {
        wq->worker_ctx_free_fn(worker->ctx);
    }

    free(worker);

    return 0;
}
/**
 * Add a unit of work to the queue.
 */
int
workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data)
{
    struct work *work;
    int r;

    assert(wq != NULL);
    assert(fn != NULL);

    if (FLAG_IS_TERMINATE(wq->flags)) {
        NOTIFY_ERROR("not adding work to terminating queue");
        return -1;
    }

    work = work_alloc_(wq);
    if (work == NULL) {
        return -1;
    }
    work->fn = fn;
    work->data = data;

    if (lock_(&(wq->work_mutex))) {
        work_free_(wq, work);
        return -1;
    }

    STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq);
    wq->n_work += 1;
    if (wq->n_work > wq->n_work_highwater) {
        wq->n_work_highwater = wq->n_work;
    }

    if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
    }

    if (unlock_(&(wq->work_mutex))) {
        return -1;
    }

    return 0;
}
/**
 * Queue a unit of priority work on every current worker.
 */
int
workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data)
{
    struct worker *worker;
    struct work *work;
    int r;

    assert(wq != NULL);
    assert(fn != NULL);

    if (FLAG_IS_TERMINATE(wq->flags)) {
        NOTIFY_ERROR("not adding work to terminating queue");
        return -1;
    }

    if (lock_(&(wq->workers_mutex))) {
        return -1;
    }

    if (lock_(&(wq->work_mutex))) {
        goto err_unlock_workers;
    }

    LIST_FOREACH(worker, &(wq->workers_head), list) {
        work = work_alloc_(wq);
        if (work == NULL) {
            goto err_unlock_work;
        }
        work->fn = fn;
        work->data = data;

        STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq);
        worker->n_priority_work += 1;
    }

    if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
    }

    if (unlock_(&(wq->work_mutex))) {
        goto err_unlock_workers;
    }

    if (unlock_(&(wq->workers_mutex))) {
        return -1;
    }

    return 0;

err_unlock_work:
    (void)unlock_(&(wq->work_mutex));
err_unlock_workers:
    (void)unlock_(&(wq->workers_mutex));
    return -1;
}
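
/*
 * Lock ordering note: workers_mutex is always acquired before work_mutex
 * (here and nowhere in the reverse order), which is what keeps this
 * broadcast path deadlock-free against the workers' own locking.
 */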