#ifndef _WORKQUEUE_H_ #define _WORKQUEUE_H_ #include #include #include #include #include "bsd_queue.h" /** * Function signature to execute with user data, thread context, and thread id. */ typedef void (work_fn_t)(void *, void *, size_t); /** * Function signature to free a worker thread's ctx. */ typedef void (worker_ctx_free_fn_t)(void *); /** * Function signature for receiving error strings. */ typedef int (printf_fn_t) (const char *format, ...); /** * Flag type for controlling worker behavior. * We only need three. */ typedef uint_fast8_t workqueue_flags_t; /** * Internal unit of work. */ struct work { STAILQ_ENTRY(work) stailq; work_fn_t *fn; void *data; }; enum workqueue_flags { WQF_TERMINATE = (1<<0), /* worker thread will exit when out of work */ WQF_NOCOMPLETE = (1<<1), /* worker thread will not finish all work before exiting */ WQF_NORECYCLE = (1<<2), /* do not reuse work allocations */ }; /** * A worker thread, with related data. */ struct worker { pthread_t thread; struct workqueue *workqueue; void *ctx; LIST_ENTRY(worker) list; size_t id; volatile workqueue_flags_t flags; /* Queue of worker-specific work to perform, before consuming general work queue. */ /* Guarded by workqueue->work_mutex */ STAILQ_HEAD(priority_work_head, work) priority_work_head; size_t n_priority_work; /* End of workqueue->work_mutex guard */ }; /** * */ struct workqueue { /* List of active worker threads. */ pthread_mutex_t workers_mutex; LIST_HEAD(workers_head, worker) workers_head; size_t n_workers; size_t workers_next_id; /* End of workers_mutex guard */ worker_ctx_free_fn_t *worker_ctx_free_fn; /* Queue of work units awaiting processing. */ pthread_mutex_t work_mutex; pthread_cond_t work_cond; STAILQ_HEAD(work_head, work) work_head; size_t n_work; size_t n_work_highwater; /* End of work_mutex guard */ /* Queue of allocated unutilized work units. */ pthread_mutex_t free_mutex; STAILQ_HEAD(free_head, work) free_head; size_t n_free; /* End of free_mutex guard */ volatile workqueue_flags_t flags; }; /** * Initialize a workqueue. */ int workqueue_init(struct workqueue *, worker_ctx_free_fn_t *, workqueue_flags_t); void workqueue_fini(struct workqueue *, bool); ssize_t workqueue_worker_add(struct workqueue *, void *, workqueue_flags_t); ssize_t workqueue_worker_remove(struct workqueue *); ssize_t workqueue_release_work(struct workqueue *, size_t); int workqueue_add(struct workqueue *, work_fn_t *, void *); #endif /* _WORKQUEUE_H_ */