rough framework
[lemu] / workqueue.h
1 #ifndef _WORKQUEUE_H_
2 #define _WORKQUEUE_H_
3
4 #include <pthread.h>
5 #include <stdint.h>
6 #include <stdbool.h>
7 #include <sys/types.h>
8
9 #include "bsd_queue.h"
10
/**
 * Callback type for a unit of work.
 *
 * Arguments, in order: the user data pointer, the executing thread's
 * context pointer, and the thread id.
 */
typedef void work_fn_t(void *data, void *ctx, size_t thread_id);
15
/**
 * Callback type used to free a worker thread's ctx pointer.
 */
typedef void worker_ctx_free_fn_t(void *ctx);
20
/**
 * Callback type for receiving error strings: takes a format string
 * followed by variadic arguments and returns an int.
 */
typedef int printf_fn_t(const char *format, ...);
25
/**
 * Integer type holding the flags that control worker behavior.
 * Only three flag bits are needed, so the fastest type of at least
 * eight bits is ample.
 */
typedef uint_fast8_t workqueue_flags_t;
31
32 /**
33 * Internal unit of work.
34 */
35 struct work {
36 STAILQ_ENTRY(work) stailq;
37 work_fn_t *fn;
38 void *data;
39 };
40
/**
 * Behavior flag bits.  Each constant occupies its own bit, so values can
 * be combined with bitwise OR.
 */
enum workqueue_flags {
    /* worker thread will exit when out of work */
    WQF_TERMINATE  = 0x01,

    /* worker thread will not finish all work before exiting */
    WQF_NOCOMPLETE = 0x02,

    /* do not reuse work allocations */
    WQF_NORECYCLE  = 0x04,
};
46
47 /**
48 * A worker thread, with related data.
49 */
50 struct worker {
51 pthread_t thread;
52 struct workqueue *workqueue;
53 void *ctx;
54 LIST_ENTRY(worker) list;
55 size_t id;
56 volatile workqueue_flags_t flags;
57
58 /* Queue of worker-specific work to perform, before consuming general work queue. */
59 /* Guarded by workqueue->work_mutex */
60 STAILQ_HEAD(priority_work_head, work) priority_work_head;
61 size_t n_priority_work;
62 /* End of workqueue->work_mutex guard */
63 };
64
65 /**
66 *
67 */
68 struct workqueue {
69 /* List of active worker threads. */
70 pthread_mutex_t workers_mutex;
71 LIST_HEAD(workers_head, worker) workers_head;
72 size_t n_workers;
73 size_t workers_next_id;
74 /* End of workers_mutex guard */
75
76 worker_ctx_free_fn_t *worker_ctx_free_fn;
77
78 /* Queue of work units awaiting processing. */
79 pthread_mutex_t work_mutex;
80 pthread_cond_t work_cond;
81 STAILQ_HEAD(work_head, work) work_head;
82 size_t n_work;
83 size_t n_work_highwater;
84 /* End of work_mutex guard */
85
86 /* Queue of allocated unutilized work units. */
87 pthread_mutex_t free_mutex;
88 STAILQ_HEAD(free_head, work) free_head;
89 size_t n_free;
90 /* End of free_mutex guard */
91
92 volatile workqueue_flags_t flags;
93 };
94
95 /**
96 * Initialize a workqueue.
97 */
98 int workqueue_init(struct workqueue *, worker_ctx_free_fn_t *, workqueue_flags_t);
99 void workqueue_fini(struct workqueue *, bool);
100
101 ssize_t workqueue_worker_add(struct workqueue *, void *, workqueue_flags_t);
102 ssize_t workqueue_worker_remove(struct workqueue *);
103
104 ssize_t workqueue_release_work(struct workqueue *, size_t);
105
106 int workqueue_add(struct workqueue *, work_fn_t *, void *);
107
108 #endif /* _WORKQUEUE_H_ */