--- /dev/null
+/**
+ * A simple little concurrent work queue module.
+ */
+
+// #define _POSIX_C_SOURCE 200809L
+// #define _ISOC11_SOURCE
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "notify.h"
+#include "workqueue.h"
+
+#ifdef DEBUG_SILLY
+# define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__)
+#else
+# define D(fmt, ...)
+#endif /* DEBUG_SILLY */
+
+#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE)
+#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE)
+#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) ((__flags__) && (WQF_TERMINATE|WQF_NOCOMPLETE) == (WQF_TERMINATE|WQF_NOCOMPLETE))
+#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE)
+
+#if defined(__GNUC__) || defined(__clang__)
+# define ALWAYS_INLINE __attribute__((always_inline))
+#else
+# define ALWAYS_INLINE
+#endif
+
+/**
+ * Shorthand for locking with error report.
+ */
+inline static int
+lock_(pthread_mutex_t *mutex)
+{
+ assert(mutex != NULL);
+ int r;
+ if ( (r = pthread_mutex_lock(mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ if (r == EOWNERDEAD) {
+ /* sanity check call here */
+ if ( (r = pthread_mutex_consistent(mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r));
+ }
+ }
+ }
+ return r;
+} ALWAYS_INLINE
+
+/**
+ * Shorthand for unlocking with error report.
+ */
+inline static int
+unlock_(pthread_mutex_t *mutex)
+{
+ assert(mutex != NULL);
+ int r;
+ if ( (r = pthread_mutex_unlock(mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ return r;
+} ALWAYS_INLINE
+
+/**
+ * Release a work struct, either saving it to the free list or freeing the allocation.
+ */
+inline static void
+work_free_(struct workqueue *wq, struct work *work)
+{
+ assert(wq != NULL);
+ if (work) {
+ if (FLAG_IS_NORECYCLE(wq->flags)) {
+ free(work);
+ } else {
+ if (lock_(&(wq->free_mutex))) {
+ D("freeing work %p", work);
+ free(work);
+ return;
+ }
+
+ STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq);
+ wq->n_free += 1;
+
+ (void)unlock_(&(wq->free_mutex));
+ D("recycling work %p, %zu available", work, wq->n_free);
+ }
+ }
+}
+
+/**
+ * Allocate a work struct, either new or from free list.
+ */
+inline static struct work *
+work_alloc_(struct workqueue *wq)
+{
+ assert(wq != NULL);
+ struct work *work = NULL;
+
+ if (!FLAG_IS_NORECYCLE(wq->flags)) {
+ if (lock_(&(wq->free_mutex))) {
+ return NULL;
+ }
+
+ work = STAILQ_FIRST(&(wq->free_head));
+ if (work) {
+ STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+ wq->n_free -= 1;
+ }
+
+ (void)unlock_(&(wq->free_mutex));
+ }
+
+ if (!work) {
+ work = calloc(1, sizeof *work);
+ if (!work) {
+ NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+ }
+ D("new work %p, %zu", work, wq->n_free);
+ } else {
+ D("recycled work %p, %zu available", work, wq->n_free);
+ }
+
+ return work;
+}
+
+/**
+ * Trim free list down to #retain items.
+ */
+ssize_t
+workqueue_release_work(struct workqueue *wq, size_t retain) {
+ assert(wq != NULL);
+ struct work *work;
+ ssize_t released = 0;
+
+ if (lock_(&(wq->free_mutex))) {
+ return -1;
+ }
+
+ while (wq->n_free > retain) {
+ work = STAILQ_FIRST(&(wq->free_head));
+ STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+ wq->n_free -= 1;
+ released += 1;
+ free(work);
+ D("freeing work %p", work);
+ }
+
+ if (unlock_(&(wq->free_mutex))) {
+ return -1;
+ }
+
+ return released;
+}
+
+static void *
+worker_(void *data)
+{
+ assert(data != NULL);
+ struct worker * const worker = data;
+ struct workqueue * const wq = worker->workqueue;
+ void *retval = NULL;
+ int r;
+ struct work *work = NULL, *priority_work = NULL;
+
+ D("[%zu] started", worker->id);
+
+ while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+ if (lock_(&(wq->work_mutex))) {
+ retval = (void *)-1;
+ goto done;
+ }
+
+ D("[%zu] looking for work", worker->id);
+ while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL
+ && (work = STAILQ_FIRST(&(wq->work_head))) == NULL) {
+ if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) {
+ D("[%zu] no more work", worker->id);
+ if (unlock_(&(wq->work_mutex))) {
+ retval = (void *)-1;
+ }
+ goto done;
+ }
+
+ if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r));
+ retval = (void *)-1;
+ }
+ D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags);
+ if (r
+ || FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+ if (unlock_(&(wq->work_mutex))) {
+ retval = (void *)-1;
+ }
+ goto done;
+ }
+ }
+
+ if (priority_work) {
+ D("[%zu] got priority work %p", worker->id, work);
+ STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq);
+ worker->n_priority_work -= 1;
+ work = priority_work;
+ } else {
+ D("[%zu] got work %p", worker->id, work);
+ STAILQ_REMOVE_HEAD(&(wq->work_head), stailq);
+ wq->n_work -= 1;
+ }
+
+ r = 0;
+ if (STAILQ_FIRST(&(wq->work_head))) {
+ if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+ retval = (void *)-1;
+ }
+ }
+
+ if (unlock_(&(wq->work_mutex)) || r) {
+ retval = (void *)-1;
+ goto done;
+ }
+
+ work->fn(work->data, worker->ctx, worker->id);
+ work_free_(wq, work);
+ work = NULL;
+ }
+done:
+ D("[%zu] done (%p)", worker->id, retval);
+ pthread_exit(retval);
+}
+
+int
+workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags)
+{
+ assert(wq != NULL);
+ int r;
+ pthread_mutexattr_t mutexattr;
+
+ D("initializing");
+
+ wq->flags = flags;
+ wq->worker_ctx_free_fn = worker_ctx_free_fn;
+ wq->n_workers = 0;
+ wq->workers_next_id = 0;
+ wq->n_free = 0;
+ wq->n_work = 0;
+ wq->n_work_highwater = 0;
+
+ STAILQ_INIT(&(wq->work_head));
+ STAILQ_INIT(&(wq->free_head));
+ LIST_INIT(&(wq->workers_head));
+
+ if ( (r = pthread_mutexattr_init(&mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
+ goto err_init;
+ }
+ if ( (r = pthread_mutexattr_setrobust(&mutexattr, 1)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r));
+ goto err_destroy_mutexattr;
+ }
+
+ if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ goto err_destroy_mutexattr;
+ }
+
+ if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ goto err_destroy_freemutex;
+ }
+
+ if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ goto err_destroy_workersmutex;
+ }
+ if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r));
+ goto err_destroy_workmutex;
+ }
+
+ if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+ goto err_destroy_workcond;
+ }
+
+ return 0;
+
+err_destroy_workcond:
+ if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+ }
+err_destroy_workmutex:
+ if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+err_destroy_workersmutex:
+ if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+err_destroy_freemutex:
+ if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+err_destroy_mutexattr:
+ if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+ }
+err_init:
+ return -1;
+}
+
+void
+workqueue_fini(struct workqueue *wq, bool flush_unfinished)
+{
+ assert(wq != NULL);
+ struct worker *worker, *worker_tmp;
+ struct work *w1, *w2;
+ void *res;
+ int r;
+
+ D("destroying");
+
+ workqueue_flags_t new_flags = WQF_TERMINATE;
+ if (flush_unfinished == true) {
+ new_flags |= WQF_NOCOMPLETE;
+ }
+
+ wq->flags |= new_flags;
+
+ if (flush_unfinished == true) {
+ (void)lock_(&(wq->work_mutex));
+ w1 = STAILQ_FIRST(&(wq->work_head));
+ while (w1 != NULL) {
+ D("flushing unfinished work %p", w1);
+ w2 = STAILQ_NEXT(w1, stailq);
+ free(w1);
+ w1 = w2;
+ }
+ STAILQ_INIT(&(wq->work_head));
+ (void)unlock_(&(wq->work_mutex));
+ }
+
+ (void)lock_(&(wq->workers_mutex));
+
+ LIST_FOREACH(worker, &(wq->workers_head), list) {
+ worker->flags |= new_flags;
+ }
+
+ if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+ }
+
+ LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) {
+ LIST_REMOVE(worker, list);
+ wq->n_workers -= 1;
+
+ D("joining worker %zu", worker->id);
+ if ( (r = pthread_join(worker->thread, &res)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+ }
+
+ w1 = STAILQ_FIRST(&(worker->priority_work_head));
+ while (w1 != NULL) {
+ D("flushing unfinished priority work %p", w1);
+ w2 = STAILQ_NEXT(w1, stailq);
+ free(w1);
+ w1 = w2;
+ }
+ STAILQ_INIT(&(worker->priority_work_head));
+
+ if (wq->worker_ctx_free_fn) {
+ wq->worker_ctx_free_fn(worker->ctx);
+ }
+
+ free(worker);
+ }
+
+ (void)unlock_(&(wq->workers_mutex));
+
+ if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+ }
+ if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+ if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+
+ w1 = STAILQ_FIRST(&(wq->free_head));
+ while (w1 != NULL) {
+ D("freeing work %p", w1);
+ w2 = STAILQ_NEXT(w1, stailq);
+ free(w1);
+ w1 = w2;
+ }
+ STAILQ_INIT(&(wq->free_head));
+}
+
+ssize_t
+workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags)
+{
+ assert(wq != NULL);
+ struct worker *worker;
+ sigset_t set, oldset;
+ int r;
+
+ if (sigfillset(&set) < 0) {
+ NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno));
+ return -1;
+ }
+
+ worker = calloc(1, sizeof *worker);
+ if (worker == NULL) {
+ NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+ return -1;
+ }
+
+ worker->workqueue = wq;
+ worker->ctx = ctx;
+ worker->flags = flags;
+
+ STAILQ_INIT(&(worker->priority_work_head));
+ worker->n_priority_work = 0;
+
+ if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+ goto err_free_worker;
+ }
+
+ if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r));
+ if ((r = pthread_sigmask(SIG_SETMASK, &oldset, NULL))) {
+ NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+ }
+ goto err_cancel;
+ }
+
+ if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+ goto err_cancel;
+ }
+
+ if (lock_(&(wq->workers_mutex))) {
+ goto err_cancel;
+ }
+
+ worker->id = wq->workers_next_id;
+ wq->workers_next_id += 1;
+
+ LIST_INSERT_HEAD(&(wq->workers_head), worker, list);
+
+ if (unlock_(&(wq->workers_mutex))) {
+ goto err_cancel;
+ }
+
+ return worker->id;
+
+err_cancel:
+ worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE;
+ if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+ }
+ void *res;
+ if ( (r = pthread_join(worker->thread, &res)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+ }
+
+err_free_worker:
+ free(worker);
+ return -1;
+}
+
+ssize_t
+workqueue_worker_remove(struct workqueue *wq)
+{
+ assert(wq != NULL);
+ struct worker *worker;
+ struct work *w1, *w2;
+ ssize_t retval = -1;
+ int r;
+
+ if (lock_(&(wq->workers_mutex))) {
+ return -1;
+ }
+
+ worker = LIST_FIRST(&(wq->workers_head));
+ if (worker) {
+ LIST_REMOVE(worker, list);
+ wq->n_workers -= 1;
+ retval = worker->id;
+ }
+
+ if (unlock_(&(wq->workers_mutex))) {
+ retval = -1;
+ }
+
+ if (worker == NULL) {
+ NOTIFY_ERROR("no workers to remove");
+ return -1;
+ }
+
+ worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE;
+
+ if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+ retval = -1;
+ }
+
+ void *res;
+ if ( (r = pthread_join(worker->thread, &res)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+ }
+
+ w1 = STAILQ_FIRST(&(worker->priority_work_head));
+ while (w1 != NULL) {
+ D("freeing work %p", w1);
+ w2 = STAILQ_NEXT(w1, stailq);
+ free(w1);
+ w1 = w2;
+ }
+ STAILQ_INIT(&(wq->free_head));
+
+ if (wq->worker_ctx_free_fn) {
+ wq->worker_ctx_free_fn(worker->ctx);
+ }
+
+ free(worker);
+
+ return retval;
+}
+
+int
+workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+ assert(wq != NULL);
+ assert(fn != NULL);
+ struct work *work;
+ int r;
+
+ if (FLAG_IS_TERMINATE(wq->flags)) {
+ NOTIFY_ERROR("not adding work to terminating queue");
+ return -1;
+ }
+
+ work = work_alloc_(wq);
+ if (work == NULL) {
+ return -1;
+ }
+ work->fn = fn;
+ work->data = data;
+
+ if (lock_(&(wq->work_mutex))) {
+ free(work);
+ return -1;
+ }
+
+ STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq);
+ wq->n_work += 1;
+ if (wq->n_work > wq->n_work_highwater) {
+ wq->n_work_highwater = wq->n_work;
+ }
+
+ if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+ }
+
+ if (unlock_(&(wq->work_mutex))) {
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+ assert(wq != NULL);
+ assert(fn != NULL);
+ struct worker *worker;
+ struct work *work;
+ int retval = 0;
+ int r;
+
+ if (FLAG_IS_TERMINATE(wq->flags)) {
+ NOTIFY_ERROR("not adding work to terminating queue");
+ retval = -1;
+ goto done;
+ }
+
+ if (lock_(&(wq->workers_mutex))) {
+ retval = -1;
+ goto done;
+ }
+
+ if (lock_(&(wq->work_mutex))) {
+ retval = -1;
+ goto err_unlock_workers;
+ }
+
+ LIST_FOREACH(worker, &(wq->workers_head), list) {
+ work = work_alloc_(wq);
+ if (work == NULL) {
+ retval = -1;
+ goto err_unlock_work;
+ }
+ work->fn = fn;
+ work->data = data;
+
+ STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq);
+ worker->n_priority_work += 1;
+ }
+
+ if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+ }
+
+err_unlock_work:
+ if (unlock_(&(wq->work_mutex))) {
+ retval = -1;
+ goto err_unlock_workers;
+ }
+
+err_unlock_workers:
+ if (unlock_(&(wq->workers_mutex))) {
+ return -1;
+ }
+done:
+ return retval;
+}