rough framework
[lemu] / workqueue.h
diff --git a/workqueue.h b/workqueue.h
new file mode 100644 (file)
index 0000000..0c4d40e
--- /dev/null
@@ -0,0 +1,108 @@
+#ifndef _WORKQUEUE_H_
+#define _WORKQUEUE_H_
+
+#include <pthread.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <sys/types.h>
+
+#include "bsd_queue.h"
+
+/**
+ * Function signature to execute with user data, thread context, and thread id.
+ */
+typedef void (work_fn_t)(void *, void *, size_t);
+
+/**
+ * Function signature to free a worker thread's ctx.
+ */
+typedef void (worker_ctx_free_fn_t)(void *);
+
+/**
+ * Function signature for receiving error strings.
+ */
+typedef int (printf_fn_t) (const char *format, ...);
+
+/**
+ * Flag type for controlling worker behavior.
+ * We only need three.
+ */
+typedef uint_fast8_t workqueue_flags_t;
+
+/**
+ * Internal unit of work.
+ */
+struct work {
+       STAILQ_ENTRY(work) stailq;
+       work_fn_t *fn;
+       void *data;
+};
+
+enum workqueue_flags {
+       WQF_TERMINATE = (1<<0), /* worker thread will exit when out of work */
+       WQF_NOCOMPLETE = (1<<1), /* worker thread will not finish all work before exiting */
+       WQF_NORECYCLE = (1<<2), /* do not reuse work allocations */
+};
+
+/**
+ * A worker thread, with related data.
+ */
+struct worker {
+       pthread_t thread;
+       struct workqueue *workqueue;
+       void *ctx;
+       LIST_ENTRY(worker) list;
+       size_t id;
+       volatile workqueue_flags_t flags;
+
+       /* Queue of worker-specific work to perform, before consuming general work queue. */
+       /* Guarded by workqueue->work_mutex */
+       STAILQ_HEAD(priority_work_head, work) priority_work_head;
+       size_t n_priority_work;
+       /* End of workqueue->work_mutex guard */
+};
+
+/**
+ * 
+ */
+struct workqueue {
+       /* List of active worker threads. */
+       pthread_mutex_t workers_mutex;
+       LIST_HEAD(workers_head, worker) workers_head;
+       size_t n_workers;
+       size_t workers_next_id;
+       /* End of workers_mutex guard */
+
+       worker_ctx_free_fn_t *worker_ctx_free_fn;
+
+       /* Queue of work units awaiting processing. */
+       pthread_mutex_t work_mutex;
+       pthread_cond_t work_cond;
+       STAILQ_HEAD(work_head, work) work_head;
+       size_t n_work;
+       size_t n_work_highwater;
+       /* End of work_mutex guard */
+
+       /* Queue of allocated unutilized work units. */
+       pthread_mutex_t free_mutex;
+       STAILQ_HEAD(free_head, work) free_head;
+       size_t n_free;
+       /* End of free_mutex guard */
+
+       volatile workqueue_flags_t flags;
+};
+
+/**
+ * Initialize a workqueue.
+ */
+int workqueue_init(struct workqueue *, worker_ctx_free_fn_t *, workqueue_flags_t);
+void workqueue_fini(struct workqueue *, bool);
+
+ssize_t workqueue_worker_add(struct workqueue *, void *, workqueue_flags_t);
+ssize_t workqueue_worker_remove(struct workqueue *);
+
+ssize_t workqueue_release_work(struct workqueue *, size_t);
+
+int workqueue_add(struct workqueue *, work_fn_t *, void *);
+
+#endif /* _WORKQUEUE_H_ */