From 3c54afe11e890fc476cc4730226be1c45f8a04cb Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Thu, 25 Mar 2021 11:21:58 -0700 Subject: [PATCH] rough framework --- .gitignore | 5 + Makefile | 88 ++++ README.md | 28 ++ bsd_queue.3 | 1196 +++++++++++++++++++++++++++++++++++++++++++++++ bsd_queue.h | 633 +++++++++++++++++++++++++ command.c | 248 ++++++++++ command.h | 24 + common.c | 150 ++++++ common.h | 20 + connections.c | 927 ++++++++++++++++++++++++++++++++++++ connections.h | 109 +++++ db.c | 179 +++++++ db.h | 22 + lua_interface.c | 168 +++++++ lua_interface.h | 8 + main.c | 93 ++++ notify.c | 52 +++ notify.h | 14 + server.c | 839 +++++++++++++++++++++++++++++++++ server.h | 55 +++ version.h | 12 + version.sh | 41 ++ workqueue.c | 633 +++++++++++++++++++++++++ workqueue.h | 108 +++++ 24 files changed, 5652 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 bsd_queue.3 create mode 100644 bsd_queue.h create mode 100644 command.c create mode 100644 command.h create mode 100644 common.c create mode 100644 common.h create mode 100644 connections.c create mode 100644 connections.h create mode 100644 db.c create mode 100644 db.h create mode 100644 lua_interface.c create mode 100644 lua_interface.h create mode 100644 main.c create mode 100644 notify.c create mode 100644 notify.h create mode 100644 server.c create mode 100644 server.h create mode 100644 version.h create mode 100755 version.sh create mode 100644 workqueue.c create mode 100644 workqueue.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ce4f229 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.o +lemu +conf +.vscode +*.depend diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e20df4b --- /dev/null +++ b/Makefile @@ -0,0 +1,88 @@ +# +# +# + +# hard paths to development versions of libraries +LIBEVENT_PATH ?= /usr +LIBEVENT_H_PATH ?= $(LIBEVENT_PATH)/include +LIBEVENT_CPPFLAGS = -I$(LIBEVENT_H_PATH) +LIBEVENT_LIB_PATH ?= $(LIBEVENT_PATH)/lib +LIBEVENT_LDFLAGS = -L$(LIBEVENT_LIB_PATH) -levent -levent_pthreads -levent_openssl + +# another library +LIBUUID_PATH ?= /usr +LIBUUID_H_PATH ?= $(LIBUUID_PATH)/include +LIBUUID_CPPFLAGS = -I$(LIBUUID_H_PATH) +LIBUUID_LIB_PATH ?= $(LIBUUID_PATH)/lib +LIBUUID_LDFLAGS = -L$(LIBUUID_LIB_PATH) -lossp-uuid + +# yet another +OPENSSL_PATH ?= /usr +OPENSSL_H_PATH ?= $(OPENSSL_PATH)/include +OPENSSL_CPPFLAGS = +OPENSSL_LIB_PATH ?= $(OPENSSL_PATH)/lib +OPENSSL_LDFLAGS = -lssl -lcrypto + +# barking moon lib +LUA_PATH ?= /usr +LUA_H_PATH ?= $(LUA_PATH)/include/lua5.4 +LUA_CPPFLAGS = -I$(LUA_H_PATH) +LUA_LIB_PATH ?= $(LUA_PATH)/lib +LUA_LDFLAGS = -L$(LUA_LIB_PATH) -llua5.4 + +# backing store +SQLITE_PATH ?= /usr +SQLITE_H_PATH ?= $(SQLITE_PATH)/include +SQLITE_CPPFLAGS = -I$(SQLITE_H_PATH) +SQLITE_LIB_PATH ?= $(SQLITE_PATH)/lib +SQLITE_LDFLAGS = -L$(SQLITE_LIB_PATH) -lsqlite3 + +# so many libraries, it's like playing with Lego bricks + +SOURCES = command.c common.c connections.c db.c lua_interface.c main.c notify.c server.c workqueue.c +OBJECTS = $(SOURCES:.c=.o) +TARGET = lemu + +# In order to use the OSSP uuid library on the darwin platform, +# _POSIX_SOURCE needs to be defined so that the platform doesn't +# attempt to provide its own uuid_t type. +CPPFLAGS += -D_POSIX_C_SOURCE=200112L +# Linux needs additional specifications when _POSIX_C_SOURCE is +# defined. +CPPFLAGS += -D_XOPEN_SOURCE=700 + +CPPFLAGS += -D_REENTRANT + +CFLAGS += -DDEBUG_SILLY + +CFLAGS += -g -Wall -Wextra -Werror +CPPFLAGS += $(LIBEVENT_CPPFLAGS) $(LIBUUID_CPPFLAGS) $(OPENSSL_CPPFLAGS) $(LUA_CPPFLAGS) $(SQLITE_CPPFLAGS) +LDFLAGS += $(LIBEVENT_LDFLAGS) $(LIBUUID_LDFLAGS) $(OPENSSL_LDFLAGS) $(LUA_LDFLAGS) $(SQLITE_LDFLAGS) + +ifeq ($(shell uname), Linux) +# Linux needs librt to see some clock and pthread functions. +LDFLAGS += -lrt +# Linux needs -pthread for pthread_mutexattr_init +LDFLAGS += -pthread +CFLAGS += -pthread +endif + +MAKEDEPEND = $(CC) $(CPPFLAGS) -MM + +.PHONY: all clean + +all: $(TARGET) Makefile.depend + +version.h: + ./version.sh 0 0 prototype + +Makefile.depend: $(SOURCES) version.h + $(MAKEDEPEND) $^ > $@ + +$(TARGET): $(OBJECTS) + $(CC) -o $@ $^ $(LDFLAGS) + +clean: + rm -f $(TARGET) *.o Makefile.depend + +-include Makefile.depend diff --git a/README.md b/README.md new file mode 100644 index 0000000..7321bf5 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# LEMU + +This is a (work-in-progress) text-based multi-user server engine, utilizing lua, libevent, and a concurrent work queue. +The intent is for client interaction to be primarily driven by lua scripting, with the server code itself only handling the heavy-lifting of IO, DB, et cetera. + +Nota bene: this is currently a toy implementation, as an architecture for the lua-side has yet to be fleshed out, and is being developed by feel, trial, and error. + +## Overview + +The main thread runs the event loop, which primarily accepts new connections and feeds their line-based inputs into the thread pool for processing. Connection hostname resolution and signal handling are also in the main thread. + +Each connection has its own queue of input. When a new line of input is added to a connection's queue and there were no other lines already queued, a new job is added to the workqueue to process that connection's input. + +The main unresolved issue here is consistent server state-keeping between each worker-thread's lua processor. The current stopgap solution is that each worker is totally independent and any global changes to the lua state will be queued as a task to update the common state. The imagined ideal is a lock-enable native lua environment/table which would be shared by all workers. + +## File Tour + + bsd_queue.h - BSD's sys/queue.h, included here because Linux's version is dreadful + command.c - processes user input + common.c - utility functions + connections.c - manage and interact with active clients + db.c - persistent data storage + lua_interface.c - lua bindings to server and connections + main.c - cli + notify.c - logging abstraction + server.c - event loop, network stuff, glues everything together + workqueue.c - concurrent task runner + diff --git a/bsd_queue.3 b/bsd_queue.3 new file mode 100644 index 0000000..d9b0daf --- /dev/null +++ b/bsd_queue.3 @@ -0,0 +1,1196 @@ +.\" $OpenBSD: queue.3,v 1.68 2020/12/30 13:33:38 millert Exp $ +.\" $NetBSD: queue.3,v 1.4 1995/07/03 00:25:36 mycroft Exp $ +.\" +.\" Copyright (c) 1993 The Regents of the University of California. +.\" All rights reserved. +.\" +.\" Redistribution and use in source and binary forms, with or without +.\" modification, are permitted provided that the following conditions +.\" are met: +.\" 1. Redistributions of source code must retain the above copyright +.\" notice, this list of conditions and the following disclaimer. +.\" 2. Redistributions in binary form must reproduce the above copyright +.\" notice, this list of conditions and the following disclaimer in the +.\" documentation and/or other materials provided with the distribution. +.\" 3. Neither the name of the University nor the names of its contributors +.\" may be used to endorse or promote products derived from this software +.\" without specific prior written permission. +.\" +.\" THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND +.\" ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +.\" ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE +.\" FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +.\" DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +.\" OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +.\" HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +.\" LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +.\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +.\" SUCH DAMAGE. +.\" +.\" @(#)queue.3 8.1 (Berkeley) 12/13/93 +.\" +.Dd $Mdocdate: December 30 2020 $ +.Dt SLIST_INIT 3 +.Os +.Sh NAME +.Nm SLIST_ENTRY , +.Nm SLIST_HEAD , +.Nm SLIST_HEAD_INITIALIZER , +.Nm SLIST_FIRST , +.Nm SLIST_NEXT , +.Nm SLIST_EMPTY , +.Nm SLIST_FOREACH , +.Nm SLIST_FOREACH_SAFE , +.Nm SLIST_INIT , +.Nm SLIST_INSERT_AFTER , +.Nm SLIST_INSERT_HEAD , +.Nm SLIST_REMOVE_AFTER , +.Nm SLIST_REMOVE_HEAD , +.Nm SLIST_REMOVE , +.Nm LIST_ENTRY , +.Nm LIST_HEAD , +.Nm LIST_HEAD_INITIALIZER , +.Nm LIST_FIRST , +.Nm LIST_NEXT , +.Nm LIST_EMPTY , +.Nm LIST_FOREACH , +.Nm LIST_FOREACH_SAFE , +.Nm LIST_INIT , +.Nm LIST_INSERT_AFTER , +.Nm LIST_INSERT_BEFORE , +.Nm LIST_INSERT_HEAD , +.Nm LIST_REMOVE , +.Nm LIST_REPLACE , +.Nm SIMPLEQ_ENTRY , +.Nm SIMPLEQ_HEAD , +.Nm SIMPLEQ_HEAD_INITIALIZER , +.Nm SIMPLEQ_FIRST , +.Nm SIMPLEQ_NEXT , +.Nm SIMPLEQ_EMPTY , +.Nm SIMPLEQ_FOREACH , +.Nm SIMPLEQ_FOREACH_SAFE , +.Nm SIMPLEQ_INIT , +.Nm SIMPLEQ_INSERT_AFTER , +.Nm SIMPLEQ_INSERT_HEAD , +.Nm SIMPLEQ_INSERT_TAIL , +.Nm SIMPLEQ_REMOVE_AFTER , +.Nm SIMPLEQ_REMOVE_HEAD , +.Nm SIMPLEQ_CONCAT , +.Nm STAILQ_ENTRY , +.Nm STAILQ_HEAD , +.Nm STAILQ_HEAD_INITIALIZER , +.Nm STAILQ_FIRST , +.Nm STAILQ_NEXT , +.Nm STAILQ_LAST , +.Nm STAILQ_EMPTY , +.Nm STAILQ_FOREACH , +.Nm STAILQ_FOREACH_SAFE , +.Nm STAILQ_INIT , +.Nm STAILQ_INSERT_AFTER , +.Nm STAILQ_INSERT_HEAD , +.Nm STAILQ_INSERT_TAIL , +.Nm STAILQ_REMOVE , +.Nm STAILQ_REMOVE_AFTER , +.Nm STAILQ_REMOVE_HEAD , +.Nm STAILQ_CONCAT , +.Nm TAILQ_ENTRY , +.Nm TAILQ_HEAD , +.Nm TAILQ_HEAD_INITIALIZER , +.Nm TAILQ_FIRST , +.Nm TAILQ_NEXT , +.Nm TAILQ_LAST , +.Nm TAILQ_PREV , +.Nm TAILQ_EMPTY , +.Nm TAILQ_FOREACH , +.Nm TAILQ_FOREACH_SAFE , +.Nm TAILQ_FOREACH_REVERSE , +.Nm TAILQ_FOREACH_REVERSE_SAFE , +.Nm TAILQ_INIT , +.Nm TAILQ_INSERT_AFTER , +.Nm TAILQ_INSERT_BEFORE , +.Nm TAILQ_INSERT_HEAD , +.Nm TAILQ_INSERT_TAIL , +.Nm TAILQ_REMOVE , +.Nm TAILQ_REPLACE , +.Nm TAILQ_CONCAT +.Nd intrusive singly-linked and doubly-linked lists, simple queues, singly-linked and doubly-linked tail queues +.Sh SYNOPSIS +.In sys/queue.h +.Pp +.Fn SLIST_ENTRY "TYPE" +.Fn SLIST_HEAD "HEADNAME" "TYPE" +.Fn SLIST_HEAD_INITIALIZER "SLIST_HEAD head" +.Ft "struct TYPE *" +.Fn SLIST_FIRST "SLIST_HEAD *head" +.Ft "struct TYPE *" +.Fn SLIST_NEXT "struct TYPE *listelm" "FIELDNAME" +.Ft int +.Fn SLIST_EMPTY "SLIST_HEAD *head" +.Fn SLIST_FOREACH "VARNAME" "SLIST_HEAD *head" "FIELDNAME" +.Fn SLIST_FOREACH_SAFE "VARNAME" "SLIST_HEAD *head" "FIELDNAME" "TEMP_VARNAME" +.Ft void +.Fn SLIST_INIT "SLIST_HEAD *head" +.Ft void +.Fn SLIST_INSERT_AFTER "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SLIST_INSERT_HEAD "SLIST_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SLIST_REMOVE_AFTER "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SLIST_REMOVE_HEAD "SLIST_HEAD *head" "FIELDNAME" +.Ft void +.Fn SLIST_REMOVE "SLIST_HEAD *head" "struct TYPE *elm" "TYPE" "FIELDNAME" +.Pp +.Fn LIST_ENTRY "TYPE" +.Fn LIST_HEAD "HEADNAME" "TYPE" +.Fn LIST_HEAD_INITIALIZER "LIST_HEAD head" +.Ft "struct TYPE *" +.Fn LIST_FIRST "LIST_HEAD *head" +.Ft "struct TYPE *" +.Fn LIST_NEXT "struct TYPE *listelm" "FIELDNAME" +.Ft int +.Fn LIST_EMPTY "LIST_HEAD *head" +.Fn LIST_FOREACH "VARNAME" "LIST_HEAD *head" "FIELDNAME" +.Fn LIST_FOREACH_SAFE "VARNAME" "LIST_HEAD *head" "FIELDNAME" "TEMP_VARNAME" +.Ft void +.Fn LIST_INIT "LIST_HEAD *head" +.Ft void +.Fn LIST_INSERT_AFTER "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn LIST_INSERT_BEFORE "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn LIST_INSERT_HEAD "LIST_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn LIST_REMOVE "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn LIST_REPLACE "struct TYPE *elm" "struct TYPE *elm2" "FIELDNAME" +.Pp +.Fn SIMPLEQ_ENTRY "TYPE" +.Fn SIMPLEQ_HEAD "HEADNAME" "TYPE" +.Fn SIMPLEQ_HEAD_INITIALIZER "SIMPLEQ_HEAD head" +.Ft "struct TYPE *" +.Fn SIMPLEQ_FIRST "SIMPLEQ_HEAD *head" +.Ft "struct TYPE *" +.Fn SIMPLEQ_NEXT "struct TYPE *listelm" "FIELDNAME" +.Ft int +.Fn SIMPLEQ_EMPTY "SIMPLEQ_HEAD *head" +.Fn SIMPLEQ_FOREACH "VARNAME" "SIMPLEQ_HEAD *head" "FIELDNAME" +.Fn SIMPLEQ_FOREACH_SAFE "VARNAME" "SIMPLEQ_HEAD *head" "FIELDNAME" "TEMP_VARNAME" +.Ft void +.Fn SIMPLEQ_INIT "SIMPLEQ_HEAD *head" +.Ft void +.Fn SIMPLEQ_INSERT_AFTER "SIMPLEQ_HEAD *head" "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SIMPLEQ_INSERT_HEAD "SIMPLEQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SIMPLEQ_INSERT_TAIL "SIMPLEQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SIMPLEQ_REMOVE_AFTER "SIMPLEQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn SIMPLEQ_REMOVE_HEAD "SIMPLEQ_HEAD *head" "FIELDNAME" +.Fn SIMPLEQ_CONCAT "SIMPLEQ_HEAD *head1" "SIMPLEQ_HEAD *head2" +.Pp +.Fn STAILQ_ENTRY "TYPE" +.Fn STAILQ_HEAD "HEADNAME" "TYPE" +.Fn STAILQ_HEAD_INITIALIZER "STAILQ_HEAD head" +.Fn STAILQ_FIRST "STAILQ_HEAD *head" +.Fn STAILQ_NEXT "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_LAST "STAILQ_HEAD *head" "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_EMPTY "STAILQ_HEAD *head" +.Fn STAILQ_FOREACH "TYPE *var" "STAILQ_HEAD *head" "STAILQ_ENTRY NAME" +.Fn STAILQ_FOREACH_SAFE "TYPE *var" "STAILQ_HEAD *head" "STAILQ_ENTRY NAME" "TYPE *temp_var" +.Fn STAILQ_INIT "STAILQ_HEAD *head" +.Fn STAILQ_INSERT_AFTER "STAILQ_HEAD *head" "TYPE *listelm" "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_INSERT_HEAD "STAILQ_HEAD *head" "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_INSERT_TAIL "STAILQ_HEAD *head" "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_REMOVE "STAILQ_HEAD *head" "TYPE *elm" "TYPE" "STAILQ_ENTRY NAME" +.Fn STAILQ_REMOVE_AFTER "STAILQ_HEAD *head" "TYPE *elm" "STAILQ_ENTRY NAME" +.Fn STAILQ_REMOVE_HEAD "STAILQ_HEAD *head" "STAILQ_ENTRY NAME" +.Fn STAILQ_CONCAT "STAILQ_HEAD *head1" "STAILQ_HEAD *head2" +.Pp +.Fn TAILQ_ENTRY "TYPE" +.Fn TAILQ_HEAD "HEADNAME" "TYPE" +.Fn TAILQ_HEAD_INITIALIZER "TAILQ_HEAD head" +.Ft "struct TYPE *" +.Fn TAILQ_FIRST "TAILQ_HEAD *head" +.Ft "struct TYPE *" +.Fn TAILQ_NEXT "struct TYPE *listelm" "FIELDNAME" +.Ft "struct TYPE *" +.Fn TAILQ_LAST "TAILQ_HEAD *head" "HEADNAME" +.Ft "struct TYPE *" +.Fn TAILQ_PREV "struct TYPE *listelm" "HEADNAME" "FIELDNAME" +.Ft int +.Fn TAILQ_EMPTY "TAILQ_HEAD *head" +.Fn TAILQ_FOREACH "VARNAME" "TAILQ_HEAD *head" "FIELDNAME" +.Fn TAILQ_FOREACH_SAFE "VARNAME" "TAILQ_HEAD *head" "FIELDNAME" "TEMP_VARNAME" +.Fn TAILQ_FOREACH_REVERSE "VARNAME" "TAILQ_HEAD *head" "HEADNAME" "FIELDNAME" +.Fn TAILQ_FOREACH_REVERSE_SAFE "VARNAME" "TAILQ_HEAD *head" "HEADNAME" "FIELDNAME" "TEMP_VARNAME" +.Ft void +.Fn TAILQ_INIT "TAILQ_HEAD *head" +.Ft void +.Fn TAILQ_INSERT_AFTER "TAILQ_HEAD *head" "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn TAILQ_INSERT_BEFORE "struct TYPE *listelm" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn TAILQ_INSERT_HEAD "TAILQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn TAILQ_INSERT_TAIL "TAILQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn TAILQ_REMOVE "TAILQ_HEAD *head" "struct TYPE *elm" "FIELDNAME" +.Ft void +.Fn TAILQ_REPLACE "TAILQ_HEAD *head" "struct TYPE *elm" "struct TYPE *elm2" "FIELDNAME" +.Fn TAILQ_CONCAT "TAILQ_HEAD *head1" "TAILQ_HEAD *head2" "FIELDNAME" +.Sh DESCRIPTION +These macros define and operate on five types of data structures: +singly-linked lists, simple queues, lists, singly-linked tail queues, +and tail queues. +All five structures support the following functionality: +.Pp +.Bl -enum -compact -offset indent +.It +Insertion of a new entry at the head of the list. +.It +Insertion of a new entry after any element in the list. +.It +Removal of an entry from the head of the list. +.It +Forward traversal through the list. +.El +.Pp +The following table provides a quick overview +of which types support which additional macros: +.Bl -column -offset 6n "LAST, PREV, FOREACH_REVERSE" SLIST LIST SIMPLEQ STAILQ TAILQ +.It LAST, PREV, FOREACH_REVERSE Ta - Ta - Ta - Ta - Ta TAILQ +.It INSERT_BEFORE, REPLACE Ta - Ta LIST Ta - Ta - Ta TAILQ +.It INSERT_TAIL, CONCAT Ta - Ta - Ta SIMPLEQ Ta STAILQ Ta TAILQ +.It REMOVE_AFTER, REMOVE_HEAD Ta SLIST Ta - Ta SIMPLEQ Ta STAILQ Ta - +.It REMOVE Ta SLIST Ta LIST Ta - Ta STAILQ Ta TAILQ +.El +.Pp +Singly-linked lists are the simplest of the five data structures +and support only the above functionality. +Singly-linked lists are ideal for applications with large datasets +and few or no removals, or for implementing a LIFO queue. +.Pp +Simple queues and singly-linked tail queues add the following functionality: +.Pp +.Bl -enum -compact -offset indent +.It +Entries can be added at the end of a list. +.El +.Pp +However: +.Pp +.Bl -enum -compact -offset indent +.It +All list insertions must specify the head of the list. +.It +Each head entry requires two pointers rather than one. +.It +Code size is about 15% greater and operations run about 20% slower +than singly-linked lists. +.El +.Pp +Simple queues and singly-linked tail queues are ideal for applications with +large datasets and few or no removals, or for implementing a FIFO queue. +.Pp +All doubly linked types of data structures (lists and tail queues) +additionally allow: +.Pp +.Bl -enum -compact -offset indent +.It +Insertion of a new entry before any element in the list. +.It +Removal of any entry in the list. +.El +.Pp +However: +.Pp +.Bl -enum -compact -offset indent +.It +Each element requires two pointers rather than one. +.It +Code size and execution time of operations (except for removal) is about +twice that of the singly-linked data-structures. +.El +.Pp +Lists are the simplest of the doubly linked data structures and support +only the above functionality over singly-linked lists. +.Pp +Tail queues add the following functionality: +.Pp +.Bl -enum -compact -offset indent +.It +Entries can be added at the end of a list. +.It +They may be traversed backwards, at a cost. +.El +.Pp +However: +.Pp +.Bl -enum -compact -offset indent +.It +All list insertions and removals must specify the head of the list. +.It +Each head entry requires two pointers rather than one. +.It +Code size is about 15% greater and operations run about 20% slower +than singly-linked lists. +.El +.Pp +An additional type of data structure, circular queues, violated the C +language aliasing rules and were miscompiled as a result. +All code using them should be converted to another structure; +tail queues are usually the easiest to convert to. +.Pp +All these lists and queues are intrusive: they link together user +defined structures containing a field of type +.Li SLIST_ENTRY , +.Li LIST_ENTRY , +.Li SIMPLEQ_ENTRY , +.Li STAILQ_ENTRY , +or +.Li TAILQ_ENTRY . +In the macro definitions, +.Fa TYPE +is the name tag of the user defined structure and +.Fa FIELDNAME +is the name of the +.Li *_ENTRY +field. +If an instance of the user defined structure needs to be a member of +multiple lists at the same time, the structure requires multiple +.Li *_ENTRY +fields, one for each list. +.Pp +The argument +.Fa HEADNAME +is the name tag of a user defined structure that must be declared +using the macros +.Fn SLIST_HEAD , +.Fn LIST_HEAD , +.Fn SIMPLEQ_HEAD , +.Fn STAILQ_HEAD , +or +.Fn TAILQ_HEAD . +See the examples below for further explanation of how these macros are used. +.Sh SINGLY-LINKED LISTS +A singly-linked list is headed by a structure defined by the +.Fn SLIST_HEAD +macro. +This structure contains a single pointer to the first element on the list. +The elements are singly linked for minimum space and pointer manipulation +overhead at the expense of O(n) removal for arbitrary elements. +New elements can be added to the list after an existing element or +at the head of the list. +A +.Fa SLIST_HEAD +structure is declared as follows: +.Bd -literal -offset indent +SLIST_HEAD(HEADNAME, TYPE) head; +.Ed +.Pp +where +.Fa HEADNAME +is the name of the structure to be defined, and struct +.Fa TYPE +is the type of the elements to be linked into the list. +A pointer to the head of the list can later be declared as: +.Bd -literal -offset indent +struct HEADNAME *headp; +.Ed +.Pp +(The names +.Li head +and +.Li headp +are user selectable.) +.Pp +The +.Fa HEADNAME +facility is often not used, leading to the following bizarre code: +.Bd -literal -offset indent +SLIST_HEAD(, TYPE) head, *headp; +.Ed +.Pp +The +.Fn SLIST_ENTRY +macro declares a structure that connects the elements in the list. +.Pp +The +.Fn SLIST_INIT +macro initializes the list referenced by +.Fa head . +.Pp +The list can also be initialized statically by using the +.Fn SLIST_HEAD_INITIALIZER +macro like this: +.Bd -literal -offset indent +SLIST_HEAD(HEADNAME, TYPE) head = SLIST_HEAD_INITIALIZER(head); +.Ed +.Pp +The +.Fn SLIST_INSERT_HEAD +macro inserts the new element +.Fa elm +at the head of the list. +.Pp +The +.Fn SLIST_INSERT_AFTER +macro inserts the new element +.Fa elm +after the element +.Fa listelm . +.Pp +The +.Fn SLIST_REMOVE_HEAD +macro removes the first element of the list pointed by +.Fa head . +.Pp +The +.Fn SLIST_REMOVE_AFTER +macro removes the list element immediately following +.Fa elm . +.Pp +The +.Fn SLIST_REMOVE +macro removes the element +.Fa elm +of the list pointed by +.Fa head . +.Pp +The +.Fn SLIST_FIRST +and +.Fn SLIST_NEXT +macros can be used to traverse the list: +.Bd -literal -offset indent +for (np = SLIST_FIRST(&head); np != NULL; np = SLIST_NEXT(np, FIELDNAME)) +.Ed +.Pp +Or, for simplicity, one can use the +.Fn SLIST_FOREACH +macro: +.Bd -literal -offset indent +SLIST_FOREACH(np, head, FIELDNAME) +.Ed +.Pp +The macro +.Fn SLIST_FOREACH_SAFE +traverses the list referenced by head in a +forward direction, assigning each element in turn to var. +However, unlike +.Fn SLIST_FOREACH +it is permitted to remove var as well +as free it from within the loop safely without interfering with the traversal. +.Pp +The +.Fn SLIST_EMPTY +macro should be used to check whether a simple list is empty. +.Sh SINGLY-LINKED LIST EXAMPLE +.Bd -literal +SLIST_HEAD(listhead, entry) head; +struct entry { + ... + SLIST_ENTRY(entry) entries; /* Simple list. */ + ... +} *n1, *n2, *np; + +SLIST_INIT(&head); /* Initialize simple list. */ + +n1 = malloc(sizeof(struct entry)); /* Insert at the head. */ +SLIST_INSERT_HEAD(&head, n1, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert after. */ +SLIST_INSERT_AFTER(n1, n2, entries); + +SLIST_FOREACH(np, &head, entries) /* Forward traversal. */ + np-> ... + +while (!SLIST_EMPTY(&head)) { /* Delete. */ + n1 = SLIST_FIRST(&head); + SLIST_REMOVE_HEAD(&head, entries); + free(n1); +} + +.Ed +.Sh LISTS +A list is headed by a structure defined by the +.Fn LIST_HEAD +macro. +This structure contains a single pointer to the first element on the list. +The elements are doubly linked so that an arbitrary element can be +removed without traversing the list. +New elements can be added to the list after an existing element, +before an existing element, or at the head of the list. +A +.Fa LIST_HEAD +structure is declared as follows: +.Bd -literal -offset indent +LIST_HEAD(HEADNAME, TYPE) head; +.Ed +.Pp +where +.Fa HEADNAME +is the name of the structure to be defined, and struct +.Fa TYPE +is the type of the elements to be linked into the list. +A pointer to the head of the list can later be declared as: +.Bd -literal -offset indent +struct HEADNAME *headp; +.Ed +.Pp +(The names +.Li head +and +.Li headp +are user selectable.) +.Pp +The +.Fa HEADNAME +facility is often not used, leading to the following bizarre code: +.Bd -literal -offset indent +LIST_HEAD(, TYPE) head, *headp; +.Ed +.Pp +The +.Fn LIST_ENTRY +macro declares a structure that connects the elements in the list. +.Pp +The +.Fn LIST_INIT +macro initializes the list referenced by +.Fa head . +.Pp +The list can also be initialized statically by using the +.Fn LIST_HEAD_INITIALIZER +macro like this: +.Bd -literal -offset indent +LIST_HEAD(HEADNAME, TYPE) head = LIST_HEAD_INITIALIZER(head); +.Ed +.Pp +The +.Fn LIST_INSERT_HEAD +macro inserts the new element +.Fa elm +at the head of the list. +.Pp +The +.Fn LIST_INSERT_AFTER +macro inserts the new element +.Fa elm +after the element +.Fa listelm . +.Pp +The +.Fn LIST_INSERT_BEFORE +macro inserts the new element +.Fa elm +before the element +.Fa listelm . +.Pp +The +.Fn LIST_REMOVE +macro removes the element +.Fa elm +from the list. +.Pp +The +.Fn LIST_REPLACE +macro replaces the list element +.Fa elm +with the new element +.Fa elm2 . +.Pp +The +.Fn LIST_FIRST +and +.Fn LIST_NEXT +macros can be used to traverse the list: +.Bd -literal -offset indent +for (np = LIST_FIRST(&head); np != NULL; np = LIST_NEXT(np, FIELDNAME)) +.Ed +.Pp +Or, for simplicity, one can use the +.Fn LIST_FOREACH +macro: +.Bd -literal -offset indent +LIST_FOREACH(np, head, FIELDNAME) +.Ed +.Pp +The macro +.Fn LIST_FOREACH_SAFE +traverses the list referenced by head in a +forward direction, assigning each element in turn to var. +However, unlike +.Fn LIST_FOREACH +it is permitted to remove var as well +as free it from within the loop safely without interfering with the traversal. +.Pp +The +.Fn LIST_EMPTY +macro should be used to check whether a list is empty. +.Sh LIST EXAMPLE +.Bd -literal +LIST_HEAD(listhead, entry) head; +struct entry { + ... + LIST_ENTRY(entry) entries; /* List. */ + ... +} *n1, *n2, *np; + +LIST_INIT(&head); /* Initialize list. */ + +n1 = malloc(sizeof(struct entry)); /* Insert at the head. */ +LIST_INSERT_HEAD(&head, n1, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert after. */ +LIST_INSERT_AFTER(n1, n2, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert before. */ +LIST_INSERT_BEFORE(n1, n2, entries); + /* Forward traversal. */ +LIST_FOREACH(np, &head, entries) + np-> ... + +while (!LIST_EMPTY(&head)) { /* Delete. */ + n1 = LIST_FIRST(&head); + LIST_REMOVE(n1, entries); + free(n1); +} +.Ed +.Sh SIMPLE QUEUES +A simple queue is headed by a structure defined by the +.Fn SIMPLEQ_HEAD +macro. +This structure contains a pair of pointers, one to the first element in the +simple queue and the other to the last element in the simple queue. +The elements are singly linked. +New elements can be added to the queue after an existing element, +at the head of the queue or at the tail of the queue. +A +.Fa SIMPLEQ_HEAD +structure is declared as follows: +.Bd -literal -offset indent +SIMPLEQ_HEAD(HEADNAME, TYPE) head; +.Ed +.Pp +where +.Fa HEADNAME +is the name of the structure to be defined, and struct +.Fa TYPE +is the type of the elements to be linked into the queue. +A pointer to the head of the queue can later be declared as: +.Bd -literal -offset indent +struct HEADNAME *headp; +.Ed +.Pp +(The names +.Li head +and +.Li headp +are user selectable.) +.Pp +The +.Fn SIMPLEQ_ENTRY +macro declares a structure that connects the elements in +the queue. +.Pp +The +.Fn SIMPLEQ_INIT +macro initializes the queue referenced by +.Fa head . +.Pp +The queue can also be initialized statically by using the +.Fn SIMPLEQ_HEAD_INITIALIZER +macro like this: +.Bd -literal -offset indent +SIMPLEQ_HEAD(HEADNAME, TYPE) head = SIMPLEQ_HEAD_INITIALIZER(head); +.Ed +.Pp +The +.Fn SIMPLEQ_INSERT_AFTER +macro inserts the new element +.Fa elm +after the element +.Fa listelm . +.Pp +The +.Fn SIMPLEQ_INSERT_HEAD +macro inserts the new element +.Fa elm +at the head of the queue. +.Pp +The +.Fn SIMPLEQ_INSERT_TAIL +macro inserts the new element +.Fa elm +at the end of the queue. +.Pp +The +.Fn SIMPLEQ_REMOVE_AFTER +macro removes the queue element immediately following +.Fa elm . +.Pp +The +.Fn SIMPLEQ_REMOVE_HEAD +macro removes the first element +from the queue. +.Pp +The +.Fn SIMPLEQ_CONCAT +macro concatenates all the elements of the queue referenced by +.Fa head2 +to the end of the queue referenced by +.Fa head1 , +emptying +.Fa head2 +in the process. +This is more efficient than removing and inserting the individual elements +as it does not actually traverse +.Fa head2 . +.Pp +The +.Fn SIMPLEQ_FIRST +and +.Fn SIMPLEQ_NEXT +macros can be used to traverse the queue. +The +.Fn SIMPLEQ_FOREACH +is used for queue traversal: +.Bd -literal -offset indent +SIMPLEQ_FOREACH(np, head, FIELDNAME) +.Ed +.Pp +The macro +.Fn SIMPLEQ_FOREACH_SAFE +traverses the queue referenced by head in a +forward direction, assigning each element in turn to var. +However, unlike +.Fn SIMPLEQ_FOREACH +it is permitted to remove var as well +as free it from within the loop safely without interfering with the traversal. +.Pp +The +.Fn SIMPLEQ_EMPTY +macro should be used to check whether a list is empty. +.Sh SIMPLE QUEUE EXAMPLE +.Bd -literal +SIMPLEQ_HEAD(listhead, entry) head = SIMPLEQ_HEAD_INITIALIZER(head); +struct entry { + ... + SIMPLEQ_ENTRY(entry) entries; /* Simple queue. */ + ... +} *n1, *n2, *np; + +n1 = malloc(sizeof(struct entry)); /* Insert at the head. */ +SIMPLEQ_INSERT_HEAD(&head, n1, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert after. */ +SIMPLEQ_INSERT_AFTER(&head, n1, n2, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert at the tail. */ +SIMPLEQ_INSERT_TAIL(&head, n2, entries); + /* Forward traversal. */ +SIMPLEQ_FOREACH(np, &head, entries) + np-> ... + /* Delete. */ +while (!SIMPLEQ_EMPTY(&head)) { + n1 = SIMPLEQ_FIRST(&head); + SIMPLEQ_REMOVE_HEAD(&head, entries); + free(n1); +} +.Ed +.Sh SINGLY-LINKED TAIL QUEUES +A singly-linked tail queue is headed by a structure defined by the +.Fn STAILQ_HEAD +macro. +This structure contains a pair of pointers, one to the first element in +the tail queue and the other to the last element in the tail queue. +The elements are singly linked for minimum space and pointer manipulation +overhead at the expense of O(n) removal for arbitrary elements. +New elements can be added to the tail queue after an existing element, +at the head of the tail queue or at the end of the tail queue. +A +.Fa STAILQ_HEAD +structure is declared as follows: +.Bd -literal -offset indent +STAILQ_HEAD(HEADNAME, TYPE) head; +.Ed +.Pp +where +.Fa HEADNAME +is the name of the structure to be defined, and struct +.Fa TYPE +is the type of the elements to be linked into the tail queue. +A pointer to the head of the tail queue can later be declared as: +.Bd -literal -offset indent +struct HEADNAME *headp; +.Ed +.Pp +(The names +.Li head +and +.Li headp +are user selectable.) +.Pp +The +.Fn STAILQ_ENTRY +macro declares a structure that connects the elements in +the tail queue. +.Pp +The +.Fn STAILQ_INIT +macro initializes the tail queue referenced by +.Fa head . +.Pp +The tail queue can also be initialized statically by using the +.Fn STAILQ_HEAD_INITIALIZER +macro like this: +.Bd -literal -offset indent +STAILQ_HEAD(HEADNAME, TYPE) head = STAILQ_HEAD_INITIALIZER(head); +.Ed +.Pp +The +.Fn STAILQ_INSERT_AFTER +macro inserts the new element +.Fa elm +after the element +.Fa listelm . +.Pp +The +.Fn STAILQ_INSERT_HEAD +macro inserts the new element +.Fa elm +at the head of the tail queue. +.Pp +The +.Fn STAILQ_INSERT_TAIL +macro inserts the new element +.Fa elm +at the end of the tail queue. +.Pp +The +.Fn STAILQ_REMOVE_AFTER +macro removes the queue element immediately following +.Fa elm . +Unlike +.Fa STAILQ_REMOVE , +this macro does not traverse the entire tail queue. +.Pp +The +.Fn STAILQ_REMOVE_HEAD +macro removes the first element +from the tail queue. +For optimum efficiency, +elements being removed from the head of the tail queue should +use this macro explicitly rather than the generic +.Fa STAILQ_REMOVE +macro. +.Pp +The +.Fn STAILQ_REMOVE +macro removes the element +.Fa elm +from the tail queue. +Use of this macro should be avoided as it traverses the entire list. +A doubly-linked tail queue should be used if this macro is needed in +high-usage code paths or to operate on long tail queues. +.Pp +The +.Fn STAILQ_CONCAT +macro concatenates all the elements of the tail queue referenced by +.Fa head2 +to the end of the tail queue referenced by +.Fa head1 , +emptying +.Fa head2 +in the process. +This is more efficient than removing and inserting the individual elements +as it does not actually traverse +.Fa head2 . +.Pp +The +.Fn STAILQ_FOREACH +is used for queue traversal: +.Bd -literal -offset indent +STAILQ_FOREACH(np, head, FIELDNAME) +.Ed +.Pp +The macro +.Fn STAILQ_FOREACH_SAFE +traverses the queue referenced by head in a +forward direction, assigning each element in turn to var. +However, unlike +.Fn STAILQ_FOREACH +it is permitted to remove var as well +as free it from within the loop safely without interfering with the traversal. +.Pp +The +.Fn STAILQ_FIRST +.Fn STAILQ_NEXT , +and +.Fn STAILQ_LAST +macros can be used to manually traverse a tail queue or an arbitrary part of +one. +The +.Fn STAILQ_EMPTY +macro should be used to check whether a tail queue is empty. +.Sh SINGLY-LINKED TAIL QUEUE EXAMPLE +.Bd -literal +STAILQ_HEAD(listhead, entry) head = STAILQ_HEAD_INITIALIZER(head); +struct entry { + ... + STAILQ_ENTRY(entry) entries; /* Singly-linked tail queue. */ + ... +} *n1, *n2, *np; + +n1 = malloc(sizeof(struct entry)); /* Insert at the head. */ +STAILQ_INSERT_HEAD(&head, n1, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert at the tail. */ +STAILQ_INSERT_TAIL(&head, n2, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert after. */ +STAILQ_INSERT_AFTER(&head, n1, n2, entries); + + /* Deletion. */ +STAILQ_REMOVE(&head, n2, entry, entries); +free(n2); + /* Deletion from the head. */ +n3 = STAILQ_FIRST(&head); +STAILQ_REMOVE_HEAD(&head, entries); +free(n3); + /* Forward traversal. */ +STAILQ_FOREACH(np, &head, entries) + np-> ... + /* Safe forward traversal. */ +STAILQ_FOREACH_SAFE(np, &head, entries, np_temp) { + np-> ... + STAILQ_REMOVE(&head, np, entry, entries); + free(np); +} + /* Delete. */ +while (!STAILQ_EMPTY(&head)) { + n1 = STAILQ_FIRST(&head); + STAILQ_REMOVE_HEAD(&head, entries); + free(n1); +} +.Ed +.Sh TAIL QUEUES +A tail queue is headed by a structure defined by the +.Fn TAILQ_HEAD +macro. +This structure contains a pair of pointers, +one to the first element in the tail queue and the other to +the last element in the tail queue. +The elements are doubly linked so that an arbitrary element can be +removed without traversing the tail queue. +New elements can be added to the queue after an existing element, +before an existing element, at the head of the queue, or at the end +of the queue. +A +.Fa TAILQ_HEAD +structure is declared as follows: +.Bd -literal -offset indent +TAILQ_HEAD(HEADNAME, TYPE) head; +.Ed +.Pp +where +.Fa HEADNAME +is the name of the structure to be defined, and struct +.Fa TYPE +is the type of the elements to be linked into the tail queue. +A pointer to the head of the tail queue can later be declared as: +.Bd -literal -offset indent +struct HEADNAME *headp; +.Ed +.Pp +(The names +.Li head +and +.Li headp +are user selectable.) +.Pp +The +.Fn TAILQ_ENTRY +macro declares a structure that connects the elements in +the tail queue. +.Pp +The +.Fn TAILQ_INIT +macro initializes the tail queue referenced by +.Fa head . +.Pp +The tail queue can also be initialized statically by using the +.Fn TAILQ_HEAD_INITIALIZER +macro. +.Pp +The +.Fn TAILQ_INSERT_HEAD +macro inserts the new element +.Fa elm +at the head of the tail queue. +.Pp +The +.Fn TAILQ_INSERT_TAIL +macro inserts the new element +.Fa elm +at the end of the tail queue. +.Pp +The +.Fn TAILQ_INSERT_AFTER +macro inserts the new element +.Fa elm +after the element +.Fa listelm . +.Pp +The +.Fn TAILQ_INSERT_BEFORE +macro inserts the new element +.Fa elm +before the element +.Fa listelm . +.Pp +The +.Fn TAILQ_REMOVE +macro removes the element +.Fa elm +from the tail queue. +.Pp +The +.Fn TAILQ_REPLACE +macro replaces the list element +.Fa elm +with the new element +.Fa elm2 . +.Pp +The +.Fn TAILQ_CONCAT +macro concatenates all the elements of the tail queue referenced by +.Fa head2 +to the end of the tail queue referenced by +.Fa head1 , +emptying +.Fa head2 +in the process. +This is more efficient than removing and inserting the individual elements +as it does not actually traverse +.Fa head2 . +.Pp +.Fn TAILQ_FOREACH +and +.Fn TAILQ_FOREACH_REVERSE +are used for traversing a tail queue. +.Fn TAILQ_FOREACH +starts at the first element and proceeds towards the last. +.Fn TAILQ_FOREACH_REVERSE +starts at the last element and proceeds towards the first. +.Bd -literal -offset indent +TAILQ_FOREACH(np, &head, FIELDNAME) +TAILQ_FOREACH_REVERSE(np, &head, HEADNAME, FIELDNAME) +.Ed +.Pp +The macros +.Fn TAILQ_FOREACH_SAFE +and +.Fn TAILQ_FOREACH_REVERSE_SAFE +traverse the list referenced by head +in a forward or reverse direction respectively, +assigning each element in turn to var. +However, unlike their unsafe counterparts, +they permit both the removal of var +as well as freeing it from within the loop safely +without interfering with the traversal. +.Pp +The +.Fn TAILQ_FIRST , +.Fn TAILQ_NEXT , +.Fn TAILQ_LAST +and +.Fn TAILQ_PREV +macros can be used to manually traverse a tail queue or an arbitrary part of +one. +.Pp +The +.Fn TAILQ_EMPTY +macro should be used to check whether a tail queue is empty. +.Sh TAIL QUEUE EXAMPLE +.Bd -literal +TAILQ_HEAD(tailhead, entry) head; +struct entry { + ... + TAILQ_ENTRY(entry) entries; /* Tail queue. */ + ... +} *n1, *n2, *np; + +TAILQ_INIT(&head); /* Initialize queue. */ + +n1 = malloc(sizeof(struct entry)); /* Insert at the head. */ +TAILQ_INSERT_HEAD(&head, n1, entries); + +n1 = malloc(sizeof(struct entry)); /* Insert at the tail. */ +TAILQ_INSERT_TAIL(&head, n1, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert after. */ +TAILQ_INSERT_AFTER(&head, n1, n2, entries); + +n2 = malloc(sizeof(struct entry)); /* Insert before. */ +TAILQ_INSERT_BEFORE(n1, n2, entries); + /* Forward traversal. */ +TAILQ_FOREACH(np, &head, entries) + np-> ... + /* Manual forward traversal. */ +for (np = n2; np != NULL; np = TAILQ_NEXT(np, entries)) + np-> ... + /* Delete. */ +while ((np = TAILQ_FIRST(&head))) { + TAILQ_REMOVE(&head, np, entries); + free(np); +} + +.Ed +.Sh SEE ALSO +.Xr tree 3 +.Sh NOTES +It is an error to assume the next and previous fields are preserved +after an element has been removed from a list or queue. +Using any macro (except the various forms of insertion) on an element +removed from a list or queue is incorrect. +An example of erroneous usage is removing the same element twice. +.Pp +The +.Fn SLIST_END , +.Fn LIST_END , +.Fn SIMPLEQ_END , +.Fn STAILQ_END +and +.Fn TAILQ_END +macros are deprecated; they provided symmetry with the historical +.Fn CIRCLEQ_END +and just expand to +.Dv NULL . +.Pp +Trying to free a list in the following way is a common error: +.Bd -literal -offset indent +LIST_FOREACH(var, head, entry) + free(var); +free(head); +.Ed +.Pp +Since +.Va var +is free'd, the FOREACH macros refer to a pointer that may have been +reallocated already. +A similar situation occurs when the current element is deleted +from the list. +In cases like these the data structure's FOREACH_SAFE macros should be used +instead. +.Sh HISTORY +The +.Nm queue +functions first appeared in +.Bx 4.4 . +The historical circle queue macros were deprecated in +.Ox 5.5 . diff --git a/bsd_queue.h b/bsd_queue.h new file mode 100644 index 0000000..18f04ea --- /dev/null +++ b/bsd_queue.h @@ -0,0 +1,633 @@ +/* $OpenBSD: queue.h,v 1.46 2020/12/30 13:33:12 millert Exp $ */ +/* $NetBSD: queue.h,v 1.11 1996/05/16 05:17:14 mycroft Exp $ */ + +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + */ + +#ifndef BSD_QUEUE_H +#define BSD_QUEUE_H + +// #include + +/* + * This file defines five types of data structures: singly-linked lists, + * lists, simple queues, tail queues and XOR simple queues. + * + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may only be traversed in the forward direction. + * + * A simple queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are singly + * linked to save space, so elements can only be removed from the + * head of the list. New elements can be added to the list before or after + * an existing element, at the head of the list, or at the end of the + * list. A simple queue may only be traversed in the forward direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * An XOR simple queue is used in the same way as a regular simple queue. + * The difference is that the head structure also includes a "cookie" that + * is XOR'd with the queue pointer (first, last or next) to generate the + * real pointer value. + * + * For details on the use of these macros, see the queue(3) manual page. + */ + +#if defined(QUEUE_MACRO_DEBUG) || (defined(_KERNEL) && defined(DIAGNOSTIC)) +#define _Q_INVALID ((void *)-1) +#define _Q_INVALIDATE(a) (a) = _Q_INVALID +#else +#define _Q_INVALIDATE(a) +#endif + +/* + * Singly-linked List definitions. + */ +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} + +#define SLIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define SLIST_ENTRY(type) \ +struct { \ + struct type *sle_next; /* next element */ \ +} + +/* + * Singly-linked List access methods. + */ +#define SLIST_FIRST(head) ((head)->slh_first) +#define SLIST_END(head) NULL +#define SLIST_EMPTY(head) (SLIST_FIRST(head) == SLIST_END(head)) +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) + +#define SLIST_FOREACH(var, head, field) \ + for((var) = SLIST_FIRST(head); \ + (var) != SLIST_END(head); \ + (var) = SLIST_NEXT(var, field)) + +#define SLIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = SLIST_FIRST(head); \ + (var) && ((tvar) = SLIST_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * Singly-linked List functions. + */ +#define SLIST_INIT(head) { \ + SLIST_FIRST(head) = SLIST_END(head); \ +} + +#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \ + (elm)->field.sle_next = (slistelm)->field.sle_next; \ + (slistelm)->field.sle_next = (elm); \ +} while (0) + +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.sle_next = (head)->slh_first; \ + (head)->slh_first = (elm); \ +} while (0) + +#define SLIST_REMOVE_AFTER(elm, field) do { \ + (elm)->field.sle_next = (elm)->field.sle_next->field.sle_next; \ +} while (0) + +#define SLIST_REMOVE_HEAD(head, field) do { \ + (head)->slh_first = (head)->slh_first->field.sle_next; \ +} while (0) + +#define SLIST_REMOVE(head, elm, type, field) do { \ + if ((head)->slh_first == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } else { \ + struct type *curelm = (head)->slh_first; \ + \ + while (curelm->field.sle_next != (elm)) \ + curelm = curelm->field.sle_next; \ + curelm->field.sle_next = \ + curelm->field.sle_next->field.sle_next; \ + } \ + _Q_INVALIDATE((elm)->field.sle_next); \ +} while (0) + +/* + * List definitions. + */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List access methods. + */ +#define LIST_FIRST(head) ((head)->lh_first) +#define LIST_END(head) NULL +#define LIST_EMPTY(head) (LIST_FIRST(head) == LIST_END(head)) +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_FOREACH(var, head, field) \ + for((var) = LIST_FIRST(head); \ + (var)!= LIST_END(head); \ + (var) = LIST_NEXT(var, field)) + +#define LIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = LIST_FIRST(head); \ + (var) && ((tvar) = LIST_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * List functions. + */ +#define LIST_INIT(head) do { \ + LIST_FIRST(head) = LIST_END(head); \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \ + (listelm)->field.le_next->field.le_prev = \ + &(elm)->field.le_next; \ + (listelm)->field.le_next = (elm); \ + (elm)->field.le_prev = &(listelm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + (elm)->field.le_next = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &(elm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.le_next = (head)->lh_first) != NULL) \ + (head)->lh_first->field.le_prev = &(elm)->field.le_next;\ + (head)->lh_first = (elm); \ + (elm)->field.le_prev = &(head)->lh_first; \ +} while (0) + +#define LIST_REMOVE(elm, field) do { \ + if ((elm)->field.le_next != NULL) \ + (elm)->field.le_next->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = (elm)->field.le_next; \ + _Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +#define LIST_REPLACE(elm, elm2, field) do { \ + if (((elm2)->field.le_next = (elm)->field.le_next) != NULL) \ + (elm2)->field.le_next->field.le_prev = \ + &(elm2)->field.le_next; \ + (elm2)->field.le_prev = (elm)->field.le_prev; \ + *(elm2)->field.le_prev = (elm2); \ + _Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +/* + * Simple queue definitions. + */ +#define SIMPLEQ_HEAD(name, type) \ +struct name { \ + struct type *sqh_first; /* first element */ \ + struct type **sqh_last; /* addr of last next element */ \ +} + +#define SIMPLEQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).sqh_first } + +#define SIMPLEQ_ENTRY(type) \ +struct { \ + struct type *sqe_next; /* next element */ \ +} + +/* + * Simple queue access methods. + */ +#define SIMPLEQ_FIRST(head) ((head)->sqh_first) +#define SIMPLEQ_END(head) NULL +#define SIMPLEQ_EMPTY(head) (SIMPLEQ_FIRST(head) == SIMPLEQ_END(head)) +#define SIMPLEQ_NEXT(elm, field) ((elm)->field.sqe_next) + +#define SIMPLEQ_FOREACH(var, head, field) \ + for((var) = SIMPLEQ_FIRST(head); \ + (var) != SIMPLEQ_END(head); \ + (var) = SIMPLEQ_NEXT(var, field)) + +#define SIMPLEQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = SIMPLEQ_FIRST(head); \ + (var) && ((tvar) = SIMPLEQ_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * Simple queue functions. + */ +#define SIMPLEQ_INIT(head) do { \ + (head)->sqh_first = NULL; \ + (head)->sqh_last = &(head)->sqh_first; \ +} while (0) + +#define SIMPLEQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.sqe_next = (head)->sqh_first) == NULL) \ + (head)->sqh_last = &(elm)->field.sqe_next; \ + (head)->sqh_first = (elm); \ +} while (0) + +#define SIMPLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.sqe_next = NULL; \ + *(head)->sqh_last = (elm); \ + (head)->sqh_last = &(elm)->field.sqe_next; \ +} while (0) + +#define SIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.sqe_next = (listelm)->field.sqe_next) == NULL)\ + (head)->sqh_last = &(elm)->field.sqe_next; \ + (listelm)->field.sqe_next = (elm); \ +} while (0) + +#define SIMPLEQ_REMOVE_HEAD(head, field) do { \ + if (((head)->sqh_first = (head)->sqh_first->field.sqe_next) == NULL) \ + (head)->sqh_last = &(head)->sqh_first; \ +} while (0) + +#define SIMPLEQ_REMOVE_AFTER(head, elm, field) do { \ + if (((elm)->field.sqe_next = (elm)->field.sqe_next->field.sqe_next) \ + == NULL) \ + (head)->sqh_last = &(elm)->field.sqe_next; \ +} while (0) + +#define SIMPLEQ_CONCAT(head1, head2) do { \ + if (!SIMPLEQ_EMPTY((head2))) { \ + *(head1)->sqh_last = (head2)->sqh_first; \ + (head1)->sqh_last = (head2)->sqh_last; \ + SIMPLEQ_INIT((head2)); \ + } \ +} while (0) + +/* + * XOR Simple queue definitions. + */ +#define XSIMPLEQ_HEAD(name, type) \ +struct name { \ + struct type *sqx_first; /* first element */ \ + struct type **sqx_last; /* addr of last next element */ \ + unsigned long sqx_cookie; \ +} + +#define XSIMPLEQ_ENTRY(type) \ +struct { \ + struct type *sqx_next; /* next element */ \ +} + +/* + * XOR Simple queue access methods. + */ +#define XSIMPLEQ_XOR(head, ptr) ((__typeof(ptr))((head)->sqx_cookie ^ \ + (unsigned long)(ptr))) +#define XSIMPLEQ_FIRST(head) XSIMPLEQ_XOR(head, ((head)->sqx_first)) +#define XSIMPLEQ_END(head) NULL +#define XSIMPLEQ_EMPTY(head) (XSIMPLEQ_FIRST(head) == XSIMPLEQ_END(head)) +#define XSIMPLEQ_NEXT(head, elm, field) XSIMPLEQ_XOR(head, ((elm)->field.sqx_next)) + + +#define XSIMPLEQ_FOREACH(var, head, field) \ + for ((var) = XSIMPLEQ_FIRST(head); \ + (var) != XSIMPLEQ_END(head); \ + (var) = XSIMPLEQ_NEXT(head, var, field)) + +#define XSIMPLEQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = XSIMPLEQ_FIRST(head); \ + (var) && ((tvar) = XSIMPLEQ_NEXT(head, var, field), 1); \ + (var) = (tvar)) + +/* + * XOR Simple queue functions. + */ +#define XSIMPLEQ_INIT(head) do { \ + arc4random_buf(&(head)->sqx_cookie, sizeof((head)->sqx_cookie)); \ + (head)->sqx_first = XSIMPLEQ_XOR(head, NULL); \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(head)->sqx_first); \ +} while (0) + +#define XSIMPLEQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.sqx_next = (head)->sqx_first) == \ + XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ + (head)->sqx_first = XSIMPLEQ_XOR(head, (elm)); \ +} while (0) + +#define XSIMPLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.sqx_next = XSIMPLEQ_XOR(head, NULL); \ + *(XSIMPLEQ_XOR(head, (head)->sqx_last)) = XSIMPLEQ_XOR(head, (elm)); \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ +} while (0) + +#define XSIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.sqx_next = (listelm)->field.sqx_next) == \ + XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ + (listelm)->field.sqx_next = XSIMPLEQ_XOR(head, (elm)); \ +} while (0) + +#define XSIMPLEQ_REMOVE_HEAD(head, field) do { \ + if (((head)->sqx_first = XSIMPLEQ_XOR(head, \ + (head)->sqx_first)->field.sqx_next) == XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(head)->sqx_first); \ +} while (0) + +#define XSIMPLEQ_REMOVE_AFTER(head, elm, field) do { \ + if (((elm)->field.sqx_next = XSIMPLEQ_XOR(head, \ + (elm)->field.sqx_next)->field.sqx_next) \ + == XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = \ + XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ +} while (0) + + +/* + * Tail queue definitions. + */ +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ +} + +#define TAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).tqh_first } + +#define TAILQ_ENTRY(type) \ +struct { \ + struct type *tqe_next; /* next element */ \ + struct type **tqe_prev; /* address of previous next element */ \ +} + +/* + * Tail queue access methods. + */ +#define TAILQ_FIRST(head) ((head)->tqh_first) +#define TAILQ_END(head) NULL +#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) +#define TAILQ_LAST(head, headname) \ + (*(((struct headname *)((head)->tqh_last))->tqh_last)) +/* XXX */ +#define TAILQ_PREV(elm, headname, field) \ + (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) +#define TAILQ_EMPTY(head) \ + (TAILQ_FIRST(head) == TAILQ_END(head)) + +#define TAILQ_FOREACH(var, head, field) \ + for((var) = TAILQ_FIRST(head); \ + (var) != TAILQ_END(head); \ + (var) = TAILQ_NEXT(var, field)) + +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST(head); \ + (var) != TAILQ_END(head) && \ + ((tvar) = TAILQ_NEXT(var, field), 1); \ + (var) = (tvar)) + + +#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \ + for((var) = TAILQ_LAST(head, headname); \ + (var) != TAILQ_END(head); \ + (var) = TAILQ_PREV(var, headname, field)) + +#define TAILQ_FOREACH_REVERSE_SAFE(var, head, headname, field, tvar) \ + for ((var) = TAILQ_LAST(head, headname); \ + (var) != TAILQ_END(head) && \ + ((tvar) = TAILQ_PREV(var, headname, field), 1); \ + (var) = (tvar)) + +/* + * Tail queue functions. + */ +#define TAILQ_INIT(head) do { \ + (head)->tqh_first = NULL; \ + (head)->tqh_last = &(head)->tqh_first; \ +} while (0) + +#define TAILQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.tqe_next = (head)->tqh_first) != NULL) \ + (head)->tqh_first->field.tqe_prev = \ + &(elm)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm)->field.tqe_next; \ + (head)->tqh_first = (elm); \ + (elm)->field.tqe_prev = &(head)->tqh_first; \ +} while (0) + +#define TAILQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.tqe_next = NULL; \ + (elm)->field.tqe_prev = (head)->tqh_last; \ + *(head)->tqh_last = (elm); \ + (head)->tqh_last = &(elm)->field.tqe_next; \ +} while (0) + +#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.tqe_next = (listelm)->field.tqe_next) != NULL)\ + (elm)->field.tqe_next->field.tqe_prev = \ + &(elm)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm)->field.tqe_next; \ + (listelm)->field.tqe_next = (elm); \ + (elm)->field.tqe_prev = &(listelm)->field.tqe_next; \ +} while (0) + +#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.tqe_prev = (listelm)->field.tqe_prev; \ + (elm)->field.tqe_next = (listelm); \ + *(listelm)->field.tqe_prev = (elm); \ + (listelm)->field.tqe_prev = &(elm)->field.tqe_next; \ +} while (0) + +#define TAILQ_REMOVE(head, elm, field) do { \ + if (((elm)->field.tqe_next) != NULL) \ + (elm)->field.tqe_next->field.tqe_prev = \ + (elm)->field.tqe_prev; \ + else \ + (head)->tqh_last = (elm)->field.tqe_prev; \ + *(elm)->field.tqe_prev = (elm)->field.tqe_next; \ + _Q_INVALIDATE((elm)->field.tqe_prev); \ + _Q_INVALIDATE((elm)->field.tqe_next); \ +} while (0) + +#define TAILQ_REPLACE(head, elm, elm2, field) do { \ + if (((elm2)->field.tqe_next = (elm)->field.tqe_next) != NULL) \ + (elm2)->field.tqe_next->field.tqe_prev = \ + &(elm2)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm2)->field.tqe_next; \ + (elm2)->field.tqe_prev = (elm)->field.tqe_prev; \ + *(elm2)->field.tqe_prev = (elm2); \ + _Q_INVALIDATE((elm)->field.tqe_prev); \ + _Q_INVALIDATE((elm)->field.tqe_next); \ +} while (0) + +#define TAILQ_CONCAT(head1, head2, field) do { \ + if (!TAILQ_EMPTY(head2)) { \ + *(head1)->tqh_last = (head2)->tqh_first; \ + (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ + (head1)->tqh_last = (head2)->tqh_last; \ + TAILQ_INIT((head2)); \ + } \ +} while (0) + +/* + * Singly-linked Tail queue declarations. + */ +#define STAILQ_HEAD(name, type) \ +struct name { \ + struct type *stqh_first; /* first element */ \ + struct type **stqh_last; /* addr of last next element */ \ +} + +#define STAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).stqh_first } + +#define STAILQ_ENTRY(type) \ +struct { \ + struct type *stqe_next; /* next element */ \ +} + +/* + * Singly-linked Tail queue access methods. + */ +#define STAILQ_FIRST(head) ((head)->stqh_first) +#define STAILQ_END(head) NULL +#define STAILQ_EMPTY(head) (STAILQ_FIRST(head) == STAILQ_END(head)) +#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next) + +#define STAILQ_FOREACH(var, head, field) \ + for ((var) = STAILQ_FIRST(head); \ + (var) != STAILQ_END(head); \ + (var) = STAILQ_NEXT(var, field)) + +#define STAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = STAILQ_FIRST(head); \ + (var) && ((tvar) = STAILQ_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * Singly-linked Tail queue functions. + */ +#define STAILQ_INIT(head) do { \ + STAILQ_FIRST((head)) = NULL; \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_FIRST((head)) = (elm); \ +} while (0) + +#define STAILQ_INSERT_TAIL(head, elm, field) do { \ + STAILQ_NEXT((elm), field) = NULL; \ + *(head)->stqh_last = (elm); \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((elm), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_NEXT((elm), field) = (elm); \ +} while (0) + +#define STAILQ_REMOVE_HEAD(head, field) do { \ + if ((STAILQ_FIRST((head)) = \ + STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_REMOVE_AFTER(head, elm, field) do { \ + if ((STAILQ_NEXT(elm, field) = \ + STAILQ_NEXT(STAILQ_NEXT(elm, field), field)) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_REMOVE(head, elm, type, field) do { \ + if (STAILQ_FIRST((head)) == (elm)) { \ + STAILQ_REMOVE_HEAD((head), field); \ + } else { \ + struct type *curelm = (head)->stqh_first; \ + while (STAILQ_NEXT(curelm, field) != (elm)) \ + curelm = STAILQ_NEXT(curelm, field); \ + STAILQ_REMOVE_AFTER(head, curelm, field); \ + } \ +} while (0) + +#define STAILQ_CONCAT(head1, head2) do { \ + if (!STAILQ_EMPTY((head2))) { \ + *(head1)->stqh_last = (head2)->stqh_first; \ + (head1)->stqh_last = (head2)->stqh_last; \ + STAILQ_INIT((head2)); \ + } \ +} while (0) + +#define STAILQ_LAST(head, type, field) \ + (STAILQ_EMPTY((head)) ? NULL : \ + ((struct type *)(void *) \ + ((char *)((head)->stqh_last) - offsetof(struct type, field)))) + +#endif /* !BSD_QUEUE_H */ diff --git a/command.c b/command.c new file mode 100644 index 0000000..20ca0fc --- /dev/null +++ b/command.c @@ -0,0 +1,248 @@ +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include "command.h" +#include "common.h" +#include "connections.h" +#include "db.h" +#include "lua_interface.h" +#include "notify.h" +#include "server.h" + +/** + * Return a new populated command. + */ +struct command * +command_new(unsigned char *command_line, size_t command_len, command_flags_t flags) +{ + struct command *command; + + command = calloc(1, sizeof *command); + if (command == NULL) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + return NULL; + } + + command->command = command_line; + command->command_len = command_len; + command->flags = flags; + + return command; +} + + +/** + * Release a command and its data. + */ +void +command_free(struct command *command) +{ + command->command_len = 0; + command->flags = 0; + if (command->command) { + free(command->command); + command->command = NULL; + } + free(command); +} + + +/** + * This is invoked when a connection has successfully joined the server. + */ +void +command_connected(struct connection *c) +{ + connection_printf(c, "Welcome!\n"); + connection_printf_broadcast(c, true, "[%s has connected]\n", c->name); +} + + +/** + * This is invoked when a connection has been disconnected from the server, + * before the connection is destroyed. + */ +void +command_disconnected(struct connection *c) +{ + connection_printf(c, "Goodbye!\n"); + connection_printf_broadcast(c, true, "[%s has disconnected]\n", c->name); + NOTIFY_DEBUG("%s disconnected", c->name); +} + + +/** + * process some simple commands, until we get fancier + */ +void +command_parse(struct connection *c, struct server_worker_context *ctx, size_t thread_id, struct command *command) +{ + NOTIFY_DEBUG("parsing '%s' from '%s' (%p) [%zu]", command->command, c->name, ctx, thread_id); + + if (command->flags & COMMAND_FLAG_EVENT_CONNECT) { + command_connected(c); + } + if (command->flags & COMMAND_FLAG_EVENT_DISCONNECT) { + command_disconnected(c); + } + + if (!command->command) { + return; + } + + if (strncasecmp((char *)command->command, "auth", 4) == 0) { + char *cmd, *user, *secret, *state; + int uid = 0; + int r = 0; + + cmd = strtok_r((char *)command->command, " \t\n\r", &state); + user = strtok_r(NULL, " \t\n\r", &state); + secret = strtok_r(NULL, " \t\n\r", &state); + + if (!cmd || !user || !secret) { + connection_printf(c, "Failed.\n"); + return; + } + + NOTIFY_DEBUG("authentication request for '%s'", user); + + r = db_uid_from_name(&c->server->db, (unsigned char *)user, &uid); + NOTIFY_DEBUG("r:%d uid: %d", r, uid); + + connection_printf(c, ">> r:%d uid: %d\n", r, uid); + + return; + } + + if (memcmp(command->command, "QUIT", command->command_len) == 0) { + bufferevent_disable(c->bev, EV_READ); + connection_printf(c, "Goodbye! You sent me a total of %zu bytes! (user_time:%ld.%.6ld sys_time:%ld.%.6ld)\n", + c->total_read, + c->utime.tv_sec, (long int) c->utime.tv_usec, + c->stime.tv_sec, (long int) c->utime.tv_usec); + c->state = CONNECTION_STATE_WANT_CLOSE; + return; + } + + if (memcmp(command->command, "@SHUTDOWN", command->command_len) == 0) { + struct timeval _now = { 0, 0 }; + struct event_base *base = c->bev->ev_base; /* no accessor like bufferevent_get_base() so need to directly grab this with */ + + connection_printf(c, "Killing server.\n"); + + event_base_loopexit(base, &_now); + return; + } + + if (memcmp(command->command, "INFO", command->command_len) == 0) { + char *uuid_buf = NULL; + size_t uuid_len; + uuid_rc_t rc; + + rc = uuid_export(c->uuid, UUID_FMT_STR, &uuid_buf, &uuid_len); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_export", uuid_error(rc)); + } + + connection_lock_output(c); + connection_printf(c, "You are connected from: %s\n", c->client_address); + connection_printf(c, "Your connection is %sencrypted.\n", c->flags & CONN_TYPE_SSL ? "" : "not "); + connection_printf(c, "Your UUID is: %s\n", uuid_buf); + connection_unlock_output(c); + + free(uuid_buf); + + return; + } + + if (memcmp(command->command, "WHO", command->command_len) == 0) { + struct connection **clist; + struct connection **clist_iter; + + clist_iter = clist = connections_all_as_array(&c->server->connections, NULL); + + connection_lock_output(c); + while (*clist_iter) { + connection_printf(c, "%s from %s%s%s\n", + (*clist_iter)->name, + (*clist_iter)->client_address, + (*clist_iter)->flags & CONN_TYPE_SSL ? " (via ssl)" : "", + (*clist_iter == c) ? " <- you" : ""); + connection_free(*clist_iter); + clist_iter++; + } + connection_unlock_output(c); + + free(clist); + + return; + } + +#define TOSD(__off__) NOTIFY_DEBUG("TOS[%d]: %s '%s'", lua_gettop(L) - (__off__), lua_typename(L, lua_type(L, -1 - (__off__))), lua_tostring(L, -1 - (__off__))) + if (*command->command == '!') { + lua_State *L = ctx->L; + const char *echo = (const char *) command->command + 1; + + + // lemu_cosnnection_push(L, c); // c + + // int t = lua_gettop(L); + // lua_pushnil(L); /* first key */ + // while (lua_next(L, t) != 0) { + // /* uses 'key' (at index -2) and 'value' (at index -1) */ + // printf("k:%s v:%s\n", + // lua_typename(L, lua_type(L, -2)), + // lua_typename(L, lua_type(L, -1))); + // /* removes 'value'; keeps 'key' for next iteration */ + // switch (lua_type(L, -2)) { + // case LUA_TSTRING: + // printf("\tk:%s\n", lua_tostring(L, -2)); + // break; + // case LUA_TNUMBER: + // printf("\tk:%f\n", lua_tonumber(L, -2)); + // break; + // default: + // printf("\tk:?\n"); + // } + // switch (lua_type(L, -1)) { + // case LUA_TSTRING: + // printf("\tv:%s\n", lua_tostring(L, -1)); + // break; + // case LUA_TNUMBER: + // printf("\tv:%f\n", lua_tonumber(L, -1)); + // break; + // default: + // printf("\tv:?\n"); + // } + // lua_pop(L, 1); + // } + + NOTIFY_DEBUG("lua echo on '%s'", echo); + TOSD(0); + lemu_connection_push(L, c); // c + TOSD(0); + lua_getfield(L, 1, "send"); // c sendfn + TOSD(0); + lua_rotate(L, 1, 1); // sendfn c + TOSD(0); + lua_pushfstring(L, "(lua echo) %s", echo); // sendfn c str + TOSD(0); + lua_call(L, 2, 0); + } + + /* default to broadcasting message */ + connection_printf_broadcast(c, true, "%s said: %s\n", c->name, command->command); + connection_printf(c, "%s said: %s\n", "You", command->command); +} diff --git a/command.h b/command.h new file mode 100644 index 0000000..b789927 --- /dev/null +++ b/command.h @@ -0,0 +1,24 @@ +#ifndef COMMAND_H +#define COMMAND_H + +#include "server.h" + +typedef uint_fast8_t command_flags_t; +enum command_flags { + COMMAND_FLAG_EVENT_CONNECT = (1<<0), + COMMAND_FLAG_EVENT_DISCONNECT = (1<<1), +}; + +struct command { + unsigned char *command; + size_t command_len; + command_flags_t flags; + STAILQ_ENTRY(command) stailq_entry; +}; + +struct command * command_new(unsigned char *command, size_t command_len, command_flags_t flags); +void command_free(struct command *command); + +void command_parse(struct connection *c, struct server_worker_context *ctx, size_t thread_id, struct command *command); + +#endif /* COMMAND_H */ \ No newline at end of file diff --git a/common.c b/common.c new file mode 100644 index 0000000..fb45589 --- /dev/null +++ b/common.c @@ -0,0 +1,150 @@ +/* + * General utility functions which don't have better places to live. + * + */ +#include +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "notify.h" + +/** timeval_diff + * Calculate the difference between two timevals. + * Lifted wholesale from the GNU libc manual. +**/ +int +timeval_diff(struct timeval *result, struct timeval *x, struct timeval *y) +{ + /* Perform the carry for the later subtraction by updating y. */ + if (x->tv_usec < y->tv_usec) { + int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1; + y->tv_usec -= 1000000 * nsec; + y->tv_sec += nsec; + } + if (x->tv_usec - y->tv_usec > 1000000) { + int nsec = (x->tv_usec - y->tv_usec) / 1000000; + y->tv_usec += 1000000 * nsec; + y->tv_sec -= nsec; + } + + /* Compute the time remaining to wait. + tv_usec is certainly positive. */ + result->tv_sec = x->tv_sec - y->tv_sec; + result->tv_usec = x->tv_usec - y->tv_usec; + + /* Return 1 if result is negative. */ + return x->tv_sec < y->tv_sec; +} + +/** timeval_increment + * Increment one timeval by another. +**/ +void +timeval_increment(struct timeval *result, struct timeval *diff) +{ + const long u_factor = 1000000; + + result->tv_sec += diff->tv_sec; + result->tv_usec += diff->tv_usec; + if (result->tv_usec < 0) { + result->tv_usec += u_factor; + result->tv_sec--; + } else if (result->tv_usec > u_factor) { + result->tv_usec -= u_factor; + result->tv_sec++; + } +} + +/** string_to_addrinfo_call + * Attempt to parse a sockaddr ip and port out of a string, + * calls ai_cb function with result (until one success, or + * for all results depending on flag values). + * + * IPv6 addresses must be [bracketed]. (Because of colons.) + * Accepts: + * [ipv6]:port + * ipv4:port + * Returns: + * 0 on valid parse + * -1 on error +**/ +#define _CB_ITER_ALL (1<<0) +int +string_to_addrinfo_call(char *in, unsigned int flags, int (*ai_cb)(struct addrinfo *, void *), void *cb_data) +{ + char *full_string, *port_start, *addr_start, *tmp; + struct addrinfo hints, *res=NULL, *res_iter; + int r; + + full_string = strdup(in); + if (!full_string) { + NOTIFY_ERROR("%s:%s", "strdup", strerror(errno) ); + return -1; + } + + addr_start = strchr(full_string, '['); + if (addr_start) { + *addr_start++ = '\0'; + tmp = strchr(addr_start, ']'); + if (!tmp) { + NOTIFY_ERROR("invalid %saddress '%s': %s", "IPv6 ", in, "unmatched brackets"); + free(full_string); + return -1; + } + *tmp++ = '\0'; + port_start = tmp; + } else { + addr_start = full_string; + port_start = addr_start; + } + + tmp = strrchr(port_start, ':'); + if (!tmp) { + NOTIFY_ERROR("invalid %saddress '%s': %s", "", in, "no port specified"); + free(full_string); + return -1; + } + *tmp++ = '\0'; + port_start = tmp; + + /* We now have address and port as separate strings */ + + memset(&hints, 0, sizeof hints); + + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + /* bind to equiv of INADDR_ANY for any AF, if address is specified as '*' */ + if (strcmp(addr_start, "*") == 0) { + hints.ai_flags |= AI_PASSIVE; + addr_start = NULL; + } + + r = getaddrinfo(addr_start, port_start, &hints, &res); + if (r) { + NOTIFY_ERROR("%s:%s", "getaddrinfo", gai_strerror(r)); + free(full_string); + return -1; + } + + for (res_iter = res; res_iter; res_iter = res_iter->ai_next) { + r = ai_cb(res_iter, cb_data); + if (r) { + NOTIFY_DEBUG("%s:%d", "(ai_cb)", r); + continue; + } + + if (! (flags & _CB_ITER_ALL)) { + break; + } + } + freeaddrinfo(res); + free(full_string); + + return 0; +} diff --git a/common.h b/common.h new file mode 100644 index 0000000..f7c0987 --- /dev/null +++ b/common.h @@ -0,0 +1,20 @@ +#ifndef COMMON_H +#define COMMON_H + +#include +#include +#include +#include + +#define UNUSED __attribute__((__unused__)) + +int +timeval_diff(struct timeval *result, struct timeval *x, struct timeval *y); + +void +timeval_increment(struct timeval *result, struct timeval *diff); + +int +string_to_addrinfo_call(char *in, unsigned int flags, int (*ai_cb)(struct addrinfo *, void *), void *cb_data); + +#endif /* COMMON_H */ diff --git a/connections.c b/connections.c new file mode 100644 index 0000000..fa35f79 --- /dev/null +++ b/connections.c @@ -0,0 +1,927 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#ifndef NI_MAXHOST +# define NI_MAXHOST 1025 +#endif +#ifndef NI_MAXSERV +# define NI_MAXSERV 32 +#endif + +#include // needs _GNU_SOURCE for RUSAGE_THREAD + +#include +#include +#include +#include + +#include + +#include "common.h" +#include "notify.h" +#include "connections.h" +#include "command.h" +#include "server.h" + +/** + * A reference-counted data buffer. + * Used internally here for sending out one message to multiple connections, without + * creating a copy for each connection. + * Frees data pointer when the last reference is removed. + * A connection thread increases the reference count as it sends the message to each + * destination connection, while the libevent thread decrements as it completes each + * transmission. + */ +struct rc_data_ { + pthread_mutex_t mutex; + size_t reference_count; + size_t data_len; + char *data; +}; + +/** + * Allocates, initializes, and returns a new struct rc_data_ entity. + */ +static struct rc_data_ * +rc_data_new_(char *data, size_t len) +{ + struct rc_data_ *rc; + int r; + + if (!data) { + NOTIFY_DEBUG("data:%p len:%zu", data, len); + return NULL; + } + + rc = malloc(sizeof *rc); + if (!rc) { + NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno)); + return NULL; + } + + if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + free(rc); + return NULL; + } + + rc->reference_count = 1; + rc->data_len = len; + rc->data = data; + + return rc; +} + +/** + * Increments the reference count of a struct rc_data_ entity. + */ +static void +rc_data_ref_inc_(struct rc_data_ *rc) +{ + int r; + + if ( (r = pthread_mutex_lock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + rc->reference_count += 1; + + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } +} + +/** + * Decrements the reference count of a struct rc_data_ entity + * and frees everything if there are no more references. + */ +static void +rc_data_free_(struct rc_data_ *rc) +{ + int r; + + if ( (r = pthread_mutex_lock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + rc->reference_count -= 1; + if (rc->reference_count > 0) { + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + return; + } + + free(rc->data); + rc->data = NULL; + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + memset(rc, 0, sizeof *rc); + + free(rc); +} + +/** + * Wrapper for rc_data_free_() of the proper callback type to be used by + * evbuffer_add_reference() in connection_multi_send(). + */ +static void +rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg) +{ + struct rc_data_ *r = arg; + + rc_data_free_(r); +} + +/** + * Initializes a struct connections entity. + */ +int +connections_init(struct connections *cs) +{ + int r; + + cs->count = 0; + TAILQ_INIT(&cs->head); + + if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + return -1; + } + + return 0; +} + +/** + * Cleanup a struct connections. + */ +void +connections_fini(struct connections *cs) +{ + struct connection *c1, *c2; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("cs:%p", cs); +#endif + + /* free any connection entities */ + c1 = TAILQ_FIRST(&cs->head); + while (c1 != NULL) { + c2 = TAILQ_NEXT(c1, tailq_entry); + connection_free(c1); + c1 = c2; + } + TAILQ_INIT(&cs->head); + cs->count = 0; + + if ( (r = pthread_mutex_destroy(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } +} + + +/** + * Inserts a connection at the end of connections list. + */ +void +connections_append(struct connection *c) +{ + struct connections *cs = &c->server->connections; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p", c); +#endif + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry); + cs->count += 1; + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Removes a connection from its struct connections list. + */ +void +connections_remove(struct connection *c) +{ + struct connections *cs = &c->server->connections; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p", c); +#endif + + if ( !c || !cs) { + NOTIFY_DEBUG("c:%p connections:%p", c, cs); + return; + } + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + TAILQ_REMOVE(&cs->head, c, tailq_entry); + cs->count -= 1; + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Allocates and returns a null-terminated array of pointers to all connection entities in + * connections list, minus the 'c_exclude' connection, if set. + * Used as a simple implementation of broadcasting a message. + * This increments the reference count of each connection added to the array + * so they need to individually be freed before freeing the array. + */ +struct connection ** +connections_all_as_array(struct connections *cs, struct connection *c_exclude) +{ + struct connection **all; + struct connection *c; + size_t i = 0; + int r; + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return NULL; + } + + all = calloc(cs->count + 1, sizeof *all); + if (!all) { + NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno)); + goto err_unlock; + } + + TAILQ_FOREACH(c, &cs->head, tailq_entry) { + if (c != c_exclude + && c->state < CONNECTION_STATE_WANT_CLOSE + && c->state > CONNECTION_STATE_INIT) { + connection_inc_ref(c); + all[i++] = c; + } + } + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + while (i) { + i--; + connection_free(c); + } + free(all); + return NULL; + } + + return all; + +err_unlock: + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + return NULL; +} + + +/** + * Wrap ev timeout event to re-resolve hostname. + */ +static void +connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg) +{ + struct connection *c = arg; + + NOTIFY_DEBUG("retrying hostname resolution"); + + c->dns_retry_ev = NULL; + c->dns_retries += 1; + connection_resolve_hostname(c); +} + +/** + * A callback used by connection_init to populate the reverse-lookup of a connection's + * client_address. +**/ +static void +connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx) +{ + struct connection *c = ctx; + char *old_client_address = c->client_address; + + if (result != DNS_ERR_NONE) { + NOTIFY_ERROR("Error resolving: %s", + evdns_err_to_string(result) ); + c->evdns_request = NULL; + + // TODO: configurable + // if (server_get_config_integer(c->server, "dns_retries_max")) { + // if (server_get_config_integer(c->server, "dns_retry_seconds")) { + + if (c->dns_retries < 4) { + struct timeval _seconds = {30, 0}; + c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c); + event_add(c->dns_retry_ev, &_seconds); + }; + return; + } + + switch (type) { + case DNS_PTR: + if (count < 1) { + return; + } + if (count > 1) { + NOTIFY_DEBUG("multiple records returned, using first"); + } + c->client_address = strdup(((char **)addresses)[0]); + NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c); + break; + + default: + NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type); + break; + } + c->evdns_request = NULL; + + if (old_client_address) { + free(old_client_address); + } +} + + +/** + * Perform reverse lookup of connection address. + */ +void +connection_resolve_hostname(struct connection *c) +{ + if (c->evdns_request) { + NOTIFY_DEBUG("resolution already in progress"); + return; + } + + /* does libevent's evdns have a getnameinfo style call? */ + + switch (c->sa->sa_family) { + case AF_INET: + c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c); + break; + + case AF_INET6: + c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c); + break; + + default: + NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family); + } + if (!c->evdns_request) { + NOTIFY_DEBUG("could not submit PTR lookup request"); + } +} + + +/** + * Populates a connection with initial information and sets reference count. + */ +int +connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags) +{ + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + int r; + uuid_rc_t rc; + + if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err; + } + + if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthead_mutex_init", strerror(r)); + goto err_rc_mutex_destroy; + } + + rc = uuid_create(&c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc)); + goto err_commands_mutex_destroy; + } + + rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc)); + goto err_uuid_destroy; + } + + c->state = CONNECTION_STATE_INIT; + c->bev = bev; + c->flags = flags; + c->server = s; + + /* render an IP from a sockaddr */ + r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); + if (r) { + NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r)); + strncpy(hbuf, "(unknown)", sizeof hbuf); + strncpy(sbuf, "(?)", sizeof sbuf); + } + c->client_address = strdup(hbuf); + if (!c->client_address) { + NOTIFY_ERROR("%s:%s", "strdup", strerror(errno)); + goto err_uuid_destroy; + } + + /* Past this point, errors are non-fatal. */ + c->reference_count = 1; + + /* Now try to resolve a name from client ip, in background. */ + c->sa = sock; + c->sa_len = socklen; + c->dns_retries = 0; + c->dns_retry_ev = NULL; + connection_resolve_hostname(c); + + /* FIXME: temporary name of connection for POC */ + static int current_connection_number; + r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++); + if ((size_t)r >= sizeof c->name) { + NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__); + } + +#if DARWIN + /* Darwin systems <10.12 lack clock_gettime (!) */ + if (gettimeofday(&c->connect_time, NULL)) { + NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); + } + NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec)); +#else + if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } + NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec)); +#endif /* DARWIN */ + + return 0; + +err_uuid_destroy: + rc = uuid_destroy(c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); + } +err_commands_mutex_destroy: + if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); + } +err_rc_mutex_destroy: + if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); + } +err: + return -1; +} + + +/** + * Process a connection's commands as long as there are commands to process. + * FIXME: track how much time has been spend consuming commands, re-queue + * processor job when over some threshold considering pending workqueue + */ +static void +connection_process_queue_(void *data, void *context, size_t id) +{ + struct connection *c = data; + struct server_worker_context *ctx = context; + struct command *command; + struct rusage ru_start, ru_end; + struct timespec ts_start, ts_current; + int r; + + if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) { + NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno)); + } + while (1) { + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + command = STAILQ_FIRST(&c->commands_head); + if (command) { + STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry); + } + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } + + if (command == NULL) { + break; + } + + if ( (r = getrusage(RUSAGE_THREAD, &ru_start)) ) { + NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); + } + + command_parse(c, context, id, command); + + if ( (r = getrusage(RUSAGE_THREAD, &ru_end)) ) { + NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); + } + + /* track how much time was spent procesing */ + connection_accounting_increment(c, &ru_start, &ru_end); + + command_free(command); + + if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } + + } + connection_free(c); + + // Done processing for a connection, force GC cycle to release additional c references. + lua_gc(ctx->L, LUA_GCCOLLECT); +} + + +/** + * Adds a command to a connection's list of things to process. + */ +int +connection_command_enqueue(struct connection *c, struct command *command) +{ + bool was_empty; + int r; + + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + command_free(command); + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return -1; + } + + was_empty = STAILQ_EMPTY(&c->commands_head) ? true : false; + if (was_empty == true) { + STAILQ_INSERT_HEAD(&c->commands_head, command, stailq_entry); + } else { + STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry); + } + +#if DARWIN + /* Darwin systems <10.12 lack clock_gettime (!) */ + if (gettimeofday(&c->last_received_time, NULL)) { + NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); + } +#else + if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } +#endif /* DARWIN */ + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return -1; + } + + /* if there were already queued commands, a processor should already be scheduled */ + if (was_empty == true) { + connection_inc_ref(c); + if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) { + NOTIFY_ERROR("%s:%s", "workqueue_add", "failed"); + return -1; + } + } + + return 0; +} + + +/** + * Add the utime and stime values to a connection's totals. + */ +void +connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end) +{ + struct timeval utime_diff, stime_diff; + + timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime); + timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime); + + timeval_increment(&c->utime, &utime_diff); + timeval_increment(&c->stime, &stime_diff); +} + +/** + * Locks the output buffer, for sending multiple + * contiguous messages without possible interleaving. + */ +void +connection_lock_output(struct connection *c) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + + evbuffer_lock(output); +} + +/** + * Unlocks the output buffer. + */ +void +connection_unlock_output(struct connection *c) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + + evbuffer_unlock(output); +} + +/** + * Sends a text string to a connection. + */ +int +connection_printf(struct connection *c, const char *fmt, ...) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + va_list ap; + int r; + + va_start(ap, fmt); + r = evbuffer_add_vprintf(output, fmt, ap); + va_end(ap); + + return r; +} + + +/** + * Sends a text string to a connection. + */ +int +connection_vprintf(struct connection *c, const char *fmt, va_list ap) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + int r; + + r = evbuffer_add_vprintf(output, fmt, ap); + + return r; +} + + +/** + * Send one data buffer to multiple connections. + * Data buffer must be malloced, and will be freed when last connection + * has finished with it. + * Returns number of items sent, or negative on error. + */ +int +connection_multi_send(struct connection **clist, char *data, size_t len) +{ + struct connection **clist_iter; + struct rc_data_ *rc; + int r = 0; + + if (!clist) { + NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__); + return -1; + } + + /* Track our data so it can be freed once all connections are done with it. */ + rc = rc_data_new_(data, len); + if (rc == NULL) { + NOTIFY_ERROR("rc_data_new_('%s', %zu) failed", data, len); + return -1; + } + + clist_iter = clist; + while (*clist_iter) { + struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev); + + /* FIXME: if connection is still valid... */ + + /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */ + rc_data_ref_inc_(rc); + if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) { + NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed"); + rc_data_free_(rc); + } else { + r++; + } + clist_iter++; + } + + /* FIXME: clang also claims this is use-after-free, same as above. */ + rc_data_free_(rc); + + return r; +} + + +int +connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...) +{ + struct connection **clist; + char *message = NULL; + ssize_t message_len = 0; + va_list ap; + int r; + + va_start(ap, fmt); + message_len = vsnprintf(message, message_len, fmt, ap); + va_end(ap); + if (message_len < 0) { + NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); + return -1; + } + message_len += 1; + message = malloc(message_len); + if (!message) { + NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno)); + return -1; + } + va_start(ap, fmt); + message_len = vsnprintf(message, message_len, fmt, ap); + va_end(ap); + if (message_len < 0) { + NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); + free(message); + return -1; + } + +#ifdef DEBUG_SILLY + NOTIFY_DEBUG("message_len:%zu message:%s", message_len, message); +#endif + + clist = connections_all_as_array(&sender->server->connections, exclude_sender == true ? sender : NULL); + r = connection_multi_send(clist, message, message_len); + if (r < 0) { + NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed"); + } + for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) { + connection_free(*clist_iter); + } + free(clist); + + return r; +} + +/** + * + */ +void +connection_inc_ref(struct connection *c) +{ + int r; + + if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + c->reference_count += 1; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); +#endif + + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Frees a connection (and all data owned by a connection) if there are no more references to it. + */ +void +connection_free(struct connection *c) +{ + struct evbuffer *tmp; + evutil_socket_t fd; + int r; + + if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + c->reference_count -= 1; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); +#endif + + if (c->reference_count > 0) { + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + + return; + } + + NOTIFY_DEBUG("c:%p freeing", c); + + connections_remove(c); + + bufferevent_disable(c->bev, EV_READ|EV_WRITE); + tmp = bufferevent_get_output(c->bev); + evbuffer_drain(tmp, evbuffer_get_length(tmp)); + tmp = bufferevent_get_input(c->bev); + evbuffer_drain(tmp, evbuffer_get_length(tmp)); + + fd = bufferevent_getfd(c->bev); + if (fd != -1) { + EVUTIL_CLOSESOCKET(fd); + bufferevent_setfd(c->bev, -1); + } + + if (c->dns_retry_ev) { + if (event_del(c->dns_retry_ev)) { + NOTIFY_ERROR("%s:%s", "event_del", strerror(errno)); + } + c->dns_retry_ev = NULL; + } + + if (c->evdns_request) { + evdns_cancel_request(c->server->evdns_base, c->evdns_request); + c->evdns_request = NULL; + } + + if (c->client_address) { + free(c->client_address); + c->client_address = NULL; + } + + bufferevent_free(c->bev); + + if (c->uuid) { + uuid_rc_t rc; + + rc = uuid_destroy(c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); + } + c->uuid = NULL; + } + + /* empty out the command queue */ + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + } + + struct command *c1, *c2; + c1 = STAILQ_FIRST(&c->commands_head); + while (c1 != NULL) { + c2 = STAILQ_NEXT(c1, stailq_entry); + free(c1); + c1 = c2; + } + STAILQ_INIT(&c->commands_head); + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + memset(c, 0, sizeof *c); + + free(c); +} diff --git a/connections.h b/connections.h new file mode 100644 index 0000000..5c0a3f8 --- /dev/null +++ b/connections.h @@ -0,0 +1,109 @@ +#ifndef CONNECTIONS_H +#define CONNECTIONS_H + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "bsd_queue.h" +#include "workqueue.h" + +enum connection_state_enum { + CONNECTION_STATE_INIT = 0, + CONNECTION_STATE_CONNECTED, + CONNECTION_STATE_AUTHENTICATED, + CONNECTION_STATE_WANT_CLOSE, + CONNECTION_STATE_CLOSED +}; + +typedef uint_fast8_t connection_flags_t; +enum connection_flags { + CONN_TYPE_SSL = (1<<0), +}; + +struct connection { + pthread_mutex_t rc_mutex; /* mutex for ref count */ + size_t reference_count; /* */ + + struct server *server; + + enum connection_state_enum state; + connection_flags_t flags; + + struct bufferevent *bev; + uuid_t *uuid; /* uuid */ + + char name[256]; /* temporary auto-generated unique name */ + +#ifdef DARWIN + struct timeval connect_time; /* time of instantiation of this connection entity */ + struct timeval last_received_time; /* time of last data received from connection */ +#else + struct timespec connect_timespec; /* time this connection entity was initialized */ + struct timespec last_received_timespec; /* time of last data received from connection */ +#endif /* DARWIN */ + + size_t total_read; /* a tally of number of bytes read from connection */ + struct timeval utime; /* accounting: rusage usertime accumulation */ + struct timeval stime; /* accounting: rusage systime accumulation */ + + char *client_address; + struct sockaddr *sa; + int sa_len; + struct evdns_request *evdns_request; /* the request for looking up the reverse of sa */ + struct event *dns_retry_ev; /* timeout event for retrying failed dns */ + size_t dns_retries; + + pthread_mutex_t commands_mutex; + STAILQ_HEAD(commands_stailq_head, command) commands_head; + volatile struct thread_struct *owner; /* set if command queue is currently being consumed */ + + TAILQ_ENTRY(connection) tailq_entry; +}; + +struct connections { + pthread_mutex_t mutex; + + TAILQ_HEAD(connection_tailq_head, connection) head; + size_t count; +}; + + + +int connections_init(struct connections *cs); +void connections_fini(struct connections *cs); + +void connections_append(struct connection *c); +void connections_remove(struct connection *c); + +struct connection ** connections_all_as_array(struct connections *cs, struct connection *c_exclude); + +int connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags); +void connection_inc_ref(struct connection *c); +void connection_free(struct connection *c); + +int connection_command_enqueue(struct connection *c, struct command *command); + +void connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end); + +void connection_lock_output(struct connection *c); +void connection_unlock_output(struct connection *c); + +int connection_printf(struct connection *c, const char *fmt, ...); +int connection_vprintf(struct connection *c, const char *fmt, va_list ap); +int connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...); + +int connection_multi_send(struct connection **clist, char *data, size_t len); + +void connection_resolve_hostname(struct connection *c); + + +#endif /* CONNECTIONS_H */ diff --git a/db.c b/db.c new file mode 100644 index 0000000..d089231 --- /dev/null +++ b/db.c @@ -0,0 +1,179 @@ +#include +#include +#include + +#include + +#include "common.h" +#include "notify.h" +#include "db.h" + +static const char db_engine_[] = "sqlite3"; + +struct prepared_statement { + const char *statement_name; + const char *arguments; + const char *query; + sqlite3_stmt *statement; +} prepared_statements[] = { + { "begin_transaction", "", "BEGIN TRANSACTION", NULL }, + { "end_transaction", "", "END TRANSACTION", NULL }, + { "vacuum", "", "VACUUM", NULL }, + { "uid_from_name", "s", "SELECT uid FROM uid_name WHERE name = ?", NULL }, + { NULL, NULL, NULL, NULL } +}; + +enum prepared_statement_enums { + PS_BEGIN, + PS_END, + PS_VACUUM, + PS_UID_FROM_NAME, + _NUM_PS_ENUMS +}; + +/** + * pre-prepare all queries used + */ +static int +db_init_all_statements_(sqlite3 *db) +{ + struct prepared_statement *s; + int r; + + for (s = prepared_statements; s->statement_name; s++) { + r = sqlite3_prepare_v2(db, s->query, -1, &s->statement, NULL); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_prepare_v2", s->statement_name, r, sqlite3_errmsg(db)); + return -1; + } + } + + return 0; +} + +/** + * finalize all queries used + */ +static void +db_finalize_all_statements_(sqlite3 *db) +{ + struct prepared_statement *s; + int r; + + for (s = prepared_statements; s->statement_name; s++) { + if (s->statement) { + r = sqlite3_finalize(s->statement); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_finalize", s->statement_name, r, sqlite3_errmsg(db)); + } + s->statement = NULL; + } + } +} + +const char * +db_engine() +{ + return db_engine_; +} + +const char * +db_version() +{ + return sqlite3_libversion(); +} + + +/** + * + */ +int +db_init(struct database *db, struct database_options *db_opts) +{ + int r; + + /* Ensure sqlite header matches library. */ + assert(sqlite3_libversion_number() == SQLITE_VERSION_NUMBER); + assert(strncmp(sqlite3_sourceid(), SQLITE_SOURCE_ID, 80) == 0); + assert(strcmp(sqlite3_libversion(), SQLITE_VERSION) == 0); + + r = sqlite3_open_v2(db_opts->db_file, &db->db, SQLITE_OPEN_READWRITE, NULL); + if (r != SQLITE_OK) { + NOTIFY_ERROR("error opening %s '%s': %s", "db_file", db_opts->db_file, sqlite3_errmsg(db->db)); + return -1; + } + + if (db_init_all_statements_(db->db)) { + NOTIFY_ERROR("%s:%s", "db_init_all_statements", "failed"); + return -1; + } + + return 0; +} + +/** + * + */ +void +db_fini(struct database *db) +{ + int r; + + db_finalize_all_statements_(db->db); + r = sqlite3_close(db->db); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s:%s", "sqlite3_close", sqlite3_errmsg(db->db)); + } + db->db = NULL; +} + +/** + * given a string, returns the uid which matches + */ +int +db_uid_from_name(struct database *db, unsigned char *name, int *uid) +{ + struct prepared_statement *ps = &prepared_statements[PS_UID_FROM_NAME]; + int retval = -1; + int r; + + r = sqlite3_bind_text(ps->statement, 1, (char *)name, -1, SQLITE_TRANSIENT); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_bind_text", name, r, sqlite3_errmsg(db->db)); + return -2; + } + + do { + r = sqlite3_step(ps->statement); + if (r == SQLITE_ROW) { + if (retval == 0) { + NOTIFY_ERROR("%s:%s", "internal db error", "multiple uids for name"); + } + *uid = sqlite3_column_int(ps->statement, 0); + retval = 0; + + if (*uid == 0) { + if (SQLITE_INTEGER != sqlite3_column_type(ps->statement, 0)) { + NOTIFY_ERROR("%s:%s", "internal db error", "uid is not an integer"); + retval = -3; + } + } + } + } while (r == SQLITE_ROW || r == SQLITE_BUSY); + + if (r != SQLITE_DONE) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_step", name, r, sqlite3_errmsg(db->db)); + retval = -3; + } + + r = sqlite3_reset(ps->statement); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_reset", name, r, sqlite3_errmsg(db->db)); + } + r = sqlite3_clear_bindings(ps->statement); + if (r != SQLITE_OK) { + NOTIFY_ERROR("%s('%s'):%d:%s", "sqlite3_clear_bindings", name, r, sqlite3_errmsg(db->db)); + } + + return retval; +} diff --git a/db.h b/db.h new file mode 100644 index 0000000..a1f929f --- /dev/null +++ b/db.h @@ -0,0 +1,22 @@ +#ifndef DB_H +#define DB_H + +#include + +struct database { + sqlite3 *db; +}; + +struct database_options { + char *db_file; +}; + +int db_init(struct database *db, struct database_options *db_opts); +void db_fini(struct database *db); + +const char *db_engine(); +const char *db_version(); + +int db_uid_from_name(struct database *db, unsigned char *name, int *uid); + +#endif /* DB_H */ \ No newline at end of file diff --git a/lua_interface.c b/lua_interface.c new file mode 100644 index 0000000..03496e8 --- /dev/null +++ b/lua_interface.c @@ -0,0 +1,168 @@ +#include + +#include +#include +#include + +#include "connections.h" +#include "lua_interface.h" +#include "notify.h" + +#define CONNECTION_META_TABLE "MetaConnection" +#define CONNECTION_TABLE "Connection" +#define CONNECTION_TABLE_UD_FIELD "core_" + +/** + * HARD: shared common server state, updatable commands table + * Interim: independent states, any global changes queued to all workers + * Idea: can a proxy table be crafted to access a shared table-like c entity + * Idea: eventual-consistency between lua states (somehow) + * + */ + + +/** + * Create a new referenced coroutine on lua state. + * release with: + * luaL_unref(L, LUA_REGISTRYINDEX, refkey); + */ +int +lua_new_coroutine_ref(lua_State *L, lua_State **coL, int *refkey) +{ + *refkey = LUA_NOREF; + + /* Create new coroutine on top of stack. */ + *coL = lua_newthread(L); + if (!*coL) { + NOTIFY_ERROR("%s:%s", "lua_newthread", "failed"); + return -1; + } + + /* Store coroutine reference in state registry. */ + *refkey = luaL_ref(L, LUA_REGISTRYINDEX); + + return 0; +} + + +/** + * Connection Object + * is a table with metatable for methods and userdata field + * + */ + + +/** + * Add a connection reference to lua stack + * + */ +int +lemu_connection_push(lua_State *L, struct connection *c) +{ + connection_inc_ref(c); + + lua_newtable(L); // new connection table + luaL_getmetatable(L, CONNECTION_META_TABLE); // locate meta + lua_setmetatable(L, -2); // assign meta to connection table + + /* keeping a pointer in userdata, rather than light userdata, to get metatable support */ + struct connection **cud = lua_newuserdatauv(L, sizeof c, 0); + *cud = c; + luaL_getmetatable(L, CONNECTION_META_TABLE); + lua_setmetatable(L, -2); + + lua_setfield(L, -2, CONNECTION_TABLE_UD_FIELD); + return 1; +} + + +/** + * I guess this one would take a stack value to find a connection + */ +static int +lemu_connection_create_(lua_State *L) +{ + struct connection *c; + + luaL_checktype(L, 1, LUA_TSTRING); + // c = connection_lookup(lua_tostring(L, 1)); + c = NULL; + + return lemu_connection_push(L, c); +} + + +/** + * this will only get called on userdata (how does that happen??) + */ +static int +lemu_connection_destroy_(lua_State *L) +{ + struct connection **cud = luaL_checkudata(L, 1, CONNECTION_META_TABLE); + struct connection *c = *cud; + + connection_free(c); + + return 0; +} + + +/** + * Send a fixed string to a connection. + */ +static int +lemu_connection_send_(lua_State *L) +{ + if (lua_gettop(L) < 2) { + lua_pushliteral(L, "too few arguments"); + lua_error(L); + } + luaL_checktype(L, 1, LUA_TTABLE); + lua_getfield(L, 1, CONNECTION_TABLE_UD_FIELD); + luaL_checktype(L, -1, LUA_TUSERDATA); + struct connection **cud = lua_touserdata(L, -1); + struct connection *c = *cud; + + luaL_checktype(L, 2, LUA_TSTRING); + const char *message = lua_tostring(L, 2); + + connection_printf(c, "%s\n", message); + + return 0; +} + + +static const luaL_Reg Connection_funcs[] = { + { "create", lemu_connection_create_ }, + { NULL, NULL } +}; + + +static const luaL_Reg Connection_methods[] = { + { "__gc", lemu_connection_destroy_ }, + { "send", lemu_connection_send_ }, + { NULL, NULL } +}; + + +/** + * Initialize the Connection object prototypes + */ +int +lemu_connection_luainit(lua_State *L) +{ + luaL_newmetatable(L, CONNECTION_META_TABLE); // new userdata metatable, __name = CONNECTION_META_TABLE + lua_pushstring(L, "__index"); + lua_pushvalue(L, -2); // meta table + lua_settable(L, -3); // becomes its own index table + luaL_setfuncs(L, Connection_methods, 0); // add methods to metatable + lua_pop(L, 1); + + luaL_newlibtable(L, Connection_funcs); // new table with functions + luaL_setfuncs(L, Connection_funcs, 0); + lua_setglobal(L, CONNECTION_TABLE); // make it available + + return 0; +} + +// int luaopen_lemu(lua_State *L); diff --git a/lua_interface.h b/lua_interface.h new file mode 100644 index 0000000..7f897b0 --- /dev/null +++ b/lua_interface.h @@ -0,0 +1,8 @@ +#ifndef LUA_INTERFACE_H +#define LUA_INTERFACE_H + +int lemu_connection_luainit(lua_State *); + +int lemu_connection_push(lua_State *, struct connection *); + +#endif /* LUA_INTERFACE_H */ diff --git a/main.c b/main.c new file mode 100644 index 0000000..9b0d98d --- /dev/null +++ b/main.c @@ -0,0 +1,93 @@ +/* + * toy server + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include "version.h" +#include "notify.h" +#include "server.h" + +#define DEFAULT_CONFIG_FILE "conf/lemu.conf" + +#define USAGE_FLAGS_LONG (1<<0) +static void +usage_(char *prog, unsigned int usage_flags) +{ + char *x; + + x = strrchr(prog, '/'); + if (x && *(x + 1)) { + prog = x + 1; + } + + printf("Usage: %s [-c config_file]\n", prog); + if (! (usage_flags & USAGE_FLAGS_LONG)) { + return; + } + printf( + "\t-h \t : this help\n" + "\t-c config_file\t : loads the specified config file [default: '" DEFAULT_CONFIG_FILE "']\n" + ); +} + +int +main(int argc, char **argv) +{ + struct server *server; + char *conf_file = DEFAULT_CONFIG_FILE; + int opt; + int r; + + /* enable threading in libevent before anything tries to use it */ + if ( (r = evthread_use_pthreads()) ) { + NOTIFY_ERROR("%s:%d", "evthread_use_pthreads", r); + } + + SSL_load_error_strings(); + SSL_library_init(); + OpenSSL_add_all_algorithms(); + + while ( (opt = getopt(argc, argv, "c:h")) != -1 ) { + switch (opt) { + case 'c': + conf_file = optarg; + break; + case 'h': + usage_(argv[0], USAGE_FLAGS_LONG); + exit(EXIT_SUCCESS); + break; + default: + usage_(argv[0], 0); + exit(EXIT_FAILURE); + } + } + + server = server_new(); + if (!server) { + NOTIFY_FATAL("server_new"); + exit(EXIT_FAILURE); + } + + if (server_init(server, conf_file)) { + NOTIFY_FATAL("error parsing config file '%s'", conf_file); + exit(EXIT_FAILURE); + } + + server_free(server); + + exit(EXIT_SUCCESS); +} diff --git a/notify.c b/notify.c new file mode 100644 index 0000000..8662f05 --- /dev/null +++ b/notify.c @@ -0,0 +1,52 @@ +#define _GNU_SOURCE 1 + +#include +#include +#include +#include +#include + +#include "notify.h" + +/* + * Generic console notifier. + */ + +static void +notify_fn_default_(int level, const char * func_name, const char * fmt, ...) +{ + FILE *f = (level <= 1) ? stderr : stdout; + static const char * const levels[] = { "FATAL", "ERROR", "INFO", "DEBUG" }; + const int levels_num = sizeof levels / sizeof *levels; + va_list ap; + + flockfile(f); + + fputs_unlocked((func_name && *func_name) ? func_name : "[null]", f); + fputs_unlocked(": --", f); + if (level >= 0 + && level <= levels_num) { + fputs_unlocked(levels[level], f); + } else { + fprintf(f, "%d", level); + } + fputs_unlocked("-- ", f); + + va_start(ap, fmt); + vfprintf(f, fmt, ap); + va_end(ap); + + fputc_unlocked('\n', f); + fflush_unlocked(f); + + funlockfile(f); + +} + +void (* notify_fn)(int level, const char *func_name, const char *msg, ...) __attribute__((format(printf, 3, 4))) = notify_fn_default_; + +void +notify_fn_set(void (* fn)(int level, const char *func_name, const char *fmt, ...)) +{ + notify_fn = fn; +} diff --git a/notify.h b/notify.h new file mode 100644 index 0000000..7a31faa --- /dev/null +++ b/notify.h @@ -0,0 +1,14 @@ +#ifndef NOTIFY_H +#define NOTIFY_H + +#define NOTIFY_FATAL(msg, args...) do { if (notify_fn) notify_fn(0, __func__, msg , ##args); } while (0) +#define NOTIFY_ERROR(msg, args...) do { if (notify_fn) notify_fn(1, __func__, msg , ##args); } while (0) +#define NOTIFY_INFO(msg, args...) do { if (notify_fn) notify_fn(2, __func__, msg , ##args); } while (0) +#define NOTIFY_DEBUG(msg, args...) do { if (notify_fn) notify_fn(3, __func__, msg , ##args); } while (0) + +extern +void (* notify_fn)(int level, const char *func_name, const char *msg, ...) __attribute__((format(printf, 3, 4))); + +void notify_fn_set(void (* notify_fn)(int level, const char *func_name, const char *fmt, ...)); + +#endif /* NOTIFY_H */ diff --git a/server.c b/server.c new file mode 100644 index 0000000..f421383 --- /dev/null +++ b/server.c @@ -0,0 +1,839 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +#include "common.h" +#include "command.h" +#include "db.h" +#include "notify.h" +#include "lua_interface.h" +#include "server.h" +#include "workqueue.h" +#include "version.h" + + +/* Wrapper struct for passing multiple values through a voidstar. + * Used when calling server_add_listener_addrinfo_(). + */ +struct server_listener_convenience_ { + struct server *s; + evconnlistener_cb cb; + char *type; +}; + + +/** + * Load all lua libs except for io and os modules, + * TODO: this should be more fine-grained and properly sandboxed. Also, add server lib (needs to be created). + */ +static void +server_lua_openlibs_(lua_State *L) +{ + const luaL_Reg lualibs[] = { + { "", luaopen_base }, + { LUA_LOADLIBNAME, luaopen_package }, + { LUA_TABLIBNAME, luaopen_table }, + { LUA_STRLIBNAME, luaopen_string }, + { LUA_MATHLIBNAME, luaopen_math }, + // { LUA_DBLIBNAME, luaopen_debug }, + { NULL, NULL } + }; + const luaL_Reg *lib; + for (lib = lualibs ; lib->func; lib++) { + lua_pushcfunction(L, lib->func); + lua_pushstring(L, lib->name); + lua_call(L, 1, 0); + } + + lemu_connection_luainit(L); +} + + +/** + * Allocate and initialize a new per-thread context. + */ +static struct server_worker_context * +server_worker_context_create_(struct server *server) +{ + struct server_worker_context *ctx; + + ctx = calloc(1, sizeof *ctx); + if (ctx == NULL) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + return NULL; + } + ctx->server = server; + ctx->L = luaL_newstate(); + if (!ctx->L) { + NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed"); + free(ctx); + return NULL; + } + server_lua_openlibs_(ctx->L); + + return ctx; +} + +/** + * Free a thread context. + * workqueue:worker_ctx_free_fn + */ +static void +server_worker_context_destroy_(void *data) +{ + struct server_worker_context *ctx = data; + lua_close(ctx->L); + free(ctx); +} + +/** + * Adds a worker thread to the pool. + */ +static int +server_init_add_thread_(struct server *s) +{ + struct server_worker_context *ctx; + ssize_t worker_id; + + ctx = server_worker_context_create_(s); + if (ctx == NULL) { + NOTIFY_ERROR("%s:%s", "server_worker_context_create_", "failed"); + return -1; + } + + worker_id = workqueue_worker_add(&s->workqueue, ctx, 0); + if (worker_id < 0) { + NOTIFY_ERROR("%s:%s", "workqueue_worker_add", "failed"); + server_worker_context_destroy_(ctx); + return -1; + } + + NOTIFY_DEBUG("added worker thread %zd", worker_id); + + return 0; +} + +/** + * Handle accept errors. + */ +static void +server_accept_error_cb_(struct evconnlistener *listener UNUSED, void *ctx UNUSED) +{ + int err = EVUTIL_SOCKET_ERROR(); + NOTIFY_ERROR("%s (%d)", evutil_socket_error_to_string(err), err); +} + + +/** + * Move incoming data to connection queue. + * TBD: is it worth work-queueing this data shuffling, or just let main thread handle it? + */ +static void +server_read_event_(struct bufferevent *bev, void *ctx) +{ + struct connection *c = ctx; + struct evbuffer *input = bufferevent_get_input(bev); + size_t line_len; + unsigned char *line; + + while ( (line = (unsigned char *)evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF)) ) { + c->total_read += line_len; /* this drops the newlines from the total_read tally */ + + struct command *command = command_new(line, line_len, 0); + if (command == NULL) { + NOTIFY_ERROR("%s:%s", "command_new", "failed"); + free(line); + return; + } + + if (connection_command_enqueue(c, command)) { + NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); + command_free(command); + return; + } + } +} + + +/** + * Handle write events. + */ +static void +server_write_event_(struct bufferevent *bev UNUSED, void *ctx) +{ + struct connection *c = ctx; + + if (c->state == CONNECTION_STATE_WANT_CLOSE) { + c->state = CONNECTION_STATE_CLOSED; + + struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT); + if (command == NULL) { + NOTIFY_ERROR("%s:%s", "command_new", "failed"); + return; + } + if (connection_command_enqueue(c, command)) { + NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); + command_free(command); + } + connection_free(c); + } +} + + +/** + * Handle some events. + */ +static void +server_event_(struct bufferevent *bev UNUSED, short events, void *ctx) +{ + struct connection *c = ctx; + int finished = 0; + + if (events & BEV_EVENT_READING) { + NOTIFY_ERROR("reading error from '%s'", c->name); + finished = 1; + } + + if (events & BEV_EVENT_WRITING) { + NOTIFY_ERROR("writing error from '%s'", c->name); + finished = 1; + } + + if (events & BEV_EVENT_EOF) { + NOTIFY_DEBUG("eof from '%s'", c->name); + finished = 1; + } + + if (events & BEV_EVENT_ERROR) { + NOTIFY_ERROR("unrecoverable error from '%s': %s", + c->name, + evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()) ); + finished = 1; + } + + if (events & BEV_EVENT_TIMEOUT) { + NOTIFY_ERROR("timeout from '%s'", c->name); + finished = 1; + } + + if (events & BEV_EVENT_CONNECTED) { + NOTIFY_DEBUG("connected from '%s'", c->name); + } + + if (events & ~(BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT|BEV_EVENT_CONNECTED)) { + NOTIFY_ERROR("unrecognized event from '%s': %hd", c->name, events); + } + + if (finished) { + struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT); + if (command == NULL) { + NOTIFY_ERROR("%s:%s", "command_new", "failed"); + } else { + if (connection_command_enqueue(c, command)) { + NOTIFY_ERROR("%s:%s", "connection_comand_enqueue", "failed"); + command_free(command); + } + } + connection_free(c); + } +} + + +/** + * Everything needed to accept a new connection, regardless of transport type. + */ +static void +server_accept_conn_common_(struct server *server, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags) +{ + struct connection *c; + + c = calloc(1, sizeof *c); + if (!c) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + bufferevent_free(bev); + return; + } + + if (connection_init(server, c, bev, sock, socklen, flags)) { + NOTIFY_ERROR("%s:%s", "connection_init", "failed"); + bufferevent_free(bev); + free(c); + return; + } + c->state = CONNECTION_STATE_CONNECTED; + + /* a bufferevent_add_cb interface would be better, but there isn't one yet */ + bufferevent_setcb(c->bev, server_read_event_, server_write_event_, server_event_, c); + bufferevent_enable(c->bev, EV_READ|EV_WRITE); + + connections_append(c); + + struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_CONNECT); + if (command == NULL) { + NOTIFY_ERROR("%s:%s", "command_new", "failed"); + return; + } + if (connection_command_enqueue(c, command)) { + NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); + command_free(command); + } +} + + +/** + * Accept a new plain-text connection. + */ +static void +server_accept_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx) +{ + struct server *server = ctx; + struct event_base *base = evconnlistener_get_base(listener); + struct bufferevent *bev; + + bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS); + if (!bev) { + NOTIFY_ERROR("%s:%s", "bufferevent_socket_new", "failed"); + return; + } + + server_accept_conn_common_(server, bev, sock, socklen, 0); +} + +/** + * Accept a new ssl connection. + */ +static void +server_accept_ssl_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx) +{ + struct server *server = ctx; + struct event_base *base = evconnlistener_get_base(listener); + struct bufferevent *bev; + SSL *ssl; + + ssl = SSL_new(server->ssl_ctx); + + bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS); + if (!bev) { + NOTIFY_ERROR("%s:%s", "bufferevent_openssl_socket_new", "failed"); + SSL_free(ssl); + return; + } + + server_accept_conn_common_(server, bev, sock, socklen, CONN_TYPE_SSL); +} + + +/** + * Add a new listener binding to server for the provided address. + */ +static int +server_add_listener_addrinfo_(struct addrinfo *ai, void *data) +{ + struct server *s = ((struct server_listener_convenience_ *)data)->s; + evconnlistener_cb cb = ((struct server_listener_convenience_ *)data)->cb; + struct evconnlistener **l; + int retval = 0; + int r; + + if ( (r = pthread_mutex_lock(&s->listeners_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return -1; + } + + if (s->listeners_used == s->listeners_allocated) { + void *new_ptr; + size_t new_allocated = s->listeners_allocated + 8; + + new_ptr = realloc(s->listeners, new_allocated * sizeof *(s->listeners)); + if (!new_ptr) { + NOTIFY_ERROR("realloc(%zu): %s", new_allocated * sizeof *(s->listeners), strerror(errno)); + retval = -1; + goto done; + } + s->listeners = new_ptr; + s->listeners_allocated = new_allocated; + + /* recalloc */ + memset(&s->listeners[s->listeners_used], 0, (s->listeners_allocated - s->listeners_used - 1) * sizeof *s->listeners); + } + +#if DEBUG_SILLY + NOTIFY_DEBUG("new listener in slot %zu", s->listeners_used); +#endif + + s->listeners_used++; + l = &(s->listeners[s->listeners_used - 1]); + *l = evconnlistener_new_bind(s->base, cb, s, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, ai->ai_addr, ai->ai_addrlen); + if (!*l) { + NOTIFY_ERROR("%s:%s", "evconnlistener_new_bind", strerror(errno)); + *l = NULL; + s->listeners_used--; + retval = -1; + goto done; + } + evconnlistener_set_error_cb(*l, server_accept_error_cb_); + +done: + if ( (r = pthread_mutex_unlock(&s->listeners_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + retval = -1; + } + + return retval; +} + + +/** + * Allocates a new server structure, initializes all the structures which don't need + * any specific configuration. Such further setup is handled by server_init(). + */ +struct server * +server_new(void) +{ + pthread_mutexattr_t attr; + struct server *s; + int r; + + s = calloc(1, sizeof *s); + if (!s) { + NOTIFY_ERROR("calloc(%zu, %zu): %s", (size_t)1, sizeof *s, strerror(errno)); + return NULL; + } + + s->base = event_base_new(); + if (!s->base) { + NOTIFY_ERROR("%s:%s", "event_base_new", "failed"); + goto err_free_server; + } + + s->evdns_base = evdns_base_new(s->base, 1); + if (!s->evdns_base) { + NOTIFY_ERROR("%s:%s", "evdns_base", "failed"); + goto err_free_event_base; + } + + if (workqueue_init(&s->workqueue, server_worker_context_destroy_, 0)) { + NOTIFY_ERROR("%s:%s", "workqueue_init", "failed"); + goto err_free_evdns_base; + } + + if ( (r = pthread_mutexattr_init(&attr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r)); + goto err_fini_workqueue; + } + + if ( (r = pthread_mutex_init(&s->listeners_mutex, &attr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err_destroy_attr; + } + + if (connections_init(&s->connections) < 0) { + NOTIFY_ERROR("%s:%s", "connections_init", "&s->connections failed"); + goto err_destroy_mutex; + } + + s->L = luaL_newstate(); + if (!s->L) { + NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed"); + goto err_free_connections; + } + + if ( (r = pthread_mutexattr_destroy(&attr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); + goto err_close_lua; + } + + return s; + +err_close_lua: + lua_close(s->L); +err_free_connections: + connections_fini(&s->connections); +err_destroy_mutex: + pthread_mutex_destroy(&s->listeners_mutex); +err_destroy_attr: + pthread_mutexattr_destroy(&attr); +err_fini_workqueue: + workqueue_fini(&s->workqueue, true); +err_free_evdns_base: + evdns_base_free(s->evdns_base, 0); +err_free_event_base: + event_base_free(s->base); +err_free_server: + free(s); + return NULL; +} + +/** + * Looking at top of lua stack as string, parse as address and port, + * then add as listener to server. + */ +static void +server_init_listener_directive_a_string_(lua_State *L, struct server_listener_convenience_ *c_data) +{ + char *l_str; + + l_str = (char *)lua_tostring(L, -1); + if (string_to_addrinfo_call(l_str, 0, server_add_listener_addrinfo_, c_data)) { + NOTIFY_ERROR("failed to add '%s' value '%s'", c_data->type, l_str ? l_str : "[unknown]"); + } + NOTIFY_INFO("listening on %s (%s)", l_str, c_data->type); +} + +/** + * Using configuration key provided in c_data->type, create all listeners. + */ +static void +server_init_listener_directive_(lua_State *L, struct server_listener_convenience_ *c_data) +{ + lua_getglobal(L, "server_config"); + lua_getfield(L, -1, c_data->type); + if (lua_istable(L, -1)) { + lua_pushnil(L); + while (lua_next(L, -2) != 0) { + if (lua_isstring(L, -1)) { + server_init_listener_directive_a_string_(L, c_data); + } else { + char *l_str = (char *)lua_tostring(L, -2); /* table key name */ + NOTIFY_ERROR("could not fathom value for '%s' at index '%s'", c_data->type, l_str ? l_str : "[unknown]"); + } + lua_pop(L, 1); + } + } else if (lua_isstring(L, -1)) { + server_init_listener_directive_a_string_(L, c_data); + } else { + NOTIFY_ERROR("could not fathom '%s' value", c_data->type); + } + lua_pop(L, 2); +} + +/** + * Return an allocated copy of value of global lua variable. + */ +static int +get_lua_config_string_(lua_State *L, const char *key, char **value, size_t *value_len) +{ + int retval = 0; + const char *l_str; + size_t l_str_len; + assert(key); + assert(value); + *value = NULL; + + lua_getglobal(L, "server_config"); + lua_getfield(L, -1, key); + + if (!lua_isstring(L, -1)) { + NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "string", lua_typename(L, lua_type(L, -1))); + retval = -1; + goto done; + } + l_str = lua_tolstring(L, -1, &l_str_len); + if (!l_str) { + NOTIFY_ERROR("'%s' value invalid", key); + retval = -1; + goto done; + } + if (value_len) { + *value_len = l_str_len; + } + l_str_len += 1; // \0 + *value = malloc(l_str_len); + if (!*value) { + NOTIFY_ERROR("%s:%s", "malloc", strerror(errno)); + retval = -1; + goto done; + } + memcpy(*value, l_str, l_str_len); +done: + lua_pop(L, 1); + return retval; +} + +static int +get_lua_config_integer_(lua_State *L, const char *key, int *value) +{ + int retval = 0; + assert(key); + assert(value); + + lua_getglobal(L, "server_config"); + lua_getfield(L, -1, key); + + if (!lua_isinteger(L, -1)) { + NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "integer", lua_typename(L, lua_type(L, -1))); + retval = -1; + goto done; + } + *value = (int)lua_tointeger(L, -1); +done: + lua_pop(L, 1); + return retval; +} + +/** + * Load or reload the ssl context, with cert and settings. + */ +int +server_refresh_ssl_context(lua_State *L, struct server *s) +{ + SSL_CTX *new_ssl_ctx, *old_ssl_ctx; + char *l_str; + + new_ssl_ctx = SSL_CTX_new(SSLv23_server_method()); + if (!new_ssl_ctx) { + NOTIFY_ERROR("%s:%s", "SSL_CTX_new", ERR_error_string(ERR_get_error(), NULL)); + return -1; + } + + // load cert + if (get_lua_config_string_(L, "ssl_keyfile", &l_str, NULL)) { + return -1; + } + if (SSL_CTX_use_PrivateKey_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) { + NOTIFY_ERROR("%s file '%s': %s", "key", l_str, ERR_error_string(ERR_get_error(), NULL)); + free(l_str); + return -1; + } + free(l_str); + + if (get_lua_config_string_(L, "ssl_certfile", &l_str, NULL)) { + return -1; + } + if (SSL_CTX_use_certificate_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) { + NOTIFY_ERROR("%s file '%s': %s", "certificate", l_str, ERR_error_string(ERR_get_error(), NULL)); + free(l_str); + return -1; + } + free(l_str); + + old_ssl_ctx = s->ssl_ctx; + s->ssl_ctx = new_ssl_ctx; + + if (old_ssl_ctx) { + SSL_CTX_free(old_ssl_ctx); + } + + return 0; +} + + +/** + * Example heartbeat worker job. + * N.B. server as data instead of connection. + */ +static void +server_heartbeat_worker_(void *data, void *context UNUSED, size_t id UNUSED) +{ + struct server *s = data; + struct connection **clist, **clist_iter; + char *message; + int r; + + clist = connections_all_as_array(&(s->connections), NULL); + if (!clist) { + return; + } + + message = strdup("[heartbeat]\n"); + if (!message) { + NOTIFY_ERROR("%s:%s", "strdup", strerror(errno)); + return; + } + + r = connection_multi_send(clist, message, strlen(message)); + if (r < 0) { + NOTIFY_ERROR("could not broadcast heartbeat"); + } else { + NOTIFY_DEBUG("broadcast heartbeat to %d connection%s", r, (r > 1) ? "s" : ""); + } + + clist_iter = clist; + while (*clist_iter) { + connection_free(*clist_iter); + clist_iter++; + } + free(clist); +} + + +/** + * Periodic heartbeat handler. + */ +static void +server_heartbeat_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg) +{ + struct server *server = arg; + + if (workqueue_add(&server->workqueue, server_heartbeat_worker_, server)) { + NOTIFY_ERROR("%s:%s", "workqueue_add", "failed"); + } +} + + +/** + * Configure the server based on the provided lua file. + */ +int +server_init(struct server *s, char *conf_file) +{ + struct server_listener_convenience_ c_data; + int l_int; + + /* Set up initial server state, in order to process configuration file. */ + server_lua_openlibs_(s->L); + + /* Create config table, which will be used as global env for loading config file. */ + lua_newtable(s->L); + lua_setglobal(s->L, "server_config"); + + + if (luaL_loadfile(s->L, conf_file)) { + NOTIFY_ERROR("could not %s configuration file '%s': %s", + "load", + conf_file, + lua_tostring(s->L, -1)); + return -1; + } + /* server_config table becomes global env for executing config file */ + lua_getglobal(s->L, "server_config"); + lua_setupvalue(s->L, -2, 1); + + if (lua_pcall(s->L, 0, 0, 0) != LUA_OK) { + NOTIFY_ERROR("could not %s configuration file '%s': %s", + "parse", + conf_file, + lua_tostring(s->L, -1)); + return -1; + } + + // FIXME: let db fetch its own options + struct database_options db_options = { 0 }; + if (get_lua_config_string_(s->L, "db_file", &db_options.db_file, NULL)) { + return -1; + } + + if (db_init(&s->db, &db_options)) { + NOTIFY_ERROR("%s:%s", "db_init", "failed"); + free(db_options.db_file); + return -1; + } + free(db_options.db_file); + + if (server_refresh_ssl_context(s->L, s)) { + return -1; + } + + if (get_lua_config_integer_(s->L, "processing_threads", &l_int)) { + return -1; + } + /* FIXME: define reasonable limits */ + if (l_int < 1 || l_int > 64) { + NOTIFY_ERROR("'%s' is not a reasonable value", "processing_threads"); + return -1; + } + // FIXME: while l_int != workqueue_workers() + while (l_int) { + if (server_init_add_thread_(s)) { + NOTIFY_ERROR("could not start processing thread"); + return -1; + } + l_int--; + } + + c_data.s = s; + c_data.cb = server_accept_conn_event_; + c_data.type = "listen"; + server_init_listener_directive_(s->L, &c_data); + + c_data.cb = server_accept_ssl_conn_event_; + c_data.type = "listen_ssl"; + server_init_listener_directive_(s->L, &c_data); + + do { + struct timeval _seconds = {30, 0}; + struct event *ev; + /* FIXME: instead of persist, reset from event */ + ev = event_new(s->base, -1, EV_TIMEOUT | EV_PERSIST, server_heartbeat_event_, s); + event_add(ev, &_seconds); + } while (0); + + NOTIFY_INFO("server starting...\n" + "\t%s %s (%s)\n" + "\tlibevent %s [%s]\n" + "\t%s %s\n" + "\t%s\n" + "\t%s", + "lemu", VERSION_STR, VERSION_DATE, + event_get_version(), event_base_get_method(s->base), + db_engine(), db_version(), + OpenSSL_version(OPENSSL_VERSION), + LUA_RELEASE); + + event_base_dispatch(s->base); + + return 0; +} + +void +server_free(struct server *s) +{ + int r; + + while (s->listeners_used > 0) { + s->listeners_used--; + if (s->listeners[s->listeners_used]) { + evconnlistener_free(s->listeners[s->listeners_used]); + s->listeners[s->listeners_used] = NULL; + } + NOTIFY_DEBUG("freed listener %zu", s->listeners_used); + } + free(s->listeners); + s->listeners = NULL; + + workqueue_fini(&s->workqueue, false); + + if (s->ssl_ctx) { + SSL_CTX_free(s->ssl_ctx); + s->ssl_ctx = NULL; + } + + if (s->evdns_base) { + evdns_base_free(s->evdns_base, 0); + s->evdns_base = NULL; + } + + if (s->base) { + event_base_free(s->base); + s->base = NULL; + } + + if ( (r = pthread_mutex_destroy(&s->connections.mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + if ( (r = pthread_mutex_destroy(&s->listeners_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + lua_close(s->L); + + db_fini(&s->db); + + free(s); +} diff --git a/server.h b/server.h new file mode 100644 index 0000000..e4b0705 --- /dev/null +++ b/server.h @@ -0,0 +1,55 @@ +#ifndef SERVER_H +#define SERVER_H + +#include +#include + +#include + +#include + +#include "common.h" +#include "connections.h" +#include "db.h" +#include "workqueue.h" + +struct server { + struct event_base *base; + struct evdns_base *evdns_base; + + pthread_mutex_t listeners_mutex; + struct evconnlistener **listeners; + size_t listeners_allocated; + size_t listeners_used; + + SSL_CTX *ssl_ctx; + + struct database db; + + /* a collection of all connections */ + struct connections connections; + + /* worker thread wrangler */ + struct workqueue workqueue; + + /* server lua context, currently to load and parse config file */ + lua_State *L; +}; + +/* each worker thread gets a reference to the server and its own lua state */ +struct server_worker_context { + struct server *server; + lua_State *L; +}; + +struct server * server_new(void); + +int server_init(struct server *s, char *conf_file); + +void server_free(struct server *s); + +void server_heartbeat_event(evutil_socket_t, short, void *); + +int server_refresh_ssl_context(lua_State *, struct server *); + +#endif /* SERVER_H */ \ No newline at end of file diff --git a/version.h b/version.h new file mode 100644 index 0000000..92d07ef --- /dev/null +++ b/version.h @@ -0,0 +1,12 @@ +#ifndef VERSION_H +#define VERSION_H + +#define VERSION_MAJOR 0 +#define VERSION_MINOR 0 +#define VERSION_EXTRA_STR "prototype" +#define VERSION_STR "0.0-prototype" +#define VERSION_DATE "2021-02-23" +#define VERSION_COMMIT "863cf651c2fcb4cc1735f8f7df1620025352f2b1" +#define VERSION_BRANCH "master" + +#endif diff --git a/version.sh b/version.sh new file mode 100755 index 0000000..c443233 --- /dev/null +++ b/version.sh @@ -0,0 +1,41 @@ +#!/bin/sh +# generate new version.h + +set -e + +if [ $# -lt 2 ] + then + echo "usage: `basename $0` []" + exit 1 +fi + +maj=`printf "%u" $1` +min=`printf "%u" $2` +shift 2 +ext="$@" + +str="${maj}.${min}" +if [ -n "${ext}" ] + then + str="${str}-${ext}" +fi +now=`TZ=UTC date "+%Y-%m-%d"` + +commit=`git rev-parse HEAD` +branch=`git rev-parse --abbrev-ref HEAD` + +cat>"version.h"< +#include +#include +#include +#include + +#include "notify.h" +#include "workqueue.h" + +#ifdef DEBUG_SILLY +# define D(fmt, ...) NOTIFY_DEBUG(fmt, ##__VA_ARGS__) +#else +# define D(fmt, ...) +#endif /* DEBUG_SILLY */ + +#define FLAG_IS_TERMINATE(__flags__) ((__flags__) & WQF_TERMINATE) +#define FLAG_IS_NOCOMPLETE(__flags__) ((__flags__) & WQF_NOCOMPLETE) +#define FLAG_IS_TERMINATE_NOCOMPLETE(__flags__) ((__flags__) && (WQF_TERMINATE|WQF_NOCOMPLETE) == (WQF_TERMINATE|WQF_NOCOMPLETE)) +#define FLAG_IS_NORECYCLE(__flags__) ((__flags__) & WQF_NORECYCLE) + +#if defined(__GNUC__) || defined(__clang__) +# define ALWAYS_INLINE __attribute__((always_inline)) +#else +# define ALWAYS_INLINE +#endif + +/** + * Shorthand for locking with error report. + */ +inline static int +lock_(pthread_mutex_t *mutex) +{ + assert(mutex != NULL); + int r; + if ( (r = pthread_mutex_lock(mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + if (r == EOWNERDEAD) { + /* sanity check call here */ + if ( (r = pthread_mutex_consistent(mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_consistent", strerror(r)); + } + } + } + return r; +} ALWAYS_INLINE + +/** + * Shorthand for unlocking with error report. + */ +inline static int +unlock_(pthread_mutex_t *mutex) +{ + assert(mutex != NULL); + int r; + if ( (r = pthread_mutex_unlock(mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + return r; +} ALWAYS_INLINE + +/** + * Release a work struct, either saving it to the free list or freeing the allocation. + */ +inline static void +work_free_(struct workqueue *wq, struct work *work) +{ + assert(wq != NULL); + if (work) { + if (FLAG_IS_NORECYCLE(wq->flags)) { + free(work); + } else { + if (lock_(&(wq->free_mutex))) { + D("freeing work %p", work); + free(work); + return; + } + + STAILQ_INSERT_TAIL(&(wq->free_head), work, stailq); + wq->n_free += 1; + + (void)unlock_(&(wq->free_mutex)); + D("recycling work %p, %zu available", work, wq->n_free); + } + } +} + +/** + * Allocate a work struct, either new or from free list. + */ +inline static struct work * +work_alloc_(struct workqueue *wq) +{ + assert(wq != NULL); + struct work *work = NULL; + + if (!FLAG_IS_NORECYCLE(wq->flags)) { + if (lock_(&(wq->free_mutex))) { + return NULL; + } + + work = STAILQ_FIRST(&(wq->free_head)); + if (work) { + STAILQ_REMOVE_HEAD(&(wq->free_head), stailq); + wq->n_free -= 1; + } + + (void)unlock_(&(wq->free_mutex)); + } + + if (!work) { + work = calloc(1, sizeof *work); + if (!work) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + } + D("new work %p, %zu", work, wq->n_free); + } else { + D("recycled work %p, %zu available", work, wq->n_free); + } + + return work; +} + +/** + * Trim free list down to #retain items. + */ +ssize_t +workqueue_release_work(struct workqueue *wq, size_t retain) { + assert(wq != NULL); + struct work *work; + ssize_t released = 0; + + if (lock_(&(wq->free_mutex))) { + return -1; + } + + while (wq->n_free > retain) { + work = STAILQ_FIRST(&(wq->free_head)); + STAILQ_REMOVE_HEAD(&(wq->free_head), stailq); + wq->n_free -= 1; + released += 1; + free(work); + D("freeing work %p", work); + } + + if (unlock_(&(wq->free_mutex))) { + return -1; + } + + return released; +} + +static void * +worker_(void *data) +{ + assert(data != NULL); + struct worker * const worker = data; + struct workqueue * const wq = worker->workqueue; + void *retval = NULL; + int r; + struct work *work = NULL, *priority_work = NULL; + + D("[%zu] started", worker->id); + + while (!FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) { + if (lock_(&(wq->work_mutex))) { + retval = (void *)-1; + goto done; + } + + D("[%zu] looking for work", worker->id); + while ((priority_work = STAILQ_FIRST(&(worker->priority_work_head))) == NULL + && (work = STAILQ_FIRST(&(wq->work_head))) == NULL) { + if (FLAG_IS_TERMINATE(wq->flags | worker->flags)) { + D("[%zu] no more work", worker->id); + if (unlock_(&(wq->work_mutex))) { + retval = (void *)-1; + } + goto done; + } + + if ( (r = pthread_cond_wait(&(wq->work_cond), &(wq->work_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_wait", strerror(r)); + retval = (void *)-1; + } + D("[%zu] woke (flags %d|%d)", worker->id, worker->flags, wq->flags); + if (r + || FLAG_IS_TERMINATE_NOCOMPLETE(wq->flags | worker->flags)) { + if (unlock_(&(wq->work_mutex))) { + retval = (void *)-1; + } + goto done; + } + } + + if (priority_work) { + D("[%zu] got priority work %p", worker->id, work); + STAILQ_REMOVE_HEAD(&(worker->priority_work_head), stailq); + worker->n_priority_work -= 1; + work = priority_work; + } else { + D("[%zu] got work %p", worker->id, work); + STAILQ_REMOVE_HEAD(&(wq->work_head), stailq); + wq->n_work -= 1; + } + + r = 0; + if (STAILQ_FIRST(&(wq->work_head))) { + if ( (r = pthread_cond_signal(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r)); + retval = (void *)-1; + } + } + + if (unlock_(&(wq->work_mutex)) || r) { + retval = (void *)-1; + goto done; + } + + work->fn(work->data, worker->ctx, worker->id); + work_free_(wq, work); + work = NULL; + } +done: + D("[%zu] done (%p)", worker->id, retval); + pthread_exit(retval); +} + +int +workqueue_init(struct workqueue *wq, worker_ctx_free_fn_t *worker_ctx_free_fn, workqueue_flags_t flags) +{ + assert(wq != NULL); + int r; + pthread_mutexattr_t mutexattr; + + D("initializing"); + + wq->flags = flags; + wq->worker_ctx_free_fn = worker_ctx_free_fn; + wq->n_workers = 0; + wq->workers_next_id = 0; + wq->n_free = 0; + wq->n_work = 0; + wq->n_work_highwater = 0; + + STAILQ_INIT(&(wq->work_head)); + STAILQ_INIT(&(wq->free_head)); + LIST_INIT(&(wq->workers_head)); + + if ( (r = pthread_mutexattr_init(&mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r)); + goto err_init; + } + if ( (r = pthread_mutexattr_setrobust(&mutexattr, 1)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_setrobust", strerror(r)); + goto err_destroy_mutexattr; + } + + if ( (r = pthread_mutex_init(&(wq->free_mutex), &mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err_destroy_mutexattr; + } + + if ( (r = pthread_mutex_init(&(wq->workers_mutex), &mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err_destroy_freemutex; + } + + if ( (r = pthread_mutex_init(&(wq->work_mutex), &mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err_destroy_workersmutex; + } + if ( (r = pthread_cond_init(&(wq->work_cond), NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_init", strerror(r)); + goto err_destroy_workmutex; + } + + if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); + goto err_destroy_workcond; + } + + return 0; + +err_destroy_workcond: + if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r)); + } +err_destroy_workmutex: + if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } +err_destroy_workersmutex: + if ( (r = pthread_mutex_destroy(&(wq->workers_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } +err_destroy_freemutex: + if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } +err_destroy_mutexattr: + if ( (r = pthread_mutexattr_destroy(&mutexattr)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); + } +err_init: + return -1; +} + +void +workqueue_fini(struct workqueue *wq, bool flush_unfinished) +{ + assert(wq != NULL); + struct worker *worker, *worker_tmp; + struct work *w1, *w2; + void *res; + int r; + + D("destroying"); + + workqueue_flags_t new_flags = WQF_TERMINATE; + if (flush_unfinished == true) { + new_flags |= WQF_NOCOMPLETE; + } + + wq->flags |= new_flags; + + if (flush_unfinished == true) { + (void)lock_(&(wq->work_mutex)); + w1 = STAILQ_FIRST(&(wq->work_head)); + while (w1 != NULL) { + D("flushing unfinished work %p", w1); + w2 = STAILQ_NEXT(w1, stailq); + free(w1); + w1 = w2; + } + STAILQ_INIT(&(wq->work_head)); + (void)unlock_(&(wq->work_mutex)); + } + + (void)lock_(&(wq->workers_mutex)); + + LIST_FOREACH(worker, &(wq->workers_head), list) { + worker->flags |= new_flags; + } + + if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); + } + + LIST_FOREACH_SAFE(worker, &(wq->workers_head), list, worker_tmp) { + LIST_REMOVE(worker, list); + wq->n_workers -= 1; + + D("joining worker %zu", worker->id); + if ( (r = pthread_join(worker->thread, &res)) ) { + NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); + } + + w1 = STAILQ_FIRST(&(worker->priority_work_head)); + while (w1 != NULL) { + D("flushing unfinished priority work %p", w1); + w2 = STAILQ_NEXT(w1, stailq); + free(w1); + w1 = w2; + } + STAILQ_INIT(&(worker->priority_work_head)); + + if (wq->worker_ctx_free_fn) { + wq->worker_ctx_free_fn(worker->ctx); + } + + free(worker); + } + + (void)unlock_(&(wq->workers_mutex)); + + if ( (r = pthread_cond_destroy(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_destroy", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&(wq->work_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&(wq->free_mutex))) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + w1 = STAILQ_FIRST(&(wq->free_head)); + while (w1 != NULL) { + D("freeing work %p", w1); + w2 = STAILQ_NEXT(w1, stailq); + free(w1); + w1 = w2; + } + STAILQ_INIT(&(wq->free_head)); +} + +ssize_t +workqueue_worker_add(struct workqueue *wq, void *ctx, workqueue_flags_t flags) +{ + assert(wq != NULL); + struct worker *worker; + sigset_t set, oldset; + int r; + + if (sigfillset(&set) < 0) { + NOTIFY_ERROR("%s:%s", "sigfillset", strerror(errno)); + return -1; + } + + worker = calloc(1, sizeof *worker); + if (worker == NULL) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + return -1; + } + + worker->workqueue = wq; + worker->ctx = ctx; + worker->flags = flags; + + STAILQ_INIT(&(worker->priority_work_head)); + worker->n_priority_work = 0; + + if ( (r = pthread_sigmask(SIG_BLOCK, &set, &oldset)) ) { + NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); + goto err_free_worker; + } + + if ( (r = pthread_create(&(worker->thread), NULL, worker_, worker)) ) { + NOTIFY_ERROR("%s:%s", "pthread_create", strerror(r)); + if ((r = pthread_sigmask(SIG_SETMASK, &oldset, NULL))) { + NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); + } + goto err_cancel; + } + + if ( (r = pthread_sigmask(SIG_SETMASK, &oldset, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_sigmask", strerror(r)); + goto err_cancel; + } + + if (lock_(&(wq->workers_mutex))) { + goto err_cancel; + } + + worker->id = wq->workers_next_id; + wq->workers_next_id += 1; + + LIST_INSERT_HEAD(&(wq->workers_head), worker, list); + + if (unlock_(&(wq->workers_mutex))) { + goto err_cancel; + } + + return worker->id; + +err_cancel: + worker->flags |= WQF_TERMINATE|WQF_NOCOMPLETE; + if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); + } + void *res; + if ( (r = pthread_join(worker->thread, &res)) ) { + NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); + } + +err_free_worker: + free(worker); + return -1; +} + +ssize_t +workqueue_worker_remove(struct workqueue *wq) +{ + assert(wq != NULL); + struct worker *worker; + struct work *w1, *w2; + ssize_t retval = -1; + int r; + + if (lock_(&(wq->workers_mutex))) { + return -1; + } + + worker = LIST_FIRST(&(wq->workers_head)); + if (worker) { + LIST_REMOVE(worker, list); + wq->n_workers -= 1; + retval = worker->id; + } + + if (unlock_(&(wq->workers_mutex))) { + retval = -1; + } + + if (worker == NULL) { + NOTIFY_ERROR("no workers to remove"); + return -1; + } + + worker->flags |= WQF_TERMINATE | WQF_NOCOMPLETE; + + if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); + retval = -1; + } + + void *res; + if ( (r = pthread_join(worker->thread, &res)) ) { + NOTIFY_ERROR("%s:%s", "pthread_join", strerror(r)); + } + + w1 = STAILQ_FIRST(&(worker->priority_work_head)); + while (w1 != NULL) { + D("freeing work %p", w1); + w2 = STAILQ_NEXT(w1, stailq); + free(w1); + w1 = w2; + } + STAILQ_INIT(&(wq->free_head)); + + if (wq->worker_ctx_free_fn) { + wq->worker_ctx_free_fn(worker->ctx); + } + + free(worker); + + return retval; +} + +int +workqueue_add(struct workqueue *wq, work_fn_t *fn, void *data) +{ + assert(wq != NULL); + assert(fn != NULL); + struct work *work; + int r; + + if (FLAG_IS_TERMINATE(wq->flags)) { + NOTIFY_ERROR("not adding work to terminating queue"); + return -1; + } + + work = work_alloc_(wq); + if (work == NULL) { + return -1; + } + work->fn = fn; + work->data = data; + + if (lock_(&(wq->work_mutex))) { + free(work); + return -1; + } + + STAILQ_INSERT_TAIL(&(wq->work_head), work, stailq); + wq->n_work += 1; + if (wq->n_work > wq->n_work_highwater) { + wq->n_work_highwater = wq->n_work; + } + + if ( (r = pthread_cond_signal(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_signal", strerror(r)); + } + + if (unlock_(&(wq->work_mutex))) { + return -1; + } + + return 0; +} + +int +workqueue_add_all_workers(struct workqueue *wq, work_fn_t *fn, void *data) +{ + assert(wq != NULL); + assert(fn != NULL); + struct worker *worker; + struct work *work; + int retval = 0; + int r; + + if (FLAG_IS_TERMINATE(wq->flags)) { + NOTIFY_ERROR("not adding work to terminating queue"); + retval = -1; + goto done; + } + + if (lock_(&(wq->workers_mutex))) { + retval = -1; + goto done; + } + + if (lock_(&(wq->work_mutex))) { + retval = -1; + goto err_unlock_workers; + } + + LIST_FOREACH(worker, &(wq->workers_head), list) { + work = work_alloc_(wq); + if (work == NULL) { + retval = -1; + goto err_unlock_work; + } + work->fn = fn; + work->data = data; + + STAILQ_INSERT_TAIL(&(worker->priority_work_head), work, stailq); + worker->n_priority_work += 1; + } + + if ( (r = pthread_cond_broadcast(&(wq->work_cond))) ) { + NOTIFY_ERROR("%s:%s", "pthread_cond_broadcast", strerror(r)); + } + +err_unlock_work: + if (unlock_(&(wq->work_mutex))) { + retval = -1; + goto err_unlock_workers; + } + +err_unlock_workers: + if (unlock_(&(wq->workers_mutex))) { + return -1; + } +done: + return retval; +} diff --git a/workqueue.h b/workqueue.h new file mode 100644 index 0000000..0c4d40e --- /dev/null +++ b/workqueue.h @@ -0,0 +1,108 @@ +#ifndef _WORKQUEUE_H_ +#define _WORKQUEUE_H_ + +#include +#include +#include +#include + +#include "bsd_queue.h" + +/** + * Function signature to execute with user data, thread context, and thread id. + */ +typedef void (work_fn_t)(void *, void *, size_t); + +/** + * Function signature to free a worker thread's ctx. + */ +typedef void (worker_ctx_free_fn_t)(void *); + +/** + * Function signature for receiving error strings. + */ +typedef int (printf_fn_t) (const char *format, ...); + +/** + * Flag type for controlling worker behavior. + * We only need three. + */ +typedef uint_fast8_t workqueue_flags_t; + +/** + * Internal unit of work. + */ +struct work { + STAILQ_ENTRY(work) stailq; + work_fn_t *fn; + void *data; +}; + +enum workqueue_flags { + WQF_TERMINATE = (1<<0), /* worker thread will exit when out of work */ + WQF_NOCOMPLETE = (1<<1), /* worker thread will not finish all work before exiting */ + WQF_NORECYCLE = (1<<2), /* do not reuse work allocations */ +}; + +/** + * A worker thread, with related data. + */ +struct worker { + pthread_t thread; + struct workqueue *workqueue; + void *ctx; + LIST_ENTRY(worker) list; + size_t id; + volatile workqueue_flags_t flags; + + /* Queue of worker-specific work to perform, before consuming general work queue. */ + /* Guarded by workqueue->work_mutex */ + STAILQ_HEAD(priority_work_head, work) priority_work_head; + size_t n_priority_work; + /* End of workqueue->work_mutex guard */ +}; + +/** + * + */ +struct workqueue { + /* List of active worker threads. */ + pthread_mutex_t workers_mutex; + LIST_HEAD(workers_head, worker) workers_head; + size_t n_workers; + size_t workers_next_id; + /* End of workers_mutex guard */ + + worker_ctx_free_fn_t *worker_ctx_free_fn; + + /* Queue of work units awaiting processing. */ + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + STAILQ_HEAD(work_head, work) work_head; + size_t n_work; + size_t n_work_highwater; + /* End of work_mutex guard */ + + /* Queue of allocated unutilized work units. */ + pthread_mutex_t free_mutex; + STAILQ_HEAD(free_head, work) free_head; + size_t n_free; + /* End of free_mutex guard */ + + volatile workqueue_flags_t flags; +}; + +/** + * Initialize a workqueue. + */ +int workqueue_init(struct workqueue *, worker_ctx_free_fn_t *, workqueue_flags_t); +void workqueue_fini(struct workqueue *, bool); + +ssize_t workqueue_worker_add(struct workqueue *, void *, workqueue_flags_t); +ssize_t workqueue_worker_remove(struct workqueue *); + +ssize_t workqueue_release_work(struct workqueue *, size_t); + +int workqueue_add(struct workqueue *, work_fn_t *, void *); + +#endif /* _WORKQUEUE_H_ */ -- 2.45.2