rough framework
[lemu] / workqueue.c
diff --git a/workqueue.c b/workqueue.c
new file mode 100644 (file)
index 0000000..ee24087
--- /dev/null
@@ -0,0 +1,633 @@
+/**
+ * A simple little concurrent work queue module.
+ */
+
+// #define _POSIX_C_SOURCE 200809L
+// #define _ISOC11_SOURCE
+
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "notify.h"
+#include "workqueue.h"
+
+#ifdef DEBUG_SILLY
+#      define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__)
+#else
+#      define D(fmt, ...)
+#endif /* DEBUG_SILLY */
+
+#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE)
+#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE)
+#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) ((__flags__) && (WQF_TERMINATE|WQF_NOCOMPLETE) == (WQF_TERMINATE|WQF_NOCOMPLETE))
+#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE)
+
+#if defined(__GNUC__) || defined(__clang__)
+#      define ALWAYS_INLINE __attribute__((always_inline))
+#else
+#      define ALWAYS_INLINE
+#endif
+
+/**
+ * Shorthand for locking with error report.
+ */
+inline static int
+lock_(pthread_mutex_t *mutex)
+{
+       assert(mutex != NULL);
+       int r;
+       if ( (r = pthread_mutex_lock(mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               if (r == EOWNERDEAD) {
+                       /* sanity check call here */
+                       if ( (r = pthread_mutex_consistent(mutex)) ) {
+                               NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r));
+                       }
+               }
+       }
+       return r;
+} ALWAYS_INLINE
+
+/**
+ * Shorthand for unlocking with error report.
+ */
+inline static int
+unlock_(pthread_mutex_t *mutex)
+{
+       assert(mutex != NULL);
+       int r;
+       if ( (r = pthread_mutex_unlock(mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+       return r;
+} ALWAYS_INLINE
+
+/**
+ * Release a work struct, either saving it to the free list or freeing the allocation.
+ */
+inline static void
+work_free_(struct workqueue *wq, struct work *work)
+{
+       assert(wq != NULL);
+       if (work) {
+               if (FLAG_IS_NORECYCLE(wq->flags)) {
+                       free(work);
+               } else {
+                       if (lock_(&(wq->free_mutex))) {
+                               D("freeing work %p", work);
+                               free(work);
+                               return;
+                       }
+
+                       STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq);
+                       wq->n_free += 1;
+
+                       (void)unlock_(&(wq->free_mutex));
+                       D("recycling work %p, %zu available", work, wq->n_free);
+               }
+       }
+}
+
+/**
+ * Allocate a work struct, either new or from free list.
+ */
+inline static struct work *
+work_alloc_(struct workqueue *wq)
+{
+       assert(wq != NULL);
+       struct work *work = NULL;
+
+       if (!FLAG_IS_NORECYCLE(wq->flags)) {
+               if (lock_(&(wq->free_mutex))) {
+                       return NULL;
+               }
+
+               work = STAILQ_FIRST(&(wq->free_head));
+               if (work) {
+                       STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+                       wq->n_free -= 1;
+               }
+
+               (void)unlock_(&(wq->free_mutex));
+       }
+
+       if (!work) {
+               work = calloc(1, sizeof *work);
+               if (!work) {
+                       NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+               }
+               D("new work %p, %zu", work, wq->n_free);
+       } else {
+               D("recycled work %p, %zu available", work, wq->n_free);
+       }
+
+       return work;
+}
+
+/**
+ * Trim free list down to #retain items.
+ */
+ssize_t
+workqueue_release_work(struct workqueue *wq, size_t retain) {
+       assert(wq != NULL);
+       struct work *work;
+       ssize_t released = 0;
+
+       if (lock_(&(wq->free_mutex))) {
+               return -1;
+       }
+
+       while (wq->n_free > retain) {
+               work = STAILQ_FIRST(&(wq->free_head));
+               STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+               wq->n_free -= 1;
+               released += 1;
+               free(work);
+               D("freeing work %p", work);
+       }
+
+       if (unlock_(&(wq->free_mutex))) {
+               return -1;
+       }
+
+       return released;
+}
+
+static void *
+worker_(void *data)
+{
+       assert(data != NULL);
+       struct worker * const worker = data;
+       struct workqueue * const wq = worker->workqueue;
+       void *retval = NULL;
+       int r;
+       struct work *work = NULL, *priority_work = NULL;
+
+       D("[%zu] started", worker->id);
+
+       while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+               if (lock_(&(wq->work_mutex))) {
+                       retval = (void *)-1;
+                       goto done;
+               }
+
+               D("[%zu] looking for work", worker->id);
+               while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL
+               &&     (work = STAILQ_FIRST(&(wq->work_head))) == NULL) {
+                       if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) {
+                               D("[%zu] no more work", worker->id);
+                               if (unlock_(&(wq->work_mutex))) {
+                                       retval = (void *)-1;
+                               }
+                               goto done;
+                       }
+
+                       if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) {
+                               NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r));
+                               retval = (void *)-1;
+                       }
+                       D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags);
+                       if (r
+                       ||  FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+                               if (unlock_(&(wq->work_mutex))) {
+                                       retval = (void *)-1;
+                               }
+                               goto done;
+                       }
+               }
+
+               if (priority_work) {
+                       D("[%zu] got priority work %p", worker->id, work);
+                       STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq);
+                       worker->n_priority_work -= 1;
+                       work = priority_work;
+               } else {
+                       D("[%zu] got work %p", worker->id, work);
+                       STAILQ_REMOVE_HEAD(&(wq->work_head), stailq);
+                       wq->n_work -= 1;
+               }
+
+               r = 0;
+               if (STAILQ_FIRST(&(wq->work_head))) {
+                       if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+                               NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+                               retval = (void *)-1;
+                       }
+               }
+
+               if (unlock_(&(wq->work_mutex)) || r) {
+                       retval = (void *)-1;
+                       goto done;
+               }
+
+               work->fn(work->data, worker->ctx, worker->id);
+               work_free_(wq, work);
+               work = NULL;
+       }
+done:
+       D("[%zu] done (%p)", worker->id, retval);
+       pthread_exit(retval);
+}
+
+int
+workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags)
+{
+       assert(wq != NULL);
+       int r;
+       pthread_mutexattr_t mutexattr;
+
+       D("initializing");
+
+       wq->flags = flags;
+       wq->worker_ctx_free_fn = worker_ctx_free_fn;
+       wq->n_workers = 0;
+       wq->workers_next_id = 0;
+       wq->n_free = 0;
+       wq->n_work = 0;
+       wq->n_work_highwater = 0;
+
+       STAILQ_INIT(&(wq->work_head));
+       STAILQ_INIT(&(wq->free_head));
+       LIST_INIT(&(wq->workers_head));
+
+       if ( (r = pthread_mutexattr_init(&mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
+               goto err_init;
+       }
+       if ( (r = pthread_mutexattr_setrobust(&mutexattr, 1)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r));
+               goto err_destroy_mutexattr;
+       }
+
+       if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err_destroy_mutexattr;
+       }
+
+       if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err_destroy_freemutex;
+       }
+
+       if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err_destroy_workersmutex;
+       }
+       if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r));
+               goto err_destroy_workmutex;
+       }
+
+       if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+               goto err_destroy_workcond;
+       }
+
+       return 0;
+
+err_destroy_workcond:
+       if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+       }
+err_destroy_workmutex:
+       if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+err_destroy_workersmutex:
+       if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+err_destroy_freemutex:
+       if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+err_destroy_mutexattr:
+       if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+       }
+err_init:
+       return -1;
+}
+
+void
+workqueue_fini(struct workqueue *wq, bool flush_unfinished)
+{
+       assert(wq != NULL);
+       struct worker *worker, *worker_tmp;
+       struct work *w1, *w2;
+       void *res;
+       int r;
+
+       D("destroying");
+
+       workqueue_flags_t new_flags = WQF_TERMINATE;
+       if (flush_unfinished == true) {
+               new_flags |= WQF_NOCOMPLETE;
+       }
+
+       wq->flags |= new_flags;
+
+       if (flush_unfinished == true) {
+               (void)lock_(&(wq->work_mutex));
+               w1 = STAILQ_FIRST(&(wq->work_head));
+               while (w1 != NULL) {
+                       D("flushing unfinished work %p", w1);
+                       w2 = STAILQ_NEXT(w1, stailq);
+                       free(w1);
+                       w1 = w2;
+               }
+               STAILQ_INIT(&(wq->work_head));
+               (void)unlock_(&(wq->work_mutex));
+       }
+
+       (void)lock_(&(wq->workers_mutex));
+
+       LIST_FOREACH(worker, &(wq->workers_head), list) {
+               worker->flags |= new_flags;
+       }
+
+       if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+       }
+
+       LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) {
+               LIST_REMOVE(worker, list);
+               wq->n_workers -= 1;
+
+               D("joining worker %zu", worker->id);
+               if ( (r = pthread_join(worker->thread, &res)) ) {
+                       NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+               }
+
+               w1 = STAILQ_FIRST(&(worker->priority_work_head));
+               while (w1 != NULL) {
+                       D("flushing unfinished priority work %p", w1);
+                       w2 = STAILQ_NEXT(w1, stailq);
+                       free(w1);
+                       w1 = w2;
+               }
+               STAILQ_INIT(&(worker->priority_work_head));
+
+               if (wq->worker_ctx_free_fn) {
+                       wq->worker_ctx_free_fn(worker->ctx);
+               }
+
+               free(worker);
+       }
+
+       (void)unlock_(&(wq->workers_mutex));
+
+       if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+       }
+       if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+       if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       w1 = STAILQ_FIRST(&(wq->free_head));
+       while (w1 != NULL) {
+               D("freeing work %p", w1);
+               w2 = STAILQ_NEXT(w1, stailq);
+               free(w1);
+               w1 = w2;
+       }
+       STAILQ_INIT(&(wq->free_head));
+}
+
+ssize_t
+workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags)
+{
+       assert(wq != NULL);
+       struct worker *worker;
+       sigset_t set, oldset;
+       int r;
+
+       if (sigfillset(&set) < 0) {
+               NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno));
+               return -1;
+       }
+
+       worker = calloc(1, sizeof *worker);
+       if (worker == NULL) {
+               NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+               return -1;
+       }
+
+       worker->workqueue = wq;
+       worker->ctx = ctx;
+       worker->flags = flags;
+
+       STAILQ_INIT(&(worker->priority_work_head));
+       worker->n_priority_work = 0;
+
+       if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+               goto err_free_worker;
+       }
+
+       if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r));
+               if ((r = pthread_sigmask(SIG_SETMASK, &oldset, NULL))) {
+                       NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+               }
+               goto err_cancel;
+       }
+
+       if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+               goto err_cancel;
+       }
+
+       if (lock_(&(wq->workers_mutex))) {
+               goto err_cancel;
+       }
+
+       worker->id = wq->workers_next_id;
+       wq->workers_next_id += 1;
+
+       LIST_INSERT_HEAD(&(wq->workers_head), worker, list);
+
+       if (unlock_(&(wq->workers_mutex))) {
+               goto err_cancel;
+       }
+
+       return worker->id;
+
+err_cancel:
+       worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE;
+       if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+       }
+       void *res;
+       if ( (r = pthread_join(worker->thread, &res)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+       }
+
+err_free_worker:
+       free(worker);
+       return -1;
+}
+
+ssize_t
+workqueue_worker_remove(struct workqueue *wq)
+{
+       assert(wq != NULL);
+       struct worker *worker;
+       struct work *w1, *w2;
+       ssize_t retval = -1;
+       int r;
+
+       if (lock_(&(wq->workers_mutex))) {
+               return -1;
+       }
+
+       worker = LIST_FIRST(&(wq->workers_head));
+       if (worker) {
+               LIST_REMOVE(worker, list);
+               wq->n_workers -= 1;
+               retval = worker->id;
+       }
+
+       if (unlock_(&(wq->workers_mutex))) {
+               retval = -1;
+       }
+
+       if (worker == NULL) {
+               NOTIFY_ERROR("no workers to remove");
+               return -1;
+       }
+
+       worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE;
+
+       if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+               retval = -1;
+       }
+
+       void *res;
+       if ( (r = pthread_join(worker->thread, &res)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+       }
+
+       w1 = STAILQ_FIRST(&(worker->priority_work_head));
+       while (w1 != NULL) {
+               D("freeing work %p", w1);
+               w2 = STAILQ_NEXT(w1, stailq);
+               free(w1);
+               w1 = w2;
+       }
+       STAILQ_INIT(&(wq->free_head));
+
+       if (wq->worker_ctx_free_fn) {
+               wq->worker_ctx_free_fn(worker->ctx);
+       }
+
+       free(worker);
+
+       return retval;
+}
+
+int
+workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+       assert(wq != NULL);
+       assert(fn != NULL);
+       struct work *work;
+       int r;
+
+       if (FLAG_IS_TERMINATE(wq->flags)) {
+               NOTIFY_ERROR("not adding work to terminating queue");
+               return -1;
+       }
+
+       work = work_alloc_(wq);
+       if (work == NULL) {
+               return -1;
+       }
+       work->fn = fn;
+       work->data = data;
+
+       if (lock_(&(wq->work_mutex))) {
+               free(work);
+               return -1;
+       }
+
+       STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq);
+       wq->n_work += 1;
+       if (wq->n_work > wq->n_work_highwater) {
+               wq->n_work_highwater = wq->n_work;
+       }
+
+       if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+       }
+
+       if (unlock_(&(wq->work_mutex))) {
+               return -1;
+       }
+
+       return 0;
+}
+
+int
+workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+       assert(wq != NULL);
+       assert(fn != NULL);
+       struct worker *worker;
+       struct work *work;
+       int retval = 0;
+       int r;
+
+       if (FLAG_IS_TERMINATE(wq->flags)) {
+               NOTIFY_ERROR("not adding work to terminating queue");
+               retval = -1;
+               goto done;
+       }
+
+       if (lock_(&(wq->workers_mutex))) {
+               retval = -1;
+               goto done;
+       }
+
+       if (lock_(&(wq->work_mutex))) {
+               retval = -1;
+               goto err_unlock_workers;
+       }
+
+       LIST_FOREACH(worker, &(wq->workers_head), list) {
+               work = work_alloc_(wq);
+               if (work == NULL) {
+                       retval = -1;
+                       goto err_unlock_work;
+               }
+               work->fn = fn;
+               work->data = data;
+
+               STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq);
+               worker->n_priority_work += 1;
+       }
+
+       if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+       }
+
+err_unlock_work:
+       if (unlock_(&(wq->work_mutex))) {
+               retval = -1;
+               goto err_unlock_workers;
+       }
+
+err_unlock_workers:
+       if (unlock_(&(wq->workers_mutex))) {
+               return -1;
+       }
+done:
+       return retval;
+}