/** * A simple little concurrent work queue module. */ // #define _POSIX_C_SOURCE 200809L // #define _ISOC11_SOURCE #include #include #include #include #include #include "notify.h" #include "workqueue.h" #ifdef DEBUG_SILLY # define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__) #else # define D(fmt, ...) #endif /* DEBUG_SILLY */ #define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE) #define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE) #define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) ((__flags__) && (WQF_TERMINATE|WQF_NOCOMPLETE) == (WQF_TERMINATE|WQF_NOCOMPLETE)) #define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE) #if defined(__GNUC__) || defined(__clang__) # define ALWAYS_INLINE __attribute__((always_inline)) #else # define ALWAYS_INLINE #endif /** * Shorthand for locking with error report. */ inline static int lock_(pthread_mutex_t *mutex) { assert(mutex != NULL); int r; if ( (r = pthread_mutex_lock(mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); if (r == EOWNERDEAD) { /* sanity check call here */ if ( (r = pthread_mutex_consistent(mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r)); } } } return r; } ALWAYS_INLINE /** * Shorthand for unlocking with error report. */ inline static int unlock_(pthread_mutex_t *mutex) { assert(mutex != NULL); int r; if ( (r = pthread_mutex_unlock(mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } return r; } ALWAYS_INLINE /** * Release a work struct, either saving it to the free list or freeing the allocation. */ inline static void work_free_(struct workqueue *wq, struct work *work) { assert(wq != NULL); if (work) { if (FLAG_IS_NORECYCLE(wq->flags)) { free(work); } else { if (lock_(&(wq->free_mutex))) { D("freeing work %p", work); free(work); return; } STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq); wq->n_free += 1; (void)unlock_(&(wq->free_mutex)); D("recycling work %p, %zu available", work, wq->n_free); } } } /** * Allocate a work struct, either new or from free list. */ inline static struct work * work_alloc_(struct workqueue *wq) { assert(wq != NULL); struct work *work = NULL; if (!FLAG_IS_NORECYCLE(wq->flags)) { if (lock_(&(wq->free_mutex))) { return NULL; } work = STAILQ_FIRST(&(wq->free_head)); if (work) { STAILQ_REMOVE_HEAD(&(wq->free_head), stailq); wq->n_free -= 1; } (void)unlock_(&(wq->free_mutex)); } if (!work) { work = calloc(1, sizeof *work); if (!work) { NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); } D("new work %p, %zu", work, wq->n_free); } else { D("recycled work %p, %zu available", work, wq->n_free); } return work; } /** * Trim free list down to #retain items. */ ssize_t workqueue_release_work(struct workqueue *wq, size_t retain) { assert(wq != NULL); struct work *work; ssize_t released = 0; if (lock_(&(wq->free_mutex))) { return -1; } while (wq->n_free > retain) { work = STAILQ_FIRST(&(wq->free_head)); STAILQ_REMOVE_HEAD(&(wq->free_head), stailq); wq->n_free -= 1; released += 1; free(work); D("freeing work %p", work); } if (unlock_(&(wq->free_mutex))) { return -1; } return released; } static void * worker_(void *data) { assert(data != NULL); struct worker * const worker = data; struct workqueue * const wq = worker->workqueue; void *retval = NULL; int r; struct work *work = NULL, *priority_work = NULL; D("[%zu] started", worker->id); while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) { if (lock_(&(wq->work_mutex))) { retval = (void *)-1; goto done; } D("[%zu] looking for work", worker->id); while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL && (work = STAILQ_FIRST(&(wq->work_head))) == NULL) { if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) { D("[%zu] no more work", worker->id); if (unlock_(&(wq->work_mutex))) { retval = (void *)-1; } goto done; } if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r)); retval = (void *)-1; } D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags); if (r || FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) { if (unlock_(&(wq->work_mutex))) { retval = (void *)-1; } goto done; } } if (priority_work) { D("[%zu] got priority work %p", worker->id, work); STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq); worker->n_priority_work -= 1; work = priority_work; } else { D("[%zu] got work %p", worker->id, work); STAILQ_REMOVE_HEAD(&(wq->work_head), stailq); wq->n_work -= 1; } r = 0; if (STAILQ_FIRST(&(wq->work_head))) { if ( (r = pthread_cond_signal(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r)); retval = (void *)-1; } } if (unlock_(&(wq->work_mutex)) || r) { retval = (void *)-1; goto done; } work->fn(work->data, worker->ctx, worker->id); work_free_(wq, work); work = NULL; } done: D("[%zu] done (%p)", worker->id, retval); pthread_exit(retval); } int workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags) { assert(wq != NULL); int r; pthread_mutexattr_t mutexattr; D("initializing"); wq->flags = flags; wq->worker_ctx_free_fn = worker_ctx_free_fn; wq->n_workers = 0; wq->workers_next_id = 0; wq->n_free = 0; wq->n_work = 0; wq->n_work_highwater = 0; STAILQ_INIT(&(wq->work_head)); STAILQ_INIT(&(wq->free_head)); LIST_INIT(&(wq->workers_head)); if ( (r = pthread_mutexattr_init(&mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r)); goto err_init; } if ( (r = pthread_mutexattr_setrobust(&mutexattr, 1)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r)); goto err_destroy_mutexattr; } if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); goto err_destroy_mutexattr; } if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); goto err_destroy_freemutex; } if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); goto err_destroy_workersmutex; } if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r)); goto err_destroy_workmutex; } if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); goto err_destroy_workcond; } return 0; err_destroy_workcond: if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r)); } err_destroy_workmutex: if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } err_destroy_workersmutex: if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } err_destroy_freemutex: if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } err_destroy_mutexattr: if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); } err_init: return -1; } void workqueue_fini(struct workqueue *wq, bool flush_unfinished) { assert(wq != NULL); struct worker *worker, *worker_tmp; struct work *w1, *w2; void *res; int r; D("destroying"); workqueue_flags_t new_flags = WQF_TERMINATE; if (flush_unfinished == true) { new_flags |= WQF_NOCOMPLETE; } wq->flags |= new_flags; if (flush_unfinished == true) { (void)lock_(&(wq->work_mutex)); w1 = STAILQ_FIRST(&(wq->work_head)); while (w1 != NULL) { D("flushing unfinished work %p", w1); w2 = STAILQ_NEXT(w1, stailq); free(w1); w1 = w2; } STAILQ_INIT(&(wq->work_head)); (void)unlock_(&(wq->work_mutex)); } (void)lock_(&(wq->workers_mutex)); LIST_FOREACH(worker, &(wq->workers_head), list) { worker->flags |= new_flags; } if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); } LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) { LIST_REMOVE(worker, list); wq->n_workers -= 1; D("joining worker %zu", worker->id); if ( (r = pthread_join(worker->thread, &res)) ) { NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); } w1 = STAILQ_FIRST(&(worker->priority_work_head)); while (w1 != NULL) { D("flushing unfinished priority work %p", w1); w2 = STAILQ_NEXT(w1, stailq); free(w1); w1 = w2; } STAILQ_INIT(&(worker->priority_work_head)); if (wq->worker_ctx_free_fn) { wq->worker_ctx_free_fn(worker->ctx); } free(worker); } (void)unlock_(&(wq->workers_mutex)); if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r)); } if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } w1 = STAILQ_FIRST(&(wq->free_head)); while (w1 != NULL) { D("freeing work %p", w1); w2 = STAILQ_NEXT(w1, stailq); free(w1); w1 = w2; } STAILQ_INIT(&(wq->free_head)); } ssize_t workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags) { assert(wq != NULL); struct worker *worker; sigset_t set, oldset; int r; if (sigfillset(&set) < 0) { NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno)); return -1; } worker = calloc(1, sizeof *worker); if (worker == NULL) { NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); return -1; } worker->workqueue = wq; worker->ctx = ctx; worker->flags = flags; STAILQ_INIT(&(worker->priority_work_head)); worker->n_priority_work = 0; if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) { NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); goto err_free_worker; } if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) { NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r)); if ((r = pthread_sigmask(SIG_SETMASK, &oldset, NULL))) { NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); } goto err_cancel; } if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) { NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); goto err_cancel; } if (lock_(&(wq->workers_mutex))) { goto err_cancel; } worker->id = wq->workers_next_id; wq->workers_next_id += 1; LIST_INSERT_HEAD(&(wq->workers_head), worker, list); if (unlock_(&(wq->workers_mutex))) { goto err_cancel; } return worker->id; err_cancel: worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE; if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); } void *res; if ( (r = pthread_join(worker->thread, &res)) ) { NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); } err_free_worker: free(worker); return -1; } ssize_t workqueue_worker_remove(struct workqueue *wq) { assert(wq != NULL); struct worker *worker; struct work *w1, *w2; ssize_t retval = -1; int r; if (lock_(&(wq->workers_mutex))) { return -1; } worker = LIST_FIRST(&(wq->workers_head)); if (worker) { LIST_REMOVE(worker, list); wq->n_workers -= 1; retval = worker->id; } if (unlock_(&(wq->workers_mutex))) { retval = -1; } if (worker == NULL) { NOTIFY_ERROR("no workers to remove"); return -1; } worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE; if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); retval = -1; } void *res; if ( (r = pthread_join(worker->thread, &res)) ) { NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); } w1 = STAILQ_FIRST(&(worker->priority_work_head)); while (w1 != NULL) { D("freeing work %p", w1); w2 = STAILQ_NEXT(w1, stailq); free(w1); w1 = w2; } STAILQ_INIT(&(wq->free_head)); if (wq->worker_ctx_free_fn) { wq->worker_ctx_free_fn(worker->ctx); } free(worker); return retval; } int workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data) { assert(wq != NULL); assert(fn != NULL); struct work *work; int r; if (FLAG_IS_TERMINATE(wq->flags)) { NOTIFY_ERROR("not adding work to terminating queue"); return -1; } work = work_alloc_(wq); if (work == NULL) { return -1; } work->fn = fn; work->data = data; if (lock_(&(wq->work_mutex))) { free(work); return -1; } STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq); wq->n_work += 1; if (wq->n_work > wq->n_work_highwater) { wq->n_work_highwater = wq->n_work; } if ( (r = pthread_cond_signal(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r)); } if (unlock_(&(wq->work_mutex))) { return -1; } return 0; } int workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data) { assert(wq != NULL); assert(fn != NULL); struct worker *worker; struct work *work; int retval = 0; int r; if (FLAG_IS_TERMINATE(wq->flags)) { NOTIFY_ERROR("not adding work to terminating queue"); retval = -1; goto done; } if (lock_(&(wq->workers_mutex))) { retval = -1; goto done; } if (lock_(&(wq->work_mutex))) { retval = -1; goto err_unlock_workers; } LIST_FOREACH(worker, &(wq->workers_head), list) { work = work_alloc_(wq); if (work == NULL) { retval = -1; goto err_unlock_work; } work->fn = fn; work->data = data; STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq); worker->n_priority_work += 1; } if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); } err_unlock_work: if (unlock_(&(wq->work_mutex))) { retval = -1; goto err_unlock_workers; } err_unlock_workers: if (unlock_(&(wq->workers_mutex))) { return -1; } done: return retval; }