rough framework
[lemu] / server.c
diff --git a/server.c b/server.c
new file mode 100644 (file)
index 0000000..f421383
--- /dev/null
+++ b/server.c
@@ -0,0 +1,839 @@
+#include <pthread.h>
+#include <string.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <assert.h>
+
+#include <event2/bufferevent_ssl.h>
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+#include <event2/listener.h>
+#include <event2/dns.h>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include <lua.h>
+#include <lualib.h>
+#include <lauxlib.h>
+
+#include "common.h"
+#include "command.h"
+#include "db.h"
+#include "notify.h"
+#include "lua_interface.h"
+#include "server.h"
+#include "workqueue.h"
+#include "version.h"
+
+
+/*  Wrapper struct for passing multiple values through a voidstar.
+ *  Used when calling server_add_listener_addrinfo_().
+ */
+struct server_listener_convenience_ {
+       struct server *s;
+       evconnlistener_cb cb;
+       char *type;
+};
+
+
+/**
+ * Load all lua libs except for io and os modules,
+ * TODO: this should be more fine-grained and properly sandboxed.  Also, add server lib (needs to be created).
+ */
+static void
+server_lua_openlibs_(lua_State *L)
+{
+       const luaL_Reg lualibs[] = {
+               { "", luaopen_base },
+               { LUA_LOADLIBNAME, luaopen_package },
+               { LUA_TABLIBNAME, luaopen_table },
+               { LUA_STRLIBNAME, luaopen_string },
+               { LUA_MATHLIBNAME, luaopen_math },
+               // { LUA_DBLIBNAME, luaopen_debug },
+               { NULL, NULL }
+       };
+       const luaL_Reg *lib;
+       for (lib = lualibs ; lib->func; lib++) {
+               lua_pushcfunction(L, lib->func);
+               lua_pushstring(L, lib->name);
+               lua_call(L, 1, 0);
+       }
+
+       lemu_connection_luainit(L);
+}
+
+
+/**
+ * Allocate and initialize a new per-thread context.
+ */
+static struct server_worker_context *
+server_worker_context_create_(struct server *server)
+{
+       struct server_worker_context *ctx;
+
+       ctx = calloc(1, sizeof *ctx);
+       if (ctx == NULL) {
+               NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+               return NULL;
+       }
+       ctx->server = server;
+       ctx->L = luaL_newstate();
+       if (!ctx->L) {
+               NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
+               free(ctx);
+               return NULL;
+       }
+       server_lua_openlibs_(ctx->L);
+
+       return ctx;     
+}
+
+/**
+ * Free a thread context.
+ * workqueue:worker_ctx_free_fn
+ */
+static void
+server_worker_context_destroy_(void *data)
+{
+       struct server_worker_context *ctx = data;
+       lua_close(ctx->L);
+       free(ctx);
+}
+
+/**
+ * Adds a worker thread to the pool.
+ */
+static int
+server_init_add_thread_(struct server *s)
+{
+       struct server_worker_context *ctx;
+       ssize_t worker_id;
+
+       ctx = server_worker_context_create_(s);
+       if (ctx == NULL) {
+               NOTIFY_ERROR("%s:%s", "server_worker_context_create_", "failed");
+               return -1;
+       }
+
+       worker_id = workqueue_worker_add(&s->workqueue, ctx, 0);
+       if (worker_id < 0) {
+               NOTIFY_ERROR("%s:%s", "workqueue_worker_add", "failed");
+               server_worker_context_destroy_(ctx);
+               return -1;
+       }
+
+       NOTIFY_DEBUG("added worker thread %zd", worker_id);
+
+       return 0;
+}
+
+/**
+ * Handle accept errors.
+ */
+static void
+server_accept_error_cb_(struct evconnlistener *listener UNUSED, void *ctx UNUSED)
+{
+       int err = EVUTIL_SOCKET_ERROR();
+       NOTIFY_ERROR("%s (%d)", evutil_socket_error_to_string(err), err);
+}
+
+
+/**
+ * Move incoming data to connection queue.
+ * TBD: is it worth work-queueing this data shuffling, or just let main thread handle it?
+ */
+static void
+server_read_event_(struct bufferevent *bev, void *ctx)
+{
+       struct connection *c = ctx;
+       struct evbuffer *input = bufferevent_get_input(bev);
+       size_t line_len;
+       unsigned char *line;
+
+       while ( (line = (unsigned char *)evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF)) ) {
+               c->total_read += line_len; /* this drops the newlines from the total_read tally */
+
+               struct command *command = command_new(line, line_len, 0);
+               if (command == NULL) {
+                       NOTIFY_ERROR("%s:%s", "command_new", "failed");
+                       free(line);
+                       return;
+               }
+
+               if (connection_command_enqueue(c, command)) {
+                       NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
+                       command_free(command);
+                       return;
+               }
+       }
+}
+
+
+/**
+ * Handle write events.
+ */
+static void
+server_write_event_(struct bufferevent *bev UNUSED, void *ctx)
+{
+       struct connection *c = ctx;
+
+       if (c->state == CONNECTION_STATE_WANT_CLOSE) {
+               c->state = CONNECTION_STATE_CLOSED;
+
+               struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT);
+               if (command == NULL) {
+                       NOTIFY_ERROR("%s:%s", "command_new", "failed");
+                       return;
+               }
+               if (connection_command_enqueue(c, command)) {
+                       NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
+                       command_free(command);
+               }
+               connection_free(c);
+       }
+}
+
+
+/**
+ * Handle some events.
+ */
+static void
+server_event_(struct bufferevent *bev UNUSED, short events, void *ctx)
+{
+       struct connection *c = ctx;
+       int finished = 0;
+
+       if (events & BEV_EVENT_READING) {
+               NOTIFY_ERROR("reading error from '%s'", c->name);
+               finished = 1;
+       }
+
+       if (events & BEV_EVENT_WRITING) {
+               NOTIFY_ERROR("writing error from '%s'", c->name);
+               finished = 1;
+       }
+
+       if (events & BEV_EVENT_EOF) {
+               NOTIFY_DEBUG("eof from '%s'", c->name);
+               finished = 1;
+       }
+
+       if (events & BEV_EVENT_ERROR) {
+               NOTIFY_ERROR("unrecoverable error from '%s': %s",
+                            c->name,
+                            evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()) );
+               finished = 1;
+       }
+
+       if (events & BEV_EVENT_TIMEOUT) {
+               NOTIFY_ERROR("timeout from '%s'", c->name);
+               finished = 1;
+       }
+
+       if (events & BEV_EVENT_CONNECTED) {
+               NOTIFY_DEBUG("connected from '%s'", c->name);
+       }
+
+       if (events & ~(BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT|BEV_EVENT_CONNECTED)) {
+               NOTIFY_ERROR("unrecognized event from '%s': %hd", c->name, events);
+       }
+
+       if (finished) {
+               struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT);
+               if (command == NULL) {
+                       NOTIFY_ERROR("%s:%s", "command_new", "failed");
+               } else {
+                       if (connection_command_enqueue(c, command)) {
+                               NOTIFY_ERROR("%s:%s", "connection_comand_enqueue", "failed");
+                               command_free(command);
+                       }
+               }
+               connection_free(c);
+       }
+}
+
+
+/**
+ * Everything needed to accept a new connection, regardless of transport type.
+ */
+static void
+server_accept_conn_common_(struct server *server, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags)
+{
+       struct connection *c;
+
+       c = calloc(1, sizeof *c);
+       if (!c) {
+               NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+               bufferevent_free(bev);
+               return;
+       }
+
+       if (connection_init(server, c, bev, sock, socklen, flags)) {
+               NOTIFY_ERROR("%s:%s", "connection_init", "failed");
+               bufferevent_free(bev);
+               free(c);
+               return;
+       }
+       c->state = CONNECTION_STATE_CONNECTED;
+
+       /* a bufferevent_add_cb interface would be better, but there isn't one yet */
+       bufferevent_setcb(c->bev, server_read_event_, server_write_event_, server_event_, c);
+       bufferevent_enable(c->bev, EV_READ|EV_WRITE);
+
+       connections_append(c);
+
+       struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_CONNECT);
+       if (command == NULL) {
+               NOTIFY_ERROR("%s:%s", "command_new", "failed");
+               return;
+       }
+       if (connection_command_enqueue(c, command)) {
+               NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
+               command_free(command);
+       }
+}
+
+
+/**
+ * Accept a new plain-text connection.
+ */
+static void
+server_accept_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx)
+{
+       struct server *server = ctx;
+       struct event_base *base = evconnlistener_get_base(listener);
+       struct bufferevent *bev;
+
+       bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
+       if (!bev) {
+               NOTIFY_ERROR("%s:%s", "bufferevent_socket_new", "failed");
+               return;
+       }
+
+       server_accept_conn_common_(server, bev, sock, socklen, 0);
+}
+
+/**
+ * Accept a new ssl connection.
+ */
+static void
+server_accept_ssl_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx)
+{
+       struct server *server = ctx;
+       struct event_base *base = evconnlistener_get_base(listener);
+       struct bufferevent *bev;
+       SSL *ssl;
+
+       ssl = SSL_new(server->ssl_ctx);
+
+       bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
+       if (!bev) {
+               NOTIFY_ERROR("%s:%s", "bufferevent_openssl_socket_new", "failed");
+               SSL_free(ssl);
+               return;
+       }
+
+       server_accept_conn_common_(server, bev, sock, socklen, CONN_TYPE_SSL);
+}
+
+
+/**
+ * Add a new listener binding to server for the provided address.
+ */
+static int
+server_add_listener_addrinfo_(struct addrinfo *ai, void *data)
+{
+       struct server *s = ((struct server_listener_convenience_ *)data)->s;
+       evconnlistener_cb cb = ((struct server_listener_convenience_ *)data)->cb;
+       struct evconnlistener **l;
+       int retval = 0;
+       int r;
+
+       if ( (r = pthread_mutex_lock(&s->listeners_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return -1;
+       }
+
+       if (s->listeners_used == s->listeners_allocated) {
+               void *new_ptr;
+               size_t new_allocated = s->listeners_allocated + 8;
+
+               new_ptr = realloc(s->listeners, new_allocated * sizeof *(s->listeners));
+               if (!new_ptr) {
+                       NOTIFY_ERROR("realloc(%zu): %s", new_allocated * sizeof *(s->listeners), strerror(errno));
+                       retval = -1;
+                       goto done;
+               }
+               s->listeners = new_ptr;
+               s->listeners_allocated = new_allocated;
+
+               /* recalloc */
+               memset(&s->listeners[s->listeners_used], 0, (s->listeners_allocated - s->listeners_used - 1) * sizeof *s->listeners);
+       }
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("new listener in slot %zu", s->listeners_used);
+#endif
+
+       s->listeners_used++;
+       l = &(s->listeners[s->listeners_used - 1]);
+       *l = evconnlistener_new_bind(s->base, cb, s, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, ai->ai_addr, ai->ai_addrlen);
+       if (!*l) {
+               NOTIFY_ERROR("%s:%s", "evconnlistener_new_bind", strerror(errno));
+               *l = NULL;
+               s->listeners_used--;
+               retval = -1;
+               goto done;
+       }
+       evconnlistener_set_error_cb(*l, server_accept_error_cb_);
+
+done:
+       if ( (r = pthread_mutex_unlock(&s->listeners_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               retval = -1;
+       }
+
+       return retval;
+}
+
+
+/**
+ * Allocates a new server structure, initializes all the structures which don't need
+ * any specific configuration.  Such further setup is handled by server_init().
+ */
+struct server *
+server_new(void)
+{
+       pthread_mutexattr_t attr;
+       struct server *s;
+       int r;
+
+       s = calloc(1, sizeof *s);
+       if (!s) {
+               NOTIFY_ERROR("calloc(%zu, %zu): %s", (size_t)1, sizeof *s, strerror(errno));
+               return NULL;
+       }
+
+       s->base = event_base_new();
+       if (!s->base) {
+               NOTIFY_ERROR("%s:%s", "event_base_new", "failed");
+               goto err_free_server;
+       }
+
+       s->evdns_base = evdns_base_new(s->base, 1);
+       if (!s->evdns_base) {
+               NOTIFY_ERROR("%s:%s", "evdns_base", "failed");
+               goto err_free_event_base;
+       }
+
+       if (workqueue_init(&s->workqueue, server_worker_context_destroy_, 0)) {
+               NOTIFY_ERROR("%s:%s", "workqueue_init", "failed");
+               goto err_free_evdns_base;
+       }
+
+       if ( (r = pthread_mutexattr_init(&attr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
+               goto err_fini_workqueue;
+       }
+
+       if ( (r = pthread_mutex_init(&s->listeners_mutex, &attr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err_destroy_attr;
+       }
+
+       if (connections_init(&s->connections) < 0) {
+               NOTIFY_ERROR("%s:%s", "connections_init", "&s->connections failed");
+               goto err_destroy_mutex;
+       }
+
+       s->L = luaL_newstate();
+       if (!s->L) {
+               NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
+               goto err_free_connections;
+       }
+
+       if ( (r = pthread_mutexattr_destroy(&attr)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
+               goto err_close_lua;
+       }
+
+       return s;
+
+err_close_lua:
+       lua_close(s->L);
+err_free_connections:
+       connections_fini(&s->connections);
+err_destroy_mutex:
+       pthread_mutex_destroy(&s->listeners_mutex);
+err_destroy_attr:
+       pthread_mutexattr_destroy(&attr);
+err_fini_workqueue:
+       workqueue_fini(&s->workqueue, true);
+err_free_evdns_base:
+       evdns_base_free(s->evdns_base, 0);
+err_free_event_base:
+       event_base_free(s->base);
+err_free_server:
+       free(s);
+       return NULL;
+}
+
+/**
+ * Looking at top of lua stack as string, parse as address and port,
+ * then add as listener to server.
+ */
+static void
+server_init_listener_directive_a_string_(lua_State *L, struct server_listener_convenience_ *c_data)
+{
+       char *l_str;
+
+       l_str = (char *)lua_tostring(L, -1);
+       if (string_to_addrinfo_call(l_str, 0, server_add_listener_addrinfo_, c_data)) {
+               NOTIFY_ERROR("failed to add '%s' value '%s'", c_data->type, l_str ? l_str : "[unknown]");
+       }
+       NOTIFY_INFO("listening on %s (%s)", l_str, c_data->type);
+}
+
+/**
+ * Using configuration key provided in c_data->type, create all listeners.
+ */
+static void
+server_init_listener_directive_(lua_State *L, struct server_listener_convenience_ *c_data)
+{
+       lua_getglobal(L, "server_config");
+       lua_getfield(L, -1, c_data->type);
+       if (lua_istable(L, -1)) {
+               lua_pushnil(L);
+               while (lua_next(L, -2) != 0) {
+                       if (lua_isstring(L, -1)) {
+                               server_init_listener_directive_a_string_(L, c_data);
+                       } else {
+                               char *l_str = (char *)lua_tostring(L, -2); /* table key name */
+                               NOTIFY_ERROR("could not fathom value for '%s' at index '%s'", c_data->type, l_str ? l_str : "[unknown]");
+                       }
+                       lua_pop(L, 1);
+               }
+       } else if (lua_isstring(L, -1)) {
+               server_init_listener_directive_a_string_(L, c_data);
+       } else {
+               NOTIFY_ERROR("could not fathom '%s' value", c_data->type);
+       }
+       lua_pop(L, 2);
+}
+
+/**
+ * Return an allocated copy of value of global lua variable.
+ */
+static int
+get_lua_config_string_(lua_State *L, const char *key, char **value, size_t *value_len)
+{
+       int retval = 0;
+       const char *l_str;
+       size_t l_str_len;
+       assert(key);
+       assert(value);
+       *value = NULL;
+
+       lua_getglobal(L, "server_config");
+       lua_getfield(L, -1, key);
+
+       if (!lua_isstring(L, -1)) {
+               NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "string", lua_typename(L, lua_type(L, -1)));
+               retval = -1;
+               goto done;
+       }
+       l_str = lua_tolstring(L, -1, &l_str_len);
+       if (!l_str) {
+               NOTIFY_ERROR("'%s' value invalid", key);
+               retval = -1;
+               goto done;
+       }
+       if (value_len) {
+               *value_len = l_str_len;
+       }
+       l_str_len += 1; // \0
+       *value = malloc(l_str_len);
+       if (!*value) {
+               NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
+               retval = -1;
+               goto done;
+       }
+       memcpy(*value, l_str, l_str_len);
+done:
+       lua_pop(L, 1);
+       return retval;
+}
+
+static int
+get_lua_config_integer_(lua_State *L, const char *key, int *value)
+{
+       int retval = 0;
+       assert(key);
+       assert(value);
+
+       lua_getglobal(L, "server_config");
+       lua_getfield(L, -1, key);
+
+       if (!lua_isinteger(L, -1)) {
+               NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "integer", lua_typename(L, lua_type(L, -1)));
+               retval = -1;
+               goto done;
+       }
+       *value = (int)lua_tointeger(L, -1);
+done:
+       lua_pop(L, 1);
+       return retval;
+}
+
+/**
+ * Load or reload the ssl context, with cert and settings.
+ */
+int
+server_refresh_ssl_context(lua_State *L, struct server *s)
+{
+       SSL_CTX *new_ssl_ctx, *old_ssl_ctx;
+       char *l_str;
+
+       new_ssl_ctx = SSL_CTX_new(SSLv23_server_method());
+       if (!new_ssl_ctx) {
+               NOTIFY_ERROR("%s:%s", "SSL_CTX_new", ERR_error_string(ERR_get_error(), NULL));
+               return -1;
+       }
+
+       // load cert
+       if (get_lua_config_string_(L, "ssl_keyfile", &l_str, NULL)) {
+               return -1;
+       }
+       if (SSL_CTX_use_PrivateKey_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) {
+               NOTIFY_ERROR("%s file '%s': %s", "key", l_str, ERR_error_string(ERR_get_error(), NULL));
+               free(l_str);
+               return -1;
+       }
+       free(l_str);
+
+       if (get_lua_config_string_(L, "ssl_certfile", &l_str, NULL)) {
+               return -1;
+       }
+       if (SSL_CTX_use_certificate_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) {
+               NOTIFY_ERROR("%s file '%s': %s", "certificate", l_str, ERR_error_string(ERR_get_error(), NULL));
+               free(l_str);
+               return -1;
+       }
+       free(l_str);
+
+       old_ssl_ctx = s->ssl_ctx;
+       s->ssl_ctx = new_ssl_ctx;
+
+       if (old_ssl_ctx) {
+               SSL_CTX_free(old_ssl_ctx);
+       }
+
+       return 0;
+}
+
+
+/**
+ * Example heartbeat worker job.
+ * N.B. server as data instead of connection.
+ */
+static void
+server_heartbeat_worker_(void *data, void *context UNUSED, size_t id UNUSED)
+{
+       struct server *s = data;
+       struct connection **clist, **clist_iter;
+       char *message;
+       int r;
+
+       clist = connections_all_as_array(&(s->connections), NULL);
+       if (!clist) {
+               return;
+       }
+
+       message = strdup("[heartbeat]\n");
+       if (!message) {
+               NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
+               return;
+       }
+
+       r = connection_multi_send(clist, message, strlen(message));
+       if (r < 0) {
+               NOTIFY_ERROR("could not broadcast heartbeat");
+       } else {
+               NOTIFY_DEBUG("broadcast heartbeat to %d connection%s", r, (r > 1) ? "s" : "");
+       }
+
+       clist_iter = clist;
+       while (*clist_iter) {
+               connection_free(*clist_iter);
+               clist_iter++;
+       }
+       free(clist);
+}
+
+
+/**
+ * Periodic heartbeat handler.
+ */
+static void
+server_heartbeat_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg)
+{
+       struct server *server = arg;
+
+       if (workqueue_add(&server->workqueue, server_heartbeat_worker_, server)) {
+               NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
+       }
+}
+
+
+/**
+ * Configure the server based on the provided lua file.
+ */
+int
+server_init(struct server *s, char *conf_file)
+{
+       struct server_listener_convenience_ c_data;
+       int l_int;
+
+       /* Set up initial server state, in order to process configuration file. */
+       server_lua_openlibs_(s->L);
+
+       /* Create config table, which will be used as global env for loading config file. */
+       lua_newtable(s->L);
+       lua_setglobal(s->L, "server_config");
+
+
+       if (luaL_loadfile(s->L, conf_file)) {
+               NOTIFY_ERROR("could not %s configuration file '%s': %s",
+                            "load",
+                            conf_file,
+                            lua_tostring(s->L, -1));
+               return -1;
+       }
+       /* server_config table becomes global env for executing config file */
+       lua_getglobal(s->L, "server_config");
+       lua_setupvalue(s->L, -2, 1);
+
+       if (lua_pcall(s->L, 0, 0, 0) != LUA_OK) {
+               NOTIFY_ERROR("could not %s configuration file '%s': %s",
+                            "parse",
+                            conf_file,
+                            lua_tostring(s->L, -1));
+               return -1;
+       }
+
+       // FIXME: let db fetch its own options
+       struct database_options db_options =  { 0 };
+       if (get_lua_config_string_(s->L, "db_file", &db_options.db_file, NULL)) {
+               return -1;
+       }
+
+       if (db_init(&s->db, &db_options)) {
+               NOTIFY_ERROR("%s:%s", "db_init", "failed");
+               free(db_options.db_file);
+               return -1;
+       }
+       free(db_options.db_file);
+
+       if (server_refresh_ssl_context(s->L, s)) {
+               return -1;
+       }
+
+       if (get_lua_config_integer_(s->L, "processing_threads", &l_int)) {
+               return -1;
+       }
+       /* FIXME: define reasonable limits */
+       if (l_int < 1 || l_int > 64) {
+               NOTIFY_ERROR("'%s' is not a reasonable value", "processing_threads");
+               return -1;
+       }
+       // FIXME: while l_int != workqueue_workers()
+       while (l_int) {
+               if (server_init_add_thread_(s)) {
+                       NOTIFY_ERROR("could not start processing thread");
+                       return -1;
+               }
+               l_int--;
+       }
+
+       c_data.s = s;
+       c_data.cb = server_accept_conn_event_;
+       c_data.type = "listen";
+       server_init_listener_directive_(s->L, &c_data);
+
+       c_data.cb = server_accept_ssl_conn_event_;
+       c_data.type = "listen_ssl";
+       server_init_listener_directive_(s->L, &c_data);
+
+       do {
+               struct timeval _seconds = {30, 0};
+               struct event *ev;
+               /* FIXME: instead of persist, reset from event */
+               ev = event_new(s->base, -1, EV_TIMEOUT | EV_PERSIST, server_heartbeat_event_, s);
+               event_add(ev, &_seconds);
+       } while (0);
+
+       NOTIFY_INFO("server starting...\n"
+                    "\t%s %s (%s)\n"
+                    "\tlibevent %s [%s]\n"
+                    "\t%s %s\n"
+                    "\t%s\n"
+                    "\t%s",
+                    "lemu", VERSION_STR, VERSION_DATE,
+                    event_get_version(), event_base_get_method(s->base),
+                    db_engine(), db_version(),
+                    OpenSSL_version(OPENSSL_VERSION),
+                    LUA_RELEASE);
+
+       event_base_dispatch(s->base);
+
+       return 0;
+}
+
+void
+server_free(struct server *s)
+{
+       int r;
+
+       while (s->listeners_used > 0) {
+               s->listeners_used--;
+               if (s->listeners[s->listeners_used]) {
+                       evconnlistener_free(s->listeners[s->listeners_used]);
+                       s->listeners[s->listeners_used] = NULL;
+               }
+               NOTIFY_DEBUG("freed listener %zu", s->listeners_used);
+       }
+       free(s->listeners);
+       s->listeners = NULL;
+
+       workqueue_fini(&s->workqueue, false);
+
+       if (s->ssl_ctx) {
+               SSL_CTX_free(s->ssl_ctx);
+               s->ssl_ctx = NULL;
+       }
+
+       if (s->evdns_base) {
+               evdns_base_free(s->evdns_base, 0);
+               s->evdns_base = NULL;
+       }
+
+       if (s->base) {
+               event_base_free(s->base);
+               s->base = NULL;
+       }
+
+       if ( (r = pthread_mutex_destroy(&s->connections.mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       if ( (r = pthread_mutex_destroy(&s->listeners_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       lua_close(s->L);
+
+       db_fini(&s->db);
+
+       free(s);
+}