X-Git-Url: http://git.squeep.com/?p=lemu;a=blobdiff_plain;f=workqueue.h;fp=workqueue.h;h=0c4d40e63cfdaad6e0c16b40e72cb7618dc66da2;hp=0000000000000000000000000000000000000000;hb=3c54afe11e890fc476cc4730226be1c45f8a04cb;hpb=29235d4c1f0b11bd2efcad262eaae70383228293 diff --git a/workqueue.h b/workqueue.h new file mode 100644 index 0000000..0c4d40e --- /dev/null +++ b/workqueue.h @@ -0,0 +1,108 @@ +#ifndef _WORKQUEUE_H_ +#define _WORKQUEUE_H_ + +#include +#include +#include +#include + +#include "bsd_queue.h" + +/** + * Function signature to execute with user data, thread context, and thread id. + */ +typedef void (work_fn_t)(void *, void *, size_t); + +/** + * Function signature to free a worker thread's ctx. + */ +typedef void (worker_ctx_free_fn_t)(void *); + +/** + * Function signature for receiving error strings. + */ +typedef int (printf_fn_t) (const char *format, ...); + +/** + * Flag type for controlling worker behavior. + * We only need three. + */ +typedef uint_fast8_t workqueue_flags_t; + +/** + * Internal unit of work. + */ +struct work { + STAILQ_ENTRY(work) stailq; + work_fn_t *fn; + void *data; +}; + +enum workqueue_flags { + WQF_TERMINATE = (1<<0), /* worker thread will exit when out of work */ + WQF_NOCOMPLETE = (1<<1), /* worker thread will not finish all work before exiting */ + WQF_NORECYCLE = (1<<2), /* do not reuse work allocations */ +}; + +/** + * A worker thread, with related data. + */ +struct worker { + pthread_t thread; + struct workqueue *workqueue; + void *ctx; + LIST_ENTRY(worker) list; + size_t id; + volatile workqueue_flags_t flags; + + /* Queue of worker-specific work to perform, before consuming general work queue. */ + /* Guarded by workqueue->work_mutex */ + STAILQ_HEAD(priority_work_head, work) priority_work_head; + size_t n_priority_work; + /* End of workqueue->work_mutex guard */ +}; + +/** + * + */ +struct workqueue { + /* List of active worker threads. */ + pthread_mutex_t workers_mutex; + LIST_HEAD(workers_head, worker) workers_head; + size_t n_workers; + size_t workers_next_id; + /* End of workers_mutex guard */ + + worker_ctx_free_fn_t *worker_ctx_free_fn; + + /* Queue of work units awaiting processing. */ + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + STAILQ_HEAD(work_head, work) work_head; + size_t n_work; + size_t n_work_highwater; + /* End of work_mutex guard */ + + /* Queue of allocated unutilized work units. */ + pthread_mutex_t free_mutex; + STAILQ_HEAD(free_head, work) free_head; + size_t n_free; + /* End of free_mutex guard */ + + volatile workqueue_flags_t flags; +}; + +/** + * Initialize a workqueue. + */ +int workqueue_init(struct workqueue *, worker_ctx_free_fn_t *, workqueue_flags_t); +void workqueue_fini(struct workqueue *, bool); + +ssize_t workqueue_worker_add(struct workqueue *, void *, workqueue_flags_t); +ssize_t workqueue_worker_remove(struct workqueue *); + +ssize_t workqueue_release_work(struct workqueue *, size_t); + +int workqueue_add(struct workqueue *, work_fn_t *, void *); + +#endif /* _WORKQUEUE_H_ */