#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" #include "command.h" #include "db.h" #include "notify.h" #include "lua_interface.h" #include "server.h" #include "workqueue.h" #include "version.h" /* Wrapper struct for passing multiple values through a voidstar. * Used when calling server_add_listener_addrinfo_(). */ struct server_listener_convenience_ { struct server *s; evconnlistener_cb cb; char *type; }; /** * Load all lua libs except for io and os modules, * TODO: this should be more fine-grained and properly sandboxed. Also, add server lib (needs to be created). */ static void server_lua_openlibs_(lua_State *L) { const luaL_Reg lualibs[] = { { "", luaopen_base }, { LUA_LOADLIBNAME, luaopen_package }, { LUA_TABLIBNAME, luaopen_table }, { LUA_STRLIBNAME, luaopen_string }, { LUA_MATHLIBNAME, luaopen_math }, // { LUA_DBLIBNAME, luaopen_debug }, { NULL, NULL } }; const luaL_Reg *lib; for (lib = lualibs ; lib->func; lib++) { lua_pushcfunction(L, lib->func); lua_pushstring(L, lib->name); lua_call(L, 1, 0); } lemu_connection_luainit(L); } /** * Allocate and initialize a new per-thread context. */ static struct server_worker_context * server_worker_context_create_(struct server *server) { struct server_worker_context *ctx; ctx = calloc(1, sizeof *ctx); if (ctx == NULL) { NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); return NULL; } ctx->server = server; ctx->L = luaL_newstate(); if (!ctx->L) { NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed"); free(ctx); return NULL; } server_lua_openlibs_(ctx->L); return ctx; } /** * Free a thread context. * workqueue:worker_ctx_free_fn */ static void server_worker_context_destroy_(void *data) { struct server_worker_context *ctx = data; lua_close(ctx->L); free(ctx); } /** * Adds a worker thread to the pool. */ static int server_init_add_thread_(struct server *s) { struct server_worker_context *ctx; ssize_t worker_id; ctx = server_worker_context_create_(s); if (ctx == NULL) { NOTIFY_ERROR("%s:%s", "server_worker_context_create_", "failed"); return -1; } worker_id = workqueue_worker_add(&s->workqueue, ctx, 0); if (worker_id < 0) { NOTIFY_ERROR("%s:%s", "workqueue_worker_add", "failed"); server_worker_context_destroy_(ctx); return -1; } NOTIFY_DEBUG("added worker thread %zd", worker_id); return 0; } /** * Handle accept errors. */ static void server_accept_error_cb_(struct evconnlistener *listener UNUSED, void *ctx UNUSED) { int err = EVUTIL_SOCKET_ERROR(); NOTIFY_ERROR("%s (%d)", evutil_socket_error_to_string(err), err); } /** * Move incoming data to connection queue. * TBD: is it worth work-queueing this data shuffling, or just let main thread handle it? */ static void server_read_event_(struct bufferevent *bev, void *ctx) { struct connection *c = ctx; struct evbuffer *input = bufferevent_get_input(bev); size_t line_len; unsigned char *line; while ( (line = (unsigned char *)evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF)) ) { c->total_read += line_len; /* this drops the newlines from the total_read tally */ struct command *command = command_new(line, line_len, 0); if (command == NULL) { NOTIFY_ERROR("%s:%s", "command_new", "failed"); free(line); return; } if (connection_command_enqueue(c, command)) { NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); command_free(command); return; } } } /** * Handle write events. */ static void server_write_event_(struct bufferevent *bev UNUSED, void *ctx) { struct connection *c = ctx; if (c->state == CONNECTION_STATE_WANT_CLOSE) { c->state = CONNECTION_STATE_CLOSED; struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT); if (command == NULL) { NOTIFY_ERROR("%s:%s", "command_new", "failed"); return; } if (connection_command_enqueue(c, command)) { NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); command_free(command); } connection_free(c); } } /** * Handle some events. */ static void server_event_(struct bufferevent *bev UNUSED, short events, void *ctx) { struct connection *c = ctx; int finished = 0; if (events & BEV_EVENT_READING) { NOTIFY_ERROR("reading error from '%s'", c->name); finished = 1; } if (events & BEV_EVENT_WRITING) { NOTIFY_ERROR("writing error from '%s'", c->name); finished = 1; } if (events & BEV_EVENT_EOF) { NOTIFY_DEBUG("eof from '%s'", c->name); finished = 1; } if (events & BEV_EVENT_ERROR) { NOTIFY_ERROR("unrecoverable error from '%s': %s", c->name, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()) ); finished = 1; } if (events & BEV_EVENT_TIMEOUT) { NOTIFY_ERROR("timeout from '%s'", c->name); finished = 1; } if (events & BEV_EVENT_CONNECTED) { NOTIFY_DEBUG("connected from '%s'", c->name); } if (events & ~(BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT|BEV_EVENT_CONNECTED)) { NOTIFY_ERROR("unrecognized event from '%s': %hd", c->name, events); } if (finished) { struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT); if (command == NULL) { NOTIFY_ERROR("%s:%s", "command_new", "failed"); } else { if (connection_command_enqueue(c, command)) { NOTIFY_ERROR("%s:%s", "connection_comand_enqueue", "failed"); command_free(command); } } connection_free(c); } } /** * Everything needed to accept a new connection, regardless of transport type. */ static void server_accept_conn_common_(struct server *server, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags) { struct connection *c; c = calloc(1, sizeof *c); if (!c) { NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); bufferevent_free(bev); return; } if (connection_init(server, c, bev, sock, socklen, flags)) { NOTIFY_ERROR("%s:%s", "connection_init", "failed"); bufferevent_free(bev); free(c); return; } c->state = CONNECTION_STATE_CONNECTED; /* a bufferevent_add_cb interface would be better, but there isn't one yet */ bufferevent_setcb(c->bev, server_read_event_, server_write_event_, server_event_, c); bufferevent_enable(c->bev, EV_READ|EV_WRITE); connections_append(c); struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_CONNECT); if (command == NULL) { NOTIFY_ERROR("%s:%s", "command_new", "failed"); return; } if (connection_command_enqueue(c, command)) { NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed"); command_free(command); } } /** * Accept a new plain-text connection. */ static void server_accept_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx) { struct server *server = ctx; struct event_base *base = evconnlistener_get_base(listener); struct bufferevent *bev; bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS); if (!bev) { NOTIFY_ERROR("%s:%s", "bufferevent_socket_new", "failed"); return; } server_accept_conn_common_(server, bev, sock, socklen, 0); } /** * Accept a new ssl connection. */ static void server_accept_ssl_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx) { struct server *server = ctx; struct event_base *base = evconnlistener_get_base(listener); struct bufferevent *bev; SSL *ssl; ssl = SSL_new(server->ssl_ctx); bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS); if (!bev) { NOTIFY_ERROR("%s:%s", "bufferevent_openssl_socket_new", "failed"); SSL_free(ssl); return; } server_accept_conn_common_(server, bev, sock, socklen, CONN_TYPE_SSL); } /** * Add a new listener binding to server for the provided address. */ static int server_add_listener_addrinfo_(struct addrinfo *ai, void *data) { struct server *s = ((struct server_listener_convenience_ *)data)->s; evconnlistener_cb cb = ((struct server_listener_convenience_ *)data)->cb; struct evconnlistener **l; int retval = 0; int r; if ( (r = pthread_mutex_lock(&s->listeners_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return -1; } if (s->listeners_used == s->listeners_allocated) { void *new_ptr; size_t new_allocated = s->listeners_allocated + 8; new_ptr = realloc(s->listeners, new_allocated * sizeof *(s->listeners)); if (!new_ptr) { NOTIFY_ERROR("realloc(%zu): %s", new_allocated * sizeof *(s->listeners), strerror(errno)); retval = -1; goto done; } s->listeners = new_ptr; s->listeners_allocated = new_allocated; /* recalloc */ memset(&s->listeners[s->listeners_used], 0, (s->listeners_allocated - s->listeners_used - 1) * sizeof *s->listeners); } #if DEBUG_SILLY NOTIFY_DEBUG("new listener in slot %zu", s->listeners_used); #endif s->listeners_used++; l = &(s->listeners[s->listeners_used - 1]); *l = evconnlistener_new_bind(s->base, cb, s, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, ai->ai_addr, ai->ai_addrlen); if (!*l) { NOTIFY_ERROR("%s:%s", "evconnlistener_new_bind", strerror(errno)); *l = NULL; s->listeners_used--; retval = -1; goto done; } evconnlistener_set_error_cb(*l, server_accept_error_cb_); done: if ( (r = pthread_mutex_unlock(&s->listeners_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); retval = -1; } return retval; } /** * Allocates a new server structure, initializes all the structures which don't need * any specific configuration. Such further setup is handled by server_init(). */ struct server * server_new(void) { pthread_mutexattr_t attr; struct server *s; int r; s = calloc(1, sizeof *s); if (!s) { NOTIFY_ERROR("calloc(%zu, %zu): %s", (size_t)1, sizeof *s, strerror(errno)); return NULL; } s->base = event_base_new(); if (!s->base) { NOTIFY_ERROR("%s:%s", "event_base_new", "failed"); goto err_free_server; } s->evdns_base = evdns_base_new(s->base, 1); if (!s->evdns_base) { NOTIFY_ERROR("%s:%s", "evdns_base", "failed"); goto err_free_event_base; } if (workqueue_init(&s->workqueue, server_worker_context_destroy_, 0)) { NOTIFY_ERROR("%s:%s", "workqueue_init", "failed"); goto err_free_evdns_base; } if ( (r = pthread_mutexattr_init(&attr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r)); goto err_fini_workqueue; } if ( (r = pthread_mutex_init(&s->listeners_mutex, &attr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); goto err_destroy_attr; } if (connections_init(&s->connections) < 0) { NOTIFY_ERROR("%s:%s", "connections_init", "&s->connections failed"); goto err_destroy_mutex; } s->L = luaL_newstate(); if (!s->L) { NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed"); goto err_free_connections; } if ( (r = pthread_mutexattr_destroy(&attr)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r)); goto err_close_lua; } return s; err_close_lua: lua_close(s->L); err_free_connections: connections_fini(&s->connections); err_destroy_mutex: pthread_mutex_destroy(&s->listeners_mutex); err_destroy_attr: pthread_mutexattr_destroy(&attr); err_fini_workqueue: workqueue_fini(&s->workqueue, true); err_free_evdns_base: evdns_base_free(s->evdns_base, 0); err_free_event_base: event_base_free(s->base); err_free_server: free(s); return NULL; } /** * Looking at top of lua stack as string, parse as address and port, * then add as listener to server. */ static void server_init_listener_directive_a_string_(lua_State *L, struct server_listener_convenience_ *c_data) { char *l_str; l_str = (char *)lua_tostring(L, -1); if (string_to_addrinfo_call(l_str, 0, server_add_listener_addrinfo_, c_data)) { NOTIFY_ERROR("failed to add '%s' value '%s'", c_data->type, l_str ? l_str : "[unknown]"); } NOTIFY_INFO("listening on %s (%s)", l_str, c_data->type); } /** * Using configuration key provided in c_data->type, create all listeners. */ static void server_init_listener_directive_(lua_State *L, struct server_listener_convenience_ *c_data) { lua_getglobal(L, "server_config"); lua_getfield(L, -1, c_data->type); if (lua_istable(L, -1)) { lua_pushnil(L); while (lua_next(L, -2) != 0) { if (lua_isstring(L, -1)) { server_init_listener_directive_a_string_(L, c_data); } else { char *l_str = (char *)lua_tostring(L, -2); /* table key name */ NOTIFY_ERROR("could not fathom value for '%s' at index '%s'", c_data->type, l_str ? l_str : "[unknown]"); } lua_pop(L, 1); } } else if (lua_isstring(L, -1)) { server_init_listener_directive_a_string_(L, c_data); } else { NOTIFY_ERROR("could not fathom '%s' value", c_data->type); } lua_pop(L, 2); } /** * Return an allocated copy of value of global lua variable. */ static int get_lua_config_string_(lua_State *L, const char *key, char **value, size_t *value_len) { int retval = 0; const char *l_str; size_t l_str_len; assert(key); assert(value); *value = NULL; lua_getglobal(L, "server_config"); lua_getfield(L, -1, key); if (!lua_isstring(L, -1)) { NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "string", lua_typename(L, lua_type(L, -1))); retval = -1; goto done; } l_str = lua_tolstring(L, -1, &l_str_len); if (!l_str) { NOTIFY_ERROR("'%s' value invalid", key); retval = -1; goto done; } if (value_len) { *value_len = l_str_len; } l_str_len += 1; // \0 *value = malloc(l_str_len); if (!*value) { NOTIFY_ERROR("%s:%s", "malloc", strerror(errno)); retval = -1; goto done; } memcpy(*value, l_str, l_str_len); done: lua_pop(L, 1); return retval; } static int get_lua_config_integer_(lua_State *L, const char *key, int *value) { int retval = 0; assert(key); assert(value); lua_getglobal(L, "server_config"); lua_getfield(L, -1, key); if (!lua_isinteger(L, -1)) { NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "integer", lua_typename(L, lua_type(L, -1))); retval = -1; goto done; } *value = (int)lua_tointeger(L, -1); done: lua_pop(L, 1); return retval; } /** * Load or reload the ssl context, with cert and settings. */ int server_refresh_ssl_context(lua_State *L, struct server *s) { SSL_CTX *new_ssl_ctx, *old_ssl_ctx; char *l_str; new_ssl_ctx = SSL_CTX_new(SSLv23_server_method()); if (!new_ssl_ctx) { NOTIFY_ERROR("%s:%s", "SSL_CTX_new", ERR_error_string(ERR_get_error(), NULL)); return -1; } // load cert if (get_lua_config_string_(L, "ssl_keyfile", &l_str, NULL)) { return -1; } if (SSL_CTX_use_PrivateKey_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) { NOTIFY_ERROR("%s file '%s': %s", "key", l_str, ERR_error_string(ERR_get_error(), NULL)); free(l_str); return -1; } free(l_str); if (get_lua_config_string_(L, "ssl_certfile", &l_str, NULL)) { return -1; } if (SSL_CTX_use_certificate_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) { NOTIFY_ERROR("%s file '%s': %s", "certificate", l_str, ERR_error_string(ERR_get_error(), NULL)); free(l_str); return -1; } free(l_str); old_ssl_ctx = s->ssl_ctx; s->ssl_ctx = new_ssl_ctx; if (old_ssl_ctx) { SSL_CTX_free(old_ssl_ctx); } return 0; } /** * Example heartbeat worker job. * N.B. server as data instead of connection. */ static void server_heartbeat_worker_(void *data, void *context UNUSED, size_t id UNUSED) { struct server *s = data; struct connection **clist, **clist_iter; char *message; int r; clist = connections_all_as_array(&(s->connections), NULL); if (!clist) { return; } message = strdup("[heartbeat]\n"); if (!message) { NOTIFY_ERROR("%s:%s", "strdup", strerror(errno)); return; } r = connection_multi_send(clist, message, strlen(message)); if (r < 0) { NOTIFY_ERROR("could not broadcast heartbeat"); } else { NOTIFY_DEBUG("broadcast heartbeat to %d connection%s", r, (r > 1) ? "s" : ""); } clist_iter = clist; while (*clist_iter) { connection_free(*clist_iter); clist_iter++; } free(clist); } /** * Periodic heartbeat handler. */ static void server_heartbeat_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg) { struct server *server = arg; if (workqueue_add(&server->workqueue, server_heartbeat_worker_, server)) { NOTIFY_ERROR("%s:%s", "workqueue_add", "failed"); } } /** * Configure the server based on the provided lua file. */ int server_init(struct server *s, char *conf_file) { struct server_listener_convenience_ c_data; int l_int; /* Set up initial server state, in order to process configuration file. */ server_lua_openlibs_(s->L); /* Create config table, which will be used as global env for loading config file. */ lua_newtable(s->L); lua_setglobal(s->L, "server_config"); if (luaL_loadfile(s->L, conf_file)) { NOTIFY_ERROR("could not %s configuration file '%s': %s", "load", conf_file, lua_tostring(s->L, -1)); return -1; } /* server_config table becomes global env for executing config file */ lua_getglobal(s->L, "server_config"); lua_setupvalue(s->L, -2, 1); if (lua_pcall(s->L, 0, 0, 0) != LUA_OK) { NOTIFY_ERROR("could not %s configuration file '%s': %s", "parse", conf_file, lua_tostring(s->L, -1)); return -1; } // FIXME: let db fetch its own options struct database_options db_options = { 0 }; if (get_lua_config_string_(s->L, "db_file", &db_options.db_file, NULL)) { return -1; } if (db_init(&s->db, &db_options)) { NOTIFY_ERROR("%s:%s", "db_init", "failed"); free(db_options.db_file); return -1; } free(db_options.db_file); if (server_refresh_ssl_context(s->L, s)) { return -1; } if (get_lua_config_integer_(s->L, "processing_threads", &l_int)) { return -1; } /* FIXME: define reasonable limits */ if (l_int < 1 || l_int > 64) { NOTIFY_ERROR("'%s' is not a reasonable value", "processing_threads"); return -1; } // FIXME: while l_int != workqueue_workers() while (l_int) { if (server_init_add_thread_(s)) { NOTIFY_ERROR("could not start processing thread"); return -1; } l_int--; } c_data.s = s; c_data.cb = server_accept_conn_event_; c_data.type = "listen"; server_init_listener_directive_(s->L, &c_data); c_data.cb = server_accept_ssl_conn_event_; c_data.type = "listen_ssl"; server_init_listener_directive_(s->L, &c_data); do { struct timeval _seconds = {30, 0}; struct event *ev; /* FIXME: instead of persist, reset from event */ ev = event_new(s->base, -1, EV_TIMEOUT | EV_PERSIST, server_heartbeat_event_, s); event_add(ev, &_seconds); } while (0); NOTIFY_INFO("server starting...\n" "\t%s %s (%s)\n" "\tlibevent %s [%s]\n" "\t%s %s\n" "\t%s\n" "\t%s", "lemu", VERSION_STR, VERSION_DATE, event_get_version(), event_base_get_method(s->base), db_engine(), db_version(), OpenSSL_version(OPENSSL_VERSION), LUA_RELEASE); event_base_dispatch(s->base); return 0; } void server_free(struct server *s) { int r; while (s->listeners_used > 0) { s->listeners_used--; if (s->listeners[s->listeners_used]) { evconnlistener_free(s->listeners[s->listeners_used]); s->listeners[s->listeners_used] = NULL; } NOTIFY_DEBUG("freed listener %zu", s->listeners_used); } free(s->listeners); s->listeners = NULL; workqueue_fini(&s->workqueue, false); if (s->ssl_ctx) { SSL_CTX_free(s->ssl_ctx); s->ssl_ctx = NULL; } if (s->evdns_base) { evdns_base_free(s->evdns_base, 0); s->evdns_base = NULL; } if (s->base) { event_base_free(s->base); s->base = NULL; } if ( (r = pthread_mutex_destroy(&s->connections.mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } if ( (r = pthread_mutex_destroy(&s->listeners_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } lua_close(s->L); db_fini(&s->db); free(s); }