rough framework
[lemu] / server.c
1 #include <pthread.h>
2 #include <string.h>
3 #include <stdarg.h>
4 #include <errno.h>
5 #include <assert.h>
6
7 #include <event2/bufferevent_ssl.h>
8 #include <event2/bufferevent.h>
9 #include <event2/buffer.h>
10 #include <event2/listener.h>
11 #include <event2/dns.h>
12
13 #include <openssl/ssl.h>
14 #include <openssl/err.h>
15
16 #include <lua.h>
17 #include <lualib.h>
18 #include <lauxlib.h>
19
20 #include "common.h"
21 #include "command.h"
22 #include "db.h"
23 #include "notify.h"
24 #include "lua_interface.h"
25 #include "server.h"
26 #include "workqueue.h"
27 #include "version.h"
28
29
30 /* Wrapper struct for passing multiple values through a voidstar.
31 * Used when calling server_add_listener_addrinfo_().
32 */
33 struct server_listener_convenience_ {
34 struct server *s;
35 evconnlistener_cb cb;
36 char *type;
37 };
38
39
40 /**
41 * Load all lua libs except for io and os modules,
42 * TODO: this should be more fine-grained and properly sandboxed. Also, add server lib (needs to be created).
43 */
44 static void
45 server_lua_openlibs_(lua_State *L)
46 {
47 const luaL_Reg lualibs[] = {
48 { "", luaopen_base },
49 { LUA_LOADLIBNAME, luaopen_package },
50 { LUA_TABLIBNAME, luaopen_table },
51 { LUA_STRLIBNAME, luaopen_string },
52 { LUA_MATHLIBNAME, luaopen_math },
53 // { LUA_DBLIBNAME, luaopen_debug },
54 { NULL, NULL }
55 };
56 const luaL_Reg *lib;
57 for (lib = lualibs ; lib->func; lib++) {
58 lua_pushcfunction(L, lib->func);
59 lua_pushstring(L, lib->name);
60 lua_call(L, 1, 0);
61 }
62
63 lemu_connection_luainit(L);
64 }
65
66
67 /**
68 * Allocate and initialize a new per-thread context.
69 */
70 static struct server_worker_context *
71 server_worker_context_create_(struct server *server)
72 {
73 struct server_worker_context *ctx;
74
75 ctx = calloc(1, sizeof *ctx);
76 if (ctx == NULL) {
77 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
78 return NULL;
79 }
80 ctx->server = server;
81 ctx->L = luaL_newstate();
82 if (!ctx->L) {
83 NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
84 free(ctx);
85 return NULL;
86 }
87 server_lua_openlibs_(ctx->L);
88
89 return ctx;
90 }
91
92 /**
93 * Free a thread context.
94 * workqueue:worker_ctx_free_fn
95 */
96 static void
97 server_worker_context_destroy_(void *data)
98 {
99 struct server_worker_context *ctx = data;
100 lua_close(ctx->L);
101 free(ctx);
102 }
103
104 /**
105 * Adds a worker thread to the pool.
106 */
107 static int
108 server_init_add_thread_(struct server *s)
109 {
110 struct server_worker_context *ctx;
111 ssize_t worker_id;
112
113 ctx = server_worker_context_create_(s);
114 if (ctx == NULL) {
115 NOTIFY_ERROR("%s:%s", "server_worker_context_create_", "failed");
116 return -1;
117 }
118
119 worker_id = workqueue_worker_add(&s->workqueue, ctx, 0);
120 if (worker_id < 0) {
121 NOTIFY_ERROR("%s:%s", "workqueue_worker_add", "failed");
122 server_worker_context_destroy_(ctx);
123 return -1;
124 }
125
126 NOTIFY_DEBUG("added worker thread %zd", worker_id);
127
128 return 0;
129 }
130
131 /**
132 * Handle accept errors.
133 */
134 static void
135 server_accept_error_cb_(struct evconnlistener *listener UNUSED, void *ctx UNUSED)
136 {
137 int err = EVUTIL_SOCKET_ERROR();
138 NOTIFY_ERROR("%s (%d)", evutil_socket_error_to_string(err), err);
139 }
140
141
142 /**
143 * Move incoming data to connection queue.
144 * TBD: is it worth work-queueing this data shuffling, or just let main thread handle it?
145 */
146 static void
147 server_read_event_(struct bufferevent *bev, void *ctx)
148 {
149 struct connection *c = ctx;
150 struct evbuffer *input = bufferevent_get_input(bev);
151 size_t line_len;
152 unsigned char *line;
153
154 while ( (line = (unsigned char *)evbuffer_readln(input, &line_len, EVBUFFER_EOL_CRLF)) ) {
155 c->total_read += line_len; /* this drops the newlines from the total_read tally */
156
157 struct command *command = command_new(line, line_len, 0);
158 if (command == NULL) {
159 NOTIFY_ERROR("%s:%s", "command_new", "failed");
160 free(line);
161 return;
162 }
163
164 if (connection_command_enqueue(c, command)) {
165 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
166 command_free(command);
167 return;
168 }
169 }
170 }
171
172
173 /**
174 * Handle write events.
175 */
176 static void
177 server_write_event_(struct bufferevent *bev UNUSED, void *ctx)
178 {
179 struct connection *c = ctx;
180
181 if (c->state == CONNECTION_STATE_WANT_CLOSE) {
182 c->state = CONNECTION_STATE_CLOSED;
183
184 struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT);
185 if (command == NULL) {
186 NOTIFY_ERROR("%s:%s", "command_new", "failed");
187 return;
188 }
189 if (connection_command_enqueue(c, command)) {
190 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
191 command_free(command);
192 }
193 connection_free(c);
194 }
195 }
196
197
198 /**
199 * Handle some events.
200 */
201 static void
202 server_event_(struct bufferevent *bev UNUSED, short events, void *ctx)
203 {
204 struct connection *c = ctx;
205 int finished = 0;
206
207 if (events & BEV_EVENT_READING) {
208 NOTIFY_ERROR("reading error from '%s'", c->name);
209 finished = 1;
210 }
211
212 if (events & BEV_EVENT_WRITING) {
213 NOTIFY_ERROR("writing error from '%s'", c->name);
214 finished = 1;
215 }
216
217 if (events & BEV_EVENT_EOF) {
218 NOTIFY_DEBUG("eof from '%s'", c->name);
219 finished = 1;
220 }
221
222 if (events & BEV_EVENT_ERROR) {
223 NOTIFY_ERROR("unrecoverable error from '%s': %s",
224 c->name,
225 evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()) );
226 finished = 1;
227 }
228
229 if (events & BEV_EVENT_TIMEOUT) {
230 NOTIFY_ERROR("timeout from '%s'", c->name);
231 finished = 1;
232 }
233
234 if (events & BEV_EVENT_CONNECTED) {
235 NOTIFY_DEBUG("connected from '%s'", c->name);
236 }
237
238 if (events & ~(BEV_EVENT_READING|BEV_EVENT_WRITING|BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT|BEV_EVENT_CONNECTED)) {
239 NOTIFY_ERROR("unrecognized event from '%s': %hd", c->name, events);
240 }
241
242 if (finished) {
243 struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_DISCONNECT);
244 if (command == NULL) {
245 NOTIFY_ERROR("%s:%s", "command_new", "failed");
246 } else {
247 if (connection_command_enqueue(c, command)) {
248 NOTIFY_ERROR("%s:%s", "connection_comand_enqueue", "failed");
249 command_free(command);
250 }
251 }
252 connection_free(c);
253 }
254 }
255
256
257 /**
258 * Everything needed to accept a new connection, regardless of transport type.
259 */
260 static void
261 server_accept_conn_common_(struct server *server, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags)
262 {
263 struct connection *c;
264
265 c = calloc(1, sizeof *c);
266 if (!c) {
267 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
268 bufferevent_free(bev);
269 return;
270 }
271
272 if (connection_init(server, c, bev, sock, socklen, flags)) {
273 NOTIFY_ERROR("%s:%s", "connection_init", "failed");
274 bufferevent_free(bev);
275 free(c);
276 return;
277 }
278 c->state = CONNECTION_STATE_CONNECTED;
279
280 /* a bufferevent_add_cb interface would be better, but there isn't one yet */
281 bufferevent_setcb(c->bev, server_read_event_, server_write_event_, server_event_, c);
282 bufferevent_enable(c->bev, EV_READ|EV_WRITE);
283
284 connections_append(c);
285
286 struct command *command = command_new(NULL, 0, COMMAND_FLAG_EVENT_CONNECT);
287 if (command == NULL) {
288 NOTIFY_ERROR("%s:%s", "command_new", "failed");
289 return;
290 }
291 if (connection_command_enqueue(c, command)) {
292 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
293 command_free(command);
294 }
295 }
296
297
298 /**
299 * Accept a new plain-text connection.
300 */
301 static void
302 server_accept_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx)
303 {
304 struct server *server = ctx;
305 struct event_base *base = evconnlistener_get_base(listener);
306 struct bufferevent *bev;
307
308 bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
309 if (!bev) {
310 NOTIFY_ERROR("%s:%s", "bufferevent_socket_new", "failed");
311 return;
312 }
313
314 server_accept_conn_common_(server, bev, sock, socklen, 0);
315 }
316
317 /**
318 * Accept a new ssl connection.
319 */
320 static void
321 server_accept_ssl_conn_event_(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *ctx)
322 {
323 struct server *server = ctx;
324 struct event_base *base = evconnlistener_get_base(listener);
325 struct bufferevent *bev;
326 SSL *ssl;
327
328 ssl = SSL_new(server->ssl_ctx);
329
330 bev = bufferevent_openssl_socket_new(base, fd, ssl, BUFFEREVENT_SSL_ACCEPTING, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
331 if (!bev) {
332 NOTIFY_ERROR("%s:%s", "bufferevent_openssl_socket_new", "failed");
333 SSL_free(ssl);
334 return;
335 }
336
337 server_accept_conn_common_(server, bev, sock, socklen, CONN_TYPE_SSL);
338 }
339
340
341 /**
342 * Add a new listener binding to server for the provided address.
343 */
344 static int
345 server_add_listener_addrinfo_(struct addrinfo *ai, void *data)
346 {
347 struct server *s = ((struct server_listener_convenience_ *)data)->s;
348 evconnlistener_cb cb = ((struct server_listener_convenience_ *)data)->cb;
349 struct evconnlistener **l;
350 int retval = 0;
351 int r;
352
353 if ( (r = pthread_mutex_lock(&s->listeners_mutex)) ) {
354 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
355 return -1;
356 }
357
358 if (s->listeners_used == s->listeners_allocated) {
359 void *new_ptr;
360 size_t new_allocated = s->listeners_allocated + 8;
361
362 new_ptr = realloc(s->listeners, new_allocated * sizeof *(s->listeners));
363 if (!new_ptr) {
364 NOTIFY_ERROR("realloc(%zu): %s", new_allocated * sizeof *(s->listeners), strerror(errno));
365 retval = -1;
366 goto done;
367 }
368 s->listeners = new_ptr;
369 s->listeners_allocated = new_allocated;
370
371 /* recalloc */
372 memset(&s->listeners[s->listeners_used], 0, (s->listeners_allocated - s->listeners_used - 1) * sizeof *s->listeners);
373 }
374
375 #if DEBUG_SILLY
376 NOTIFY_DEBUG("new listener in slot %zu", s->listeners_used);
377 #endif
378
379 s->listeners_used++;
380 l = &(s->listeners[s->listeners_used - 1]);
381 *l = evconnlistener_new_bind(s->base, cb, s, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1, ai->ai_addr, ai->ai_addrlen);
382 if (!*l) {
383 NOTIFY_ERROR("%s:%s", "evconnlistener_new_bind", strerror(errno));
384 *l = NULL;
385 s->listeners_used--;
386 retval = -1;
387 goto done;
388 }
389 evconnlistener_set_error_cb(*l, server_accept_error_cb_);
390
391 done:
392 if ( (r = pthread_mutex_unlock(&s->listeners_mutex)) ) {
393 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
394 retval = -1;
395 }
396
397 return retval;
398 }
399
400
401 /**
402 * Allocates a new server structure, initializes all the structures which don't need
403 * any specific configuration. Such further setup is handled by server_init().
404 */
405 struct server *
406 server_new(void)
407 {
408 pthread_mutexattr_t attr;
409 struct server *s;
410 int r;
411
412 s = calloc(1, sizeof *s);
413 if (!s) {
414 NOTIFY_ERROR("calloc(%zu, %zu): %s", (size_t)1, sizeof *s, strerror(errno));
415 return NULL;
416 }
417
418 s->base = event_base_new();
419 if (!s->base) {
420 NOTIFY_ERROR("%s:%s", "event_base_new", "failed");
421 goto err_free_server;
422 }
423
424 s->evdns_base = evdns_base_new(s->base, 1);
425 if (!s->evdns_base) {
426 NOTIFY_ERROR("%s:%s", "evdns_base", "failed");
427 goto err_free_event_base;
428 }
429
430 if (workqueue_init(&s->workqueue, server_worker_context_destroy_, 0)) {
431 NOTIFY_ERROR("%s:%s", "workqueue_init", "failed");
432 goto err_free_evdns_base;
433 }
434
435 if ( (r = pthread_mutexattr_init(&attr)) ) {
436 NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r));
437 goto err_fini_workqueue;
438 }
439
440 if ( (r = pthread_mutex_init(&s->listeners_mutex, &attr)) ) {
441 NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
442 goto err_destroy_attr;
443 }
444
445 if (connections_init(&s->connections) < 0) {
446 NOTIFY_ERROR("%s:%s", "connections_init", "&s->connections failed");
447 goto err_destroy_mutex;
448 }
449
450 s->L = luaL_newstate();
451 if (!s->L) {
452 NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
453 goto err_free_connections;
454 }
455
456 if ( (r = pthread_mutexattr_destroy(&attr)) ) {
457 NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r));
458 goto err_close_lua;
459 }
460
461 return s;
462
463 err_close_lua:
464 lua_close(s->L);
465 err_free_connections:
466 connections_fini(&s->connections);
467 err_destroy_mutex:
468 pthread_mutex_destroy(&s->listeners_mutex);
469 err_destroy_attr:
470 pthread_mutexattr_destroy(&attr);
471 err_fini_workqueue:
472 workqueue_fini(&s->workqueue, true);
473 err_free_evdns_base:
474 evdns_base_free(s->evdns_base, 0);
475 err_free_event_base:
476 event_base_free(s->base);
477 err_free_server:
478 free(s);
479 return NULL;
480 }
481
482 /**
483 * Looking at top of lua stack as string, parse as address and port,
484 * then add as listener to server.
485 */
486 static void
487 server_init_listener_directive_a_string_(lua_State *L, struct server_listener_convenience_ *c_data)
488 {
489 char *l_str;
490
491 l_str = (char *)lua_tostring(L, -1);
492 if (string_to_addrinfo_call(l_str, 0, server_add_listener_addrinfo_, c_data)) {
493 NOTIFY_ERROR("failed to add '%s' value '%s'", c_data->type, l_str ? l_str : "[unknown]");
494 }
495 NOTIFY_INFO("listening on %s (%s)", l_str, c_data->type);
496 }
497
498 /**
499 * Using configuration key provided in c_data->type, create all listeners.
500 */
501 static void
502 server_init_listener_directive_(lua_State *L, struct server_listener_convenience_ *c_data)
503 {
504 lua_getglobal(L, "server_config");
505 lua_getfield(L, -1, c_data->type);
506 if (lua_istable(L, -1)) {
507 lua_pushnil(L);
508 while (lua_next(L, -2) != 0) {
509 if (lua_isstring(L, -1)) {
510 server_init_listener_directive_a_string_(L, c_data);
511 } else {
512 char *l_str = (char *)lua_tostring(L, -2); /* table key name */
513 NOTIFY_ERROR("could not fathom value for '%s' at index '%s'", c_data->type, l_str ? l_str : "[unknown]");
514 }
515 lua_pop(L, 1);
516 }
517 } else if (lua_isstring(L, -1)) {
518 server_init_listener_directive_a_string_(L, c_data);
519 } else {
520 NOTIFY_ERROR("could not fathom '%s' value", c_data->type);
521 }
522 lua_pop(L, 2);
523 }
524
525 /**
526 * Return an allocated copy of value of global lua variable.
527 */
528 static int
529 get_lua_config_string_(lua_State *L, const char *key, char **value, size_t *value_len)
530 {
531 int retval = 0;
532 const char *l_str;
533 size_t l_str_len;
534 assert(key);
535 assert(value);
536 *value = NULL;
537
538 lua_getglobal(L, "server_config");
539 lua_getfield(L, -1, key);
540
541 if (!lua_isstring(L, -1)) {
542 NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "string", lua_typename(L, lua_type(L, -1)));
543 retval = -1;
544 goto done;
545 }
546 l_str = lua_tolstring(L, -1, &l_str_len);
547 if (!l_str) {
548 NOTIFY_ERROR("'%s' value invalid", key);
549 retval = -1;
550 goto done;
551 }
552 if (value_len) {
553 *value_len = l_str_len;
554 }
555 l_str_len += 1; // \0
556 *value = malloc(l_str_len);
557 if (!*value) {
558 NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
559 retval = -1;
560 goto done;
561 }
562 memcpy(*value, l_str, l_str_len);
563 done:
564 lua_pop(L, 1);
565 return retval;
566 }
567
568 static int
569 get_lua_config_integer_(lua_State *L, const char *key, int *value)
570 {
571 int retval = 0;
572 assert(key);
573 assert(value);
574
575 lua_getglobal(L, "server_config");
576 lua_getfield(L, -1, key);
577
578 if (!lua_isinteger(L, -1)) {
579 NOTIFY_ERROR("'%s' needs to be %s, but is %s", key, "integer", lua_typename(L, lua_type(L, -1)));
580 retval = -1;
581 goto done;
582 }
583 *value = (int)lua_tointeger(L, -1);
584 done:
585 lua_pop(L, 1);
586 return retval;
587 }
588
589 /**
590 * Load or reload the ssl context, with cert and settings.
591 */
592 int
593 server_refresh_ssl_context(lua_State *L, struct server *s)
594 {
595 SSL_CTX *new_ssl_ctx, *old_ssl_ctx;
596 char *l_str;
597
598 new_ssl_ctx = SSL_CTX_new(SSLv23_server_method());
599 if (!new_ssl_ctx) {
600 NOTIFY_ERROR("%s:%s", "SSL_CTX_new", ERR_error_string(ERR_get_error(), NULL));
601 return -1;
602 }
603
604 // load cert
605 if (get_lua_config_string_(L, "ssl_keyfile", &l_str, NULL)) {
606 return -1;
607 }
608 if (SSL_CTX_use_PrivateKey_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) {
609 NOTIFY_ERROR("%s file '%s': %s", "key", l_str, ERR_error_string(ERR_get_error(), NULL));
610 free(l_str);
611 return -1;
612 }
613 free(l_str);
614
615 if (get_lua_config_string_(L, "ssl_certfile", &l_str, NULL)) {
616 return -1;
617 }
618 if (SSL_CTX_use_certificate_file(new_ssl_ctx, l_str, SSL_FILETYPE_PEM) < 1) {
619 NOTIFY_ERROR("%s file '%s': %s", "certificate", l_str, ERR_error_string(ERR_get_error(), NULL));
620 free(l_str);
621 return -1;
622 }
623 free(l_str);
624
625 old_ssl_ctx = s->ssl_ctx;
626 s->ssl_ctx = new_ssl_ctx;
627
628 if (old_ssl_ctx) {
629 SSL_CTX_free(old_ssl_ctx);
630 }
631
632 return 0;
633 }
634
635
636 /**
637 * Example heartbeat worker job.
638 * N.B. server as data instead of connection.
639 */
640 static void
641 server_heartbeat_worker_(void *data, void *context UNUSED, size_t id UNUSED)
642 {
643 struct server *s = data;
644 struct connection **clist, **clist_iter;
645 char *message;
646 int r;
647
648 clist = connections_all_as_array(&(s->connections), NULL);
649 if (!clist) {
650 return;
651 }
652
653 message = strdup("[heartbeat]\n");
654 if (!message) {
655 NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
656 return;
657 }
658
659 r = connection_multi_send(clist, message, strlen(message));
660 if (r < 0) {
661 NOTIFY_ERROR("could not broadcast heartbeat");
662 } else {
663 NOTIFY_DEBUG("broadcast heartbeat to %d connection%s", r, (r > 1) ? "s" : "");
664 }
665
666 clist_iter = clist;
667 while (*clist_iter) {
668 connection_free(*clist_iter);
669 clist_iter++;
670 }
671 free(clist);
672 }
673
674
675 /**
676 * Periodic heartbeat handler.
677 */
678 static void
679 server_heartbeat_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg)
680 {
681 struct server *server = arg;
682
683 if (workqueue_add(&server->workqueue, server_heartbeat_worker_, server)) {
684 NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
685 }
686 }
687
688
689 /**
690 * Configure the server based on the provided lua file.
691 */
692 int
693 server_init(struct server *s, char *conf_file)
694 {
695 struct server_listener_convenience_ c_data;
696 int l_int;
697
698 /* Set up initial server state, in order to process configuration file. */
699 server_lua_openlibs_(s->L);
700
701 /* Create config table, which will be used as global env for loading config file. */
702 lua_newtable(s->L);
703 lua_setglobal(s->L, "server_config");
704
705
706 if (luaL_loadfile(s->L, conf_file)) {
707 NOTIFY_ERROR("could not %s configuration file '%s': %s",
708 "load",
709 conf_file,
710 lua_tostring(s->L, -1));
711 return -1;
712 }
713 /* server_config table becomes global env for executing config file */
714 lua_getglobal(s->L, "server_config");
715 lua_setupvalue(s->L, -2, 1);
716
717 if (lua_pcall(s->L, 0, 0, 0) != LUA_OK) {
718 NOTIFY_ERROR("could not %s configuration file '%s': %s",
719 "parse",
720 conf_file,
721 lua_tostring(s->L, -1));
722 return -1;
723 }
724
725 // FIXME: let db fetch its own options
726 struct database_options db_options = { 0 };
727 if (get_lua_config_string_(s->L, "db_file", &db_options.db_file, NULL)) {
728 return -1;
729 }
730
731 if (db_init(&s->db, &db_options)) {
732 NOTIFY_ERROR("%s:%s", "db_init", "failed");
733 free(db_options.db_file);
734 return -1;
735 }
736 free(db_options.db_file);
737
738 if (server_refresh_ssl_context(s->L, s)) {
739 return -1;
740 }
741
742 if (get_lua_config_integer_(s->L, "processing_threads", &l_int)) {
743 return -1;
744 }
745 /* FIXME: define reasonable limits */
746 if (l_int < 1 || l_int > 64) {
747 NOTIFY_ERROR("'%s' is not a reasonable value", "processing_threads");
748 return -1;
749 }
750 // FIXME: while l_int != workqueue_workers()
751 while (l_int) {
752 if (server_init_add_thread_(s)) {
753 NOTIFY_ERROR("could not start processing thread");
754 return -1;
755 }
756 l_int--;
757 }
758
759 c_data.s = s;
760 c_data.cb = server_accept_conn_event_;
761 c_data.type = "listen";
762 server_init_listener_directive_(s->L, &c_data);
763
764 c_data.cb = server_accept_ssl_conn_event_;
765 c_data.type = "listen_ssl";
766 server_init_listener_directive_(s->L, &c_data);
767
768 do {
769 struct timeval _seconds = {30, 0};
770 struct event *ev;
771 /* FIXME: instead of persist, reset from event */
772 ev = event_new(s->base, -1, EV_TIMEOUT | EV_PERSIST, server_heartbeat_event_, s);
773 event_add(ev, &_seconds);
774 } while (0);
775
776 NOTIFY_INFO("server starting...\n"
777 "\t%s %s (%s)\n"
778 "\tlibevent %s [%s]\n"
779 "\t%s %s\n"
780 "\t%s\n"
781 "\t%s",
782 "lemu", VERSION_STR, VERSION_DATE,
783 event_get_version(), event_base_get_method(s->base),
784 db_engine(), db_version(),
785 OpenSSL_version(OPENSSL_VERSION),
786 LUA_RELEASE);
787
788 event_base_dispatch(s->base);
789
790 return 0;
791 }
792
793 void
794 server_free(struct server *s)
795 {
796 int r;
797
798 while (s->listeners_used > 0) {
799 s->listeners_used--;
800 if (s->listeners[s->listeners_used]) {
801 evconnlistener_free(s->listeners[s->listeners_used]);
802 s->listeners[s->listeners_used] = NULL;
803 }
804 NOTIFY_DEBUG("freed listener %zu", s->listeners_used);
805 }
806 free(s->listeners);
807 s->listeners = NULL;
808
809 workqueue_fini(&s->workqueue, false);
810
811 if (s->ssl_ctx) {
812 SSL_CTX_free(s->ssl_ctx);
813 s->ssl_ctx = NULL;
814 }
815
816 if (s->evdns_base) {
817 evdns_base_free(s->evdns_base, 0);
818 s->evdns_base = NULL;
819 }
820
821 if (s->base) {
822 event_base_free(s->base);
823 s->base = NULL;
824 }
825
826 if ( (r = pthread_mutex_destroy(&s->connections.mutex)) ) {
827 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
828 }
829
830 if ( (r = pthread_mutex_destroy(&s->listeners_mutex)) ) {
831 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
832 }
833
834 lua_close(s->L);
835
836 db_fini(&s->db);
837
838 free(s);
839 }