workqueue.c
/**
 * A simple little concurrent work queue module.
 */

// #define _POSIX_C_SOURCE 200809L
// #define _ISOC11_SOURCE

#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <assert.h>

#include "notify.h"
#include "workqueue.h"

#ifdef DEBUG_SILLY
# define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__)
#else
# define D(fmt, ...)
#endif /* DEBUG_SILLY */

#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE)
#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE)
#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) (((__flags__) & (WQF_TERMINATE|WQF_NOCOMPLETE)) == (WQF_TERMINATE|WQF_NOCOMPLETE))
#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE)
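
/*
 * Flag semantics as used below: WQF_TERMINATE tells workers to exit once
 * there is no more work; adding WQF_NOCOMPLETE tells them to exit without
 * draining remaining work; WQF_NORECYCLE disables pooling completed work
 * structs on the free list.
 */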

#if defined(__GNUC__) || defined(__clang__)
# define ALWAYS_INLINE __attribute__((always_inline))
#else
# define ALWAYS_INLINE
#endif

/**
 * Shorthand for locking with error report.
 */
ALWAYS_INLINE inline static int
lock_(pthread_mutex_t *mutex)
{
  assert(mutex != NULL);
  int r;
  if ( (r = pthread_mutex_lock(mutex)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
    if (r == EOWNERDEAD) {
      /* We still own the lock; mark the state consistent. */
      if ( (r = pthread_mutex_consistent(mutex)) ) {
        NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r));
      }
    }
  }
  return r;
}
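
/*
 * Note: the queue mutexes are created robust (see workqueue_init), so an
 * EOWNERDEAD above means the lock was acquired but its previous owner died
 * while holding it; pthread_mutex_consistent() marks the mutex recovered so
 * it stays usable for subsequent lockers.
 */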

/**
 * Shorthand for unlocking with error report.
 */
ALWAYS_INLINE inline static int
unlock_(pthread_mutex_t *mutex)
{
  assert(mutex != NULL);
  int r;
  if ( (r = pthread_mutex_unlock(mutex)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
  }
  return r;
}

/**
 * Release a work struct, either saving it to the free list or freeing the allocation.
 */
inline static void
work_free_(struct workqueue *wq, struct work *work)
{
  assert(wq != NULL);
  if (work) {
    if (FLAG_IS_NORECYCLE(wq->flags)) {
      free(work);
    } else {
      if (lock_(&(wq->free_mutex))) {
        D("freeing work %p", work);
        free(work);
        return;
      }

      STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq);
      wq->n_free += 1;
      D("recycling work %p, %zu available", work, wq->n_free);

      (void)unlock_(&(wq->free_mutex));
    }
  }
}

/**
 * Allocate a work struct, either new or from free list.
 */
inline static struct work *
work_alloc_(struct workqueue *wq)
{
  assert(wq != NULL);
  struct work *work = NULL;

  if (!FLAG_IS_NORECYCLE(wq->flags)) {
    if (lock_(&(wq->free_mutex))) {
      return NULL;
    }

    work = STAILQ_FIRST(&(wq->free_head));
    if (work) {
      STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
      wq->n_free -= 1;
    }

    (void)unlock_(&(wq->free_mutex));
  }

  if (!work) {
    work = calloc(1, sizeof *work);
    if (!work) {
      NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
    }
    D("new work %p, %zu", work, wq->n_free);
  } else {
    D("recycled work %p, %zu available", work, wq->n_free);
  }

  return work;
}

/**
 * Trim free list down to #retain items.
 */
ssize_t
workqueue_release_work(struct workqueue *wq, size_t retain)
{
  assert(wq != NULL);
  struct work *work;
  ssize_t released = 0;

  if (lock_(&(wq->free_mutex))) {
    return -1;
  }

  while (wq->n_free > retain) {
    work = STAILQ_FIRST(&(wq->free_head));
    STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
    wq->n_free -= 1;
    released += 1;
    D("freeing work %p", work);
    free(work);
  }

  if (unlock_(&(wq->free_mutex))) {
    return -1;
  }

  return released;
}
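
/*
 * Usage note: workqueue_release_work(wq, 0) drops the entire free pool,
 * which callers may want after a burst of work, since recycled items are
 * otherwise held indefinitely.
 */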

static void *
worker_(void *data)
{
  assert(data != NULL);
  struct worker * const worker = data;
  struct workqueue * const wq = worker->workqueue;
  void *retval = NULL;
  int r;
  struct work *work = NULL, *priority_work = NULL;

  D("[%zu] started", worker->id);

  while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
    if (lock_(&(wq->work_mutex))) {
      retval = (void *)-1;
      goto done;
    }

    D("[%zu] looking for work", worker->id);
    /* Wait until this worker's priority list or the shared queue has an
     * item, or a terminate flag is raised. */
    while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL
           && (work = STAILQ_FIRST(&(wq->work_head))) == NULL) {
      if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) {
        D("[%zu] no more work", worker->id);
        if (unlock_(&(wq->work_mutex))) {
          retval = (void *)-1;
        }
        goto done;
      }

      if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r));
        retval = (void *)-1;
      }
      D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags);
      if (r
          || FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
        if (unlock_(&(wq->work_mutex))) {
          retval = (void *)-1;
        }
        goto done;
      }
    }

    if (priority_work) {
      D("[%zu] got priority work %p", worker->id, priority_work);
      STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq);
      worker->n_priority_work -= 1;
      work = priority_work;
    } else {
      D("[%zu] got work %p", worker->id, work);
      STAILQ_REMOVE_HEAD(&(wq->work_head), stailq);
      wq->n_work -= 1;
    }

    r = 0;
    /* If shared work remains, chain the signal to another waiting worker
     * before releasing the mutex. */
    if (STAILQ_FIRST(&(wq->work_head))) {
      if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
        NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
        retval = (void *)-1;
      }
    }

    if (unlock_(&(wq->work_mutex)) || r) {
      retval = (void *)-1;
      goto done;
    }

    work->fn(work->data, worker->ctx, worker->id);
    work_free_(wq, work);
    work = NULL;
  }
done:
  D("[%zu] done (%p)", worker->id, retval);
  pthread_exit(retval);
}
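
/*
 * Scheduling note: each pass of the worker loop prefers the worker's own
 * priority list over the shared queue, so items added via
 * workqueue_add_all_workers() run before any further shared work is
 * picked up.
 */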

int
workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags)
{
  assert(wq != NULL);
  int r;
  pthread_mutexattr_t mutexattr;

  D("initializing");

  wq->flags = flags;
  wq->worker_ctx_free_fn = worker_ctx_free_fn;
  wq->n_workers = 0;
  wq->workers_next_id = 0;
  wq->n_free = 0;
  wq->n_work = 0;
  wq->n_work_highwater = 0;

  STAILQ_INIT(&(wq->work_head));
  STAILQ_INIT(&(wq->free_head));
  LIST_INIT(&(wq->workers_head));

  if ( (r = pthread_mutexattr_init(&mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
    goto err_init;
  }
  if ( (r = pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r));
    goto err_destroy_mutexattr;
  }

  if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
    goto err_destroy_mutexattr;
  }

  if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
    goto err_destroy_freemutex;
  }

  if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
    goto err_destroy_workersmutex;
  }
  if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r));
    goto err_destroy_workmutex;
  }

  if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
    goto err_destroy_workcond;
  }

  return 0;

err_destroy_workcond:
  if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
  }
err_destroy_workmutex:
  if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }
err_destroy_workersmutex:
  if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }
err_destroy_freemutex:
  if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }
err_destroy_mutexattr:
  if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
  }
err_init:
  return -1;
}

void
workqueue_fini(struct workqueue *wq, bool flush_unfinished)
{
  assert(wq != NULL);
  struct worker *worker, *worker_tmp;
  struct work *w1, *w2;
  void *res;
  int r;

  D("destroying");

  workqueue_flags_t new_flags = WQF_TERMINATE;
  if (flush_unfinished) {
    new_flags |= WQF_NOCOMPLETE;
  }

  wq->flags |= new_flags;

  if (flush_unfinished) {
    (void)lock_(&(wq->work_mutex));
    w1 = STAILQ_FIRST(&(wq->work_head));
    while (w1 != NULL) {
      D("flushing unfinished work %p", w1);
      w2 = STAILQ_NEXT(w1, stailq);
      free(w1);
      w1 = w2;
    }
    STAILQ_INIT(&(wq->work_head));
    (void)unlock_(&(wq->work_mutex));
  }

  (void)lock_(&(wq->workers_mutex));

  LIST_FOREACH(worker, &(wq->workers_head), list) {
    worker->flags |= new_flags;
  }

  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
  }

  LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) {
    LIST_REMOVE(worker, list);
    wq->n_workers -= 1;

    D("joining worker %zu", worker->id);
    if ( (r = pthread_join(worker->thread, &res)) ) {
      NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
    }

    w1 = STAILQ_FIRST(&(worker->priority_work_head));
    while (w1 != NULL) {
      D("flushing unfinished priority work %p", w1);
      w2 = STAILQ_NEXT(w1, stailq);
      free(w1);
      w1 = w2;
    }
    STAILQ_INIT(&(worker->priority_work_head));

    if (wq->worker_ctx_free_fn) {
      wq->worker_ctx_free_fn(worker->ctx);
    }

    free(worker);
  }

  (void)unlock_(&(wq->workers_mutex));

  if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
  }
  if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }
  if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }
  if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
  }

  w1 = STAILQ_FIRST(&(wq->free_head));
  while (w1 != NULL) {
    D("freeing work %p", w1);
    w2 = STAILQ_NEXT(w1, stailq);
    free(w1);
    w1 = w2;
  }
  STAILQ_INIT(&(wq->free_head));
}
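
/*
 * Teardown order above: optionally flush pending shared work, raise the
 * terminate flags, wake all workers, join each thread and flush its
 * priority list, then destroy the synchronization objects and the free
 * pool.
 */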

ssize_t
workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags)
{
  assert(wq != NULL);
  struct worker *worker;
  sigset_t set, oldset;
  int r;

  if (sigfillset(&set) < 0) {
    NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno));
    return -1;
  }

  worker = calloc(1, sizeof *worker);
  if (worker == NULL) {
    NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
    return -1;
  }

  worker->workqueue = wq;
  worker->ctx = ctx;
  worker->flags = flags;

  STAILQ_INIT(&(worker->priority_work_head));
  worker->n_priority_work = 0;

  /* Block all signals so the new thread inherits a fully blocked mask;
   * the caller's mask is restored after pthread_create. */
  if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
    goto err_free_worker;
  }

  if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r));
    if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
      NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
    }
    /* No thread was created, so skip the cancel/join path. */
    goto err_free_worker;
  }

  if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
    goto err_cancel;
  }

  if (lock_(&(wq->workers_mutex))) {
    goto err_cancel;
  }

  worker->id = wq->workers_next_id;
  wq->workers_next_id += 1;

  LIST_INSERT_HEAD(&(wq->workers_head), worker, list);
  wq->n_workers += 1;

  if (unlock_(&(wq->workers_mutex))) {
    goto err_cancel;
  }

  return worker->id;

err_cancel:
  worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE;
  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
  }
  void *res;
  if ( (r = pthread_join(worker->thread, &res)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
  }

err_free_worker:
  free(worker);
  return -1;
}

ssize_t
workqueue_worker_remove(struct workqueue *wq)
{
  assert(wq != NULL);
  struct worker *worker;
  struct work *w1, *w2;
  ssize_t retval = -1;
  int r;

  if (lock_(&(wq->workers_mutex))) {
    return -1;
  }

  worker = LIST_FIRST(&(wq->workers_head));
  if (worker) {
    LIST_REMOVE(worker, list);
    wq->n_workers -= 1;
    retval = worker->id;
  }

  if (unlock_(&(wq->workers_mutex))) {
    retval = -1;
  }

  if (worker == NULL) {
    NOTIFY_ERROR("no workers to remove");
    return -1;
  }

  worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE;

  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
    retval = -1;
  }

  void *res;
  if ( (r = pthread_join(worker->thread, &res)) ) {
    NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
  }

  w1 = STAILQ_FIRST(&(worker->priority_work_head));
  while (w1 != NULL) {
    D("flushing unfinished priority work %p", w1);
    w2 = STAILQ_NEXT(w1, stailq);
    free(w1);
    w1 = w2;
  }
  STAILQ_INIT(&(worker->priority_work_head));

  if (wq->worker_ctx_free_fn) {
    wq->worker_ctx_free_fn(worker->ctx);
  }

  free(worker);

  return retval;
}

int
workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data)
{
  assert(wq != NULL);
  assert(fn != NULL);
  struct work *work;
  int r;

  if (FLAG_IS_TERMINATE(wq->flags)) {
    NOTIFY_ERROR("not adding work to terminating queue");
    return -1;
  }

  work = work_alloc_(wq);
  if (work == NULL) {
    return -1;
  }
  work->fn = fn;
  work->data = data;

  if (lock_(&(wq->work_mutex))) {
    free(work);
    return -1;
  }

  STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq);
  wq->n_work += 1;
  if (wq->n_work > wq->n_work_highwater) {
    wq->n_work_highwater = wq->n_work;
  }

  if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
  }

  if (unlock_(&(wq->work_mutex))) {
    return -1;
  }

  return 0;
}

int
workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data)
{
  assert(wq != NULL);
  assert(fn != NULL);
  struct worker *worker;
  struct work *work;
  int retval = 0;
  int r;

  if (FLAG_IS_TERMINATE(wq->flags)) {
    NOTIFY_ERROR("not adding work to terminating queue");
    retval = -1;
    goto done;
  }

  if (lock_(&(wq->workers_mutex))) {
    retval = -1;
    goto done;
  }

  if (lock_(&(wq->work_mutex))) {
    retval = -1;
    goto err_unlock_workers;
  }

  LIST_FOREACH(worker, &(wq->workers_head), list) {
    work = work_alloc_(wq);
    if (work == NULL) {
      retval = -1;
      goto err_unlock_work;
    }
    work->fn = fn;
    work->data = data;

    STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq);
    worker->n_priority_work += 1;
  }

  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
  }

err_unlock_work:
  if (unlock_(&(wq->work_mutex))) {
    retval = -1;
    goto err_unlock_workers;
  }

err_unlock_workers:
  if (unlock_(&(wq->workers_mutex))) {
    return -1;
  }
done:
  return retval;
}
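
/*
 * Example usage (a minimal sketch, not part of the module): spin up a few
 * workers, queue some items, then drain and tear down. It assumes
 * workqueue.h declares work_fn_t as void (void *data, void *ctx,
 * size_t worker_id), matching the call in worker_() above, and that
 * workqueue_flags_t accepts 0 for "no flags". Build with something like:
 *   cc -DWORKQUEUE_EXAMPLE_MAIN workqueue.c notify.c -pthread
 */
#ifdef WORKQUEUE_EXAMPLE_MAIN
#include <stdio.h>
#include <stdint.h>

static void
print_work_(void *data, void *ctx, size_t worker_id)
{
  (void)ctx;  /* no per-worker context in this example */
  printf("worker %zu handling item %ld\n", worker_id, (long)(intptr_t)data);
}

int
main(void)
{
  struct workqueue wq;

  if (workqueue_init(&wq, NULL, 0) < 0) {
    return EXIT_FAILURE;
  }

  for (int i = 0; i < 4; i++) {
    if (workqueue_worker_add(&wq, NULL, 0) < 0) {
      workqueue_fini(&wq, true);
      return EXIT_FAILURE;
    }
  }

  for (long i = 0; i < 16; i++) {
    if (workqueue_add(&wq, print_work_, (void *)(intptr_t)i) < 0) {
      break;
    }
  }

  /* flush_unfinished=false: let the workers drain the queue first. */
  workqueue_fini(&wq, false);
  return EXIT_SUCCESS;
}
#endif /* WORKQUEUE_EXAMPLE_MAIN */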