X-Git-Url: http://git.squeep.com/?p=lemu;a=blobdiff_plain;f=workqueue.c;fp=workqueue.c;h=ee24087bf2fb4eca6f13e402cae2ff8902c3d958;hp=0000000000000000000000000000000000000000;hb=3c54afe11e890fc476cc4730226be1c45f8a04cb;hpb=29235d4c1f0b11bd2efcad262eaae70383228293

diff --git a/workqueue.c b/workqueue.c
new file mode 100644
index 0000000..ee24087
--- /dev/null
+++ b/workqueue.c
@@ -0,0 +1,633 @@
+/**
+ * A simple little concurrent work queue module.
+ */
+
+// #define _POSIX_C_SOURCE 200809L
+// #define _ISOC11_SOURCE
+
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "notify.h"
+#include "workqueue.h"
+
+#ifdef DEBUG_SILLY
+# define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__)
+#else
+# define D(fmt, ...)
+#endif /* DEBUG_SILLY */
+
+#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE)
+#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE)
+#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) (((__flags__) & (WQF_TERMINATE|WQF_NOCOMPLETE)) == (WQF_TERMINATE|WQF_NOCOMPLETE))
+#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE)
+
+#if defined(__GNUC__) || defined(__clang__)
+# define ALWAYS_INLINE __attribute__((always_inline))
+#else
+# define ALWAYS_INLINE
+#endif
+
+/**
+ * Shorthand for locking with error report.
+ */
+ALWAYS_INLINE inline static int
+lock_(pthread_mutex_t *mutex)
+{
+  assert(mutex != NULL);
+  int r;
+  if ( (r = pthread_mutex_lock(mutex)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+    if (r == EOWNERDEAD) {
+      /* The previous owner died holding the lock; try to make the mutex usable again. */
+      if ( (r = pthread_mutex_consistent(mutex)) ) {
+        NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r));
+      }
+    }
+  }
+  return r;
+}
+
+/**
+ * Shorthand for unlocking with error report.
+ */
+ALWAYS_INLINE inline static int
+unlock_(pthread_mutex_t *mutex)
+{
+  assert(mutex != NULL);
+  int r;
+  if ( (r = pthread_mutex_unlock(mutex)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+  }
+  return r;
+}
+
+/**
+ * Release a work struct, either saving it to the free list or freeing the allocation.
+ */
+inline static void
+work_free_(struct workqueue *wq, struct work *work)
+{
+  assert(wq != NULL);
+  if (work) {
+    if (FLAG_IS_NORECYCLE(wq->flags)) {
+      free(work);
+    } else {
+      if (lock_(&(wq->free_mutex))) {
+        D("freeing work %p", work);
+        free(work);
+        return;
+      }
+
+      STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq);
+      wq->n_free += 1;
+      D("recycling work %p, %zu available", work, wq->n_free);
+
+      (void)unlock_(&(wq->free_mutex));
+    }
+  }
+}
+
+/**
+ * Allocate a work struct, either new or from free list.
+ */
+inline static struct work *
+work_alloc_(struct workqueue *wq)
+{
+  assert(wq != NULL);
+  struct work *work = NULL;
+
+  if (!FLAG_IS_NORECYCLE(wq->flags)) {
+    if (lock_(&(wq->free_mutex))) {
+      return NULL;
+    }
+
+    work = STAILQ_FIRST(&(wq->free_head));
+    if (work) {
+      STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+      wq->n_free -= 1;
+    }
+
+    (void)unlock_(&(wq->free_mutex));
+  }
+
+  if (!work) {
+    work = calloc(1, sizeof *work);
+    if (!work) {
+      NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+    }
+    D("new work %p, %zu", work, wq->n_free);
+  } else {
+    D("recycled work %p, %zu available", work, wq->n_free);
+  }
+
+  return work;
+}
+
+/**
+ * Trim free list down to #retain items.
+ */
+ssize_t
+workqueue_release_work(struct workqueue *wq, size_t retain)
+{
+  assert(wq != NULL);
+  struct work *work;
+  ssize_t released = 0;
+
+  if (lock_(&(wq->free_mutex))) {
+    return -1;
+  }
+
+  while (wq->n_free > retain) {
+    work = STAILQ_FIRST(&(wq->free_head));
+    STAILQ_REMOVE_HEAD(&(wq->free_head), stailq);
+    wq->n_free -= 1;
+    released += 1;
+    D("freeing work %p", work);
+    free(work);
+  }
+
+  if (unlock_(&(wq->free_mutex))) {
+    return -1;
+  }
+
+  return released;
+}
+
+static void *
+worker_(void *data)
+{
+  assert(data != NULL);
+  struct worker * const worker = data;
+  struct workqueue * const wq = worker->workqueue;
+  void *retval = NULL;
+  int r;
+  struct work *work = NULL, *priority_work = NULL;
+
+  D("[%zu] started", worker->id);
+
+  while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+    if (lock_(&(wq->work_mutex))) {
+      retval = (void *)-1;
+      goto done;
+    }
+
+    D("[%zu] looking for work", worker->id);
+    while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL
+    &&     (work = STAILQ_FIRST(&(wq->work_head))) == NULL) {
+      if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) {
+        D("[%zu] no more work", worker->id);
+        if (unlock_(&(wq->work_mutex))) {
+          retval = (void *)-1;
+        }
+        goto done;
+      }
+
+      if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) {
+        NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r));
+        retval = (void *)-1;
+      }
+      D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags);
+      if (r
+      ||  FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) {
+        if (unlock_(&(wq->work_mutex))) {
+          retval = (void *)-1;
+        }
+        goto done;
+      }
+    }
+
+    if (priority_work) {
+      D("[%zu] got priority work %p", worker->id, priority_work);
+      STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq);
+      worker->n_priority_work -= 1;
+      work = priority_work;
+    } else {
+      D("[%zu] got work %p", worker->id, work);
+      STAILQ_REMOVE_HEAD(&(wq->work_head), stailq);
+      wq->n_work -= 1;
+    }
+
+    /* If more work remains queued, pass the wakeup along to another idle worker. */
+    r = 0;
+    if (STAILQ_FIRST(&(wq->work_head))) {
+      if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+        NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+        retval = (void *)-1;
+      }
+    }
+
+    if (unlock_(&(wq->work_mutex)) || r) {
+      retval = (void *)-1;
+      goto done;
+    }
+
+    work->fn(work->data, worker->ctx, worker->id);
+    work_free_(wq, work);
+    work = NULL;
+  }
+done:
+  D("[%zu] done (%p)", worker->id, retval);
+  pthread_exit(retval);
+}
+
+int
+workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags)
+{
+  assert(wq != NULL);
+  int r;
+  pthread_mutexattr_t mutexattr;
+
+  D("initializing");
+
+  wq->flags = flags;
+  wq->worker_ctx_free_fn = worker_ctx_free_fn;
+  wq->n_workers = 0;
+  wq->workers_next_id = 0;
+  wq->n_free = 0;
+  wq->n_work = 0;
+  wq->n_work_highwater = 0;
+
+  STAILQ_INIT(&(wq->work_head));
+  STAILQ_INIT(&(wq->free_head));
+  LIST_INIT(&(wq->workers_head));
+
+  if ( (r = pthread_mutexattr_init(&mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
+    goto err_init;
+  }
+  if ( (r = pthread_mutexattr_setrobust(&mutexattr, PTHREAD_MUTEX_ROBUST)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r));
+    goto err_destroy_mutexattr;
+  }
+
+  if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+    goto err_destroy_mutexattr;
+  }
+
+  if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+    goto err_destroy_freemutex;
+  }
+
+  if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+    goto err_destroy_workersmutex;
+  }
+  if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r));
+    goto err_destroy_workmutex;
+  }
+
+  if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+    goto err_destroy_workcond;
+  }
+
+  return 0;
+
+err_destroy_workcond:
+  if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+  }
+err_destroy_workmutex:
+  if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+err_destroy_workersmutex:
+  if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+err_destroy_freemutex:
+  if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+err_destroy_mutexattr:
+  if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+  }
+err_init:
+  return -1;
+}
+
+void
+workqueue_fini(struct workqueue *wq, bool flush_unfinished)
+{
+  assert(wq != NULL);
+  struct worker *worker, *worker_tmp;
+  struct work *w1, *w2;
+  void *res;
+  int r;
+
+  D("destroying");
+
+  workqueue_flags_t new_flags = WQF_TERMINATE;
+  if (flush_unfinished == true) {
+    new_flags |= WQF_NOCOMPLETE;
+  }
+
+  wq->flags |= new_flags;
+
+  if (flush_unfinished == true) {
+    (void)lock_(&(wq->work_mutex));
+    w1 = STAILQ_FIRST(&(wq->work_head));
+    while (w1 != NULL) {
+      D("flushing unfinished work %p", w1);
+      w2 = STAILQ_NEXT(w1, stailq);
+      free(w1);
+      w1 = w2;
+    }
+    STAILQ_INIT(&(wq->work_head));
+    (void)unlock_(&(wq->work_mutex));
+  }
+
+  (void)lock_(&(wq->workers_mutex));
+
+  LIST_FOREACH(worker, &(wq->workers_head), list) {
+    worker->flags |= new_flags;
+  }
+
+  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+  }
+
+  LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) {
+    LIST_REMOVE(worker, list);
+    wq->n_workers -= 1;
+
+    D("joining worker %zu", worker->id);
+    if ( (r = pthread_join(worker->thread, &res)) ) {
+      NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+    }
+
+    w1 = STAILQ_FIRST(&(worker->priority_work_head));
+    while (w1 != NULL) {
+      D("flushing unfinished priority work %p", w1);
+      w2 = STAILQ_NEXT(w1, stailq);
+      free(w1);
+      w1 = w2;
+    }
+    STAILQ_INIT(&(worker->priority_work_head));
+
+    if (wq->worker_ctx_free_fn) {
+      wq->worker_ctx_free_fn(worker->ctx);
+    }
+
+    free(worker);
+  }
+
+  (void)unlock_(&(wq->workers_mutex));
+
+  if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r));
+  }
+  if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+  if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+  if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+  }
+
+  w1 = STAILQ_FIRST(&(wq->free_head));
+  while (w1 != NULL) {
+    D("freeing work %p", w1);
+    w2 = STAILQ_NEXT(w1, stailq);
+    free(w1);
+    w1 = w2;
+  }
+  STAILQ_INIT(&(wq->free_head));
+}
+
+ssize_t
+workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags)
+{
+  assert(wq != NULL);
+  struct worker *worker;
+  sigset_t set, oldset;
+  int r;
+
+  if (sigfillset(&set) < 0) {
+    NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno));
+    return -1;
+  }
+
+  worker = calloc(1, sizeof *worker);
+  if (worker == NULL) {
+    NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+    return -1;
+  }
+
+  worker->workqueue = wq;
+  worker->ctx = ctx;
+  worker->flags = flags;
+
+  STAILQ_INIT(&(worker->priority_work_head));
+  worker->n_priority_work = 0;
+
+  /* Block all signals around thread creation so the worker inherits an empty mask. */
+  if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+    goto err_free_worker;
+  }
+
+  if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r));
+    if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
+      NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+    }
+    /* No thread was created, so there is nothing to cancel or join. */
+    goto err_free_worker;
+  }
+
+  if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r));
+    goto err_cancel;
+  }
+
+  if (lock_(&(wq->workers_mutex))) {
+    goto err_cancel;
+  }
+
+  worker->id = wq->workers_next_id;
+  wq->workers_next_id += 1;
+
+  LIST_INSERT_HEAD(&(wq->workers_head), worker, list);
+  wq->n_workers += 1;
+
+  if (unlock_(&(wq->workers_mutex))) {
+    goto err_cancel;
+  }
+
+  return worker->id;
+
+err_cancel:
+  worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE;
+  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+  }
+  void *res;
+  if ( (r = pthread_join(worker->thread, &res)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+  }
+
+err_free_worker:
+  free(worker);
+  return -1;
+}
+
+ssize_t
+workqueue_worker_remove(struct workqueue *wq)
+{
+  assert(wq != NULL);
+  struct worker *worker;
+  struct work *w1, *w2;
+  ssize_t retval = -1;
+  int r;
+
+  if (lock_(&(wq->workers_mutex))) {
+    return -1;
+  }
+
+  worker = LIST_FIRST(&(wq->workers_head));
+  if (worker) {
+    LIST_REMOVE(worker, list);
+    wq->n_workers -= 1;
+    retval = worker->id;
+  }
+
+  if (unlock_(&(wq->workers_mutex))) {
+    retval = -1;
+  }
+
+  if (worker == NULL) {
+    NOTIFY_ERROR("no workers to remove");
+    return -1;
+  }
+
+  worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE;
+
+  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+    retval = -1;
+  }
+
+  void *res;
+  if ( (r = pthread_join(worker->thread, &res)) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r));
+  }
+
+  w1 = STAILQ_FIRST(&(worker->priority_work_head));
+  while (w1 != NULL) {
+    D("freeing work %p", w1);
+    w2 = STAILQ_NEXT(w1, stailq);
+    free(w1);
+    w1 = w2;
+  }
+  STAILQ_INIT(&(worker->priority_work_head));
+
+  if (wq->worker_ctx_free_fn) {
+    wq->worker_ctx_free_fn(worker->ctx);
+  }
+
+  free(worker);
+
+  return retval;
+}
+
+int
+workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+  assert(wq != NULL);
+  assert(fn != NULL);
+  struct work *work;
+  int r;
+
+  if (FLAG_IS_TERMINATE(wq->flags)) {
+    NOTIFY_ERROR("not adding work to terminating queue");
+    return -1;
+  }
+
+  work = work_alloc_(wq);
+  if (work == NULL) {
+    return -1;
+  }
+  work->fn = fn;
+  work->data = data;
+
+  if (lock_(&(wq->work_mutex))) {
+    free(work);
+    return -1;
+  }
+
+  STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq);
+  wq->n_work += 1;
+  if (wq->n_work > wq->n_work_highwater) {
+    wq->n_work_highwater = wq->n_work;
+  }
+
+  if ( (r = pthread_cond_signal(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r));
+  }
+
+  if (unlock_(&(wq->work_mutex))) {
+    return -1;
+  }
+
+  return 0;
+}
+
+int
+workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data)
+{
+  assert(wq != NULL);
+  assert(fn != NULL);
+  struct worker *worker;
+  struct work *work;
+  int retval = 0;
+  int r;
+
+  if (FLAG_IS_TERMINATE(wq->flags)) {
+    NOTIFY_ERROR("not adding work to terminating queue");
+    retval = -1;
+    goto done;
+  }
+
+  if (lock_(&(wq->workers_mutex))) {
+    retval = -1;
+    goto done;
+  }
+
+  if (lock_(&(wq->work_mutex))) {
+    retval = -1;
+    goto err_unlock_workers;
+  }
+
+  LIST_FOREACH(worker, &(wq->workers_head), list) {
+    work = work_alloc_(wq);
+    if (work == NULL) {
+      retval = -1;
+      goto err_unlock_work;
+    }
+    work->fn = fn;
+    work->data = data;
+
+    STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq);
+    worker->n_priority_work += 1;
+  }
+
+  if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) {
+    NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r));
+  }
+
+  /* The success path falls through the unlocks below. */
+err_unlock_work:
+  if (unlock_(&(wq->work_mutex))) {
+    retval = -1;
+  }
+err_unlock_workers:
+  if (unlock_(&(wq->workers_mutex))) {
+    return -1;
+  }
+done:
+  return retval;
+}
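
For context, a minimal usage sketch of this API follows. It is not part of the
patch; it assumes the declarations in workqueue.h match how this file uses
them: a work_fn_t invoked as fn(data, worker ctx, worker id) (inferred from
the call site in worker_()), a flags value of 0 for default behavior, and a
NULL worker_ctx_free_fn being permitted (the code above checks it before
calling). The print_task function and the driver are hypothetical.

#include <stdbool.h>
#include <stdio.h>

#include "workqueue.h"

/* Matches the invocation work->fn(work->data, worker->ctx, worker->id). */
static void
print_task(void *data, void *ctx, size_t worker_id)
{
  (void)ctx;
  printf("worker %zu handled %s\n", worker_id, (const char *)data);
}

int
main(void)
{
  struct workqueue wq;

  if (workqueue_init(&wq, NULL, 0) < 0) {
    return 1;
  }

  if (workqueue_worker_add(&wq, NULL, 0) < 0) {
    workqueue_fini(&wq, true);
    return 1;
  }

  workqueue_add(&wq, print_task, "hello");
  workqueue_add(&wq, print_task, "world");

  /* flush_unfinished=false: workers drain the queue before exiting. */
  workqueue_fini(&wq, false);
  return 0;
}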