7 #include <event2/bufferevent_ssl.h>
8 #include <event2/bufferevent.h>
9 #include <event2/buffer.h>
10 #include <event2/listener.h>
11 #include <event2/dns.h>
13 #include <openssl/ssl.h>
14 #include <openssl/err.h>
24 #include "lua_interface.h"
26 #include "workqueue.h"
30 /* Wrapper struct for passing multiple values through a voidstar.
31 * Used when calling server_add_listener_addrinfo_().
33 struct server_listener_convenience_
{
41 * Load all lua libs except for io and os modules,
42 * TODO: this should be more fine-grained and properly sandboxed. Also, add server lib (needs to be created).
45 server_lua_openlibs_(lua_State
*L
)
47 const luaL_Reg lualibs
[] = {
49 { LUA_LOADLIBNAME
, luaopen_package
},
50 { LUA_TABLIBNAME
, luaopen_table
},
51 { LUA_STRLIBNAME
, luaopen_string
},
52 { LUA_MATHLIBNAME
, luaopen_math
},
53 // { LUA_DBLIBNAME, luaopen_debug },
57 for (lib
= lualibs
; lib
->func
; lib
++) {
58 lua_pushcfunction(L
, lib
->func
);
59 lua_pushstring(L
, lib
->name
);
63 lemu_connection_luainit(L
);
68 * Allocate and initialize a new per-thread context.
70 static struct server_worker_context
*
71 server_worker_context_create_(struct server
*server
)
73 struct server_worker_context
*ctx
;
75 ctx
= calloc(1, sizeof *ctx
);
77 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno
));
81 ctx
->L
= luaL_newstate();
83 NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
87 server_lua_openlibs_(ctx
->L
);
93 * Free a thread context.
94 * workqueue:worker_ctx_free_fn
97 server_worker_context_destroy_(void *data
)
99 struct server_worker_context
*ctx
= data
;
105 * Adds a worker thread to the pool.
108 server_init_add_thread_(struct server
*s
)
110 struct server_worker_context
*ctx
;
113 ctx
= server_worker_context_create_(s
);
115 NOTIFY_ERROR("%s:%s", "server_worker_context_create_", "failed");
119 worker_id
= workqueue_worker_add(&s
->workqueue
, ctx
, 0);
121 NOTIFY_ERROR("%s:%s", "workqueue_worker_add", "failed");
122 server_worker_context_destroy_(ctx
);
126 NOTIFY_DEBUG("added worker thread %zd", worker_id
);
132 * Handle accept errors.
135 server_accept_error_cb_(struct evconnlistener
*listener UNUSED
, void *ctx UNUSED
)
137 int err
= EVUTIL_SOCKET_ERROR();
138 NOTIFY_ERROR("%s (%d)", evutil_socket_error_to_string(err
), err
);
143 * Move incoming data to connection queue.
144 * TBD: is it worth work-queueing this data shuffling, or just let main thread handle it?
147 server_read_event_(struct bufferevent
*bev
, void *ctx
)
149 struct connection
*c
= ctx
;
150 struct evbuffer
*input
= bufferevent_get_input(bev
);
154 while ( (line
= (unsigned char *)evbuffer_readln(input
, &line_len
, EVBUFFER_EOL_CRLF
)) ) {
155 c
->total_read
+= line_len
; /* this drops the newlines from the total_read tally */
157 struct command
*command
= command_new(line
, line_len
, 0);
158 if (command
== NULL
) {
159 NOTIFY_ERROR("%s:%s", "command_new", "failed");
164 if (connection_command_enqueue(c
, command
)) {
165 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
166 command_free(command
);
174 * Handle write events.
177 server_write_event_(struct bufferevent
*bev UNUSED
, void *ctx
)
179 struct connection
*c
= ctx
;
181 if (c
->state
== CONNECTION_STATE_WANT_CLOSE
) {
182 c
->state
= CONNECTION_STATE_CLOSED
;
184 struct command
*command
= command_new(NULL
, 0, COMMAND_FLAG_EVENT_DISCONNECT
);
185 if (command
== NULL
) {
186 NOTIFY_ERROR("%s:%s", "command_new", "failed");
189 if (connection_command_enqueue(c
, command
)) {
190 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
191 command_free(command
);
199 * Handle some events.
202 server_event_(struct bufferevent
*bev UNUSED
, short events
, void *ctx
)
204 struct connection
*c
= ctx
;
207 if (events
& BEV_EVENT_READING
) {
208 NOTIFY_ERROR("reading error from '%s'", c
->name
);
212 if (events
& BEV_EVENT_WRITING
) {
213 NOTIFY_ERROR("writing error from '%s'", c
->name
);
217 if (events
& BEV_EVENT_EOF
) {
218 NOTIFY_DEBUG("eof from '%s'", c
->name
);
222 if (events
& BEV_EVENT_ERROR
) {
223 NOTIFY_ERROR("unrecoverable error from '%s': %s",
225 evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()) );
229 if (events
& BEV_EVENT_TIMEOUT
) {
230 NOTIFY_ERROR("timeout from '%s'", c
->name
);
234 if (events
& BEV_EVENT_CONNECTED
) {
235 NOTIFY_DEBUG("connected from '%s'", c
->name
);
238 if (events
& ~(BEV_EVENT_READING
|BEV_EVENT_WRITING
|BEV_EVENT_EOF
|BEV_EVENT_ERROR
|BEV_EVENT_TIMEOUT
|BEV_EVENT_CONNECTED
)) {
239 NOTIFY_ERROR("unrecognized event from '%s': %hd", c
->name
, events
);
243 struct command
*command
= command_new(NULL
, 0, COMMAND_FLAG_EVENT_DISCONNECT
);
244 if (command
== NULL
) {
245 NOTIFY_ERROR("%s:%s", "command_new", "failed");
247 if (connection_command_enqueue(c
, command
)) {
248 NOTIFY_ERROR("%s:%s", "connection_comand_enqueue", "failed");
249 command_free(command
);
258 * Everything needed to accept a new connection, regardless of transport type.
261 server_accept_conn_common_(struct server
*server
, struct bufferevent
*bev
, struct sockaddr
*sock
, int socklen
, connection_flags_t flags
)
263 struct connection
*c
;
265 c
= calloc(1, sizeof *c
);
267 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno
));
268 bufferevent_free(bev
);
272 if (connection_init(server
, c
, bev
, sock
, socklen
, flags
)) {
273 NOTIFY_ERROR("%s:%s", "connection_init", "failed");
274 bufferevent_free(bev
);
278 c
->state
= CONNECTION_STATE_CONNECTED
;
280 /* a bufferevent_add_cb interface would be better, but there isn't one yet */
281 bufferevent_setcb(c
->bev
, server_read_event_
, server_write_event_
, server_event_
, c
);
282 bufferevent_enable(c
->bev
, EV_READ
|EV_WRITE
);
284 connections_append(c
);
286 struct command
*command
= command_new(NULL
, 0, COMMAND_FLAG_EVENT_CONNECT
);
287 if (command
== NULL
) {
288 NOTIFY_ERROR("%s:%s", "command_new", "failed");
291 if (connection_command_enqueue(c
, command
)) {
292 NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
293 command_free(command
);
299 * Accept a new plain-text connection.
302 server_accept_conn_event_(struct evconnlistener
*listener
, evutil_socket_t fd
, struct sockaddr
*sock
, int socklen
, void *ctx
)
304 struct server
*server
= ctx
;
305 struct event_base
*base
= evconnlistener_get_base(listener
);
306 struct bufferevent
*bev
;
308 bev
= bufferevent_socket_new(base
, fd
, BEV_OPT_CLOSE_ON_FREE
| BEV_OPT_THREADSAFE
| BEV_OPT_DEFER_CALLBACKS
);
310 NOTIFY_ERROR("%s:%s", "bufferevent_socket_new", "failed");
314 server_accept_conn_common_(server
, bev
, sock
, socklen
, 0);
318 * Accept a new ssl connection.
321 server_accept_ssl_conn_event_(struct evconnlistener
*listener
, evutil_socket_t fd
, struct sockaddr
*sock
, int socklen
, void *ctx
)
323 struct server
*server
= ctx
;
324 struct event_base
*base
= evconnlistener_get_base(listener
);
325 struct bufferevent
*bev
;
328 ssl
= SSL_new(server
->ssl_ctx
);
330 bev
= bufferevent_openssl_socket_new(base
, fd
, ssl
, BUFFEREVENT_SSL_ACCEPTING
, BEV_OPT_CLOSE_ON_FREE
| BEV_OPT_THREADSAFE
| BEV_OPT_DEFER_CALLBACKS
);
332 NOTIFY_ERROR("%s:%s", "bufferevent_openssl_socket_new", "failed");
337 server_accept_conn_common_(server
, bev
, sock
, socklen
, CONN_TYPE_SSL
);
342 * Add a new listener binding to server for the provided address.
345 server_add_listener_addrinfo_(struct addrinfo
*ai
, void *data
)
347 struct server
*s
= ((struct server_listener_convenience_
*)data
)->s
;
348 evconnlistener_cb cb
= ((struct server_listener_convenience_
*)data
)->cb
;
349 struct evconnlistener
**l
;
353 if ( (r
= pthread_mutex_lock(&s
->listeners_mutex
)) ) {
354 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r
));
358 if (s
->listeners_used
== s
->listeners_allocated
) {
360 size_t new_allocated
= s
->listeners_allocated
+ 8;
362 new_ptr
= realloc(s
->listeners
, new_allocated
* sizeof *(s
->listeners
));
364 NOTIFY_ERROR("realloc(%zu): %s", new_allocated
* sizeof *(s
->listeners
), strerror(errno
));
368 s
->listeners
= new_ptr
;
369 s
->listeners_allocated
= new_allocated
;
372 memset(&s
->listeners
[s
->listeners_used
], 0, (s
->listeners_allocated
- s
->listeners_used
- 1) * sizeof *s
->listeners
);
376 NOTIFY_DEBUG("new listener in slot %zu", s
->listeners_used
);
380 l
= &(s
->listeners
[s
->listeners_used
- 1]);
381 *l
= evconnlistener_new_bind(s
->base
, cb
, s
, LEV_OPT_CLOSE_ON_FREE
| LEV_OPT_REUSEABLE
, -1, ai
->ai_addr
, ai
->ai_addrlen
);
383 NOTIFY_ERROR("%s:%s", "evconnlistener_new_bind", strerror(errno
));
389 evconnlistener_set_error_cb(*l
, server_accept_error_cb_
);
392 if ( (r
= pthread_mutex_unlock(&s
->listeners_mutex
)) ) {
393 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r
));
402 * Allocates a new server structure, initializes all the structures which don't need
403 * any specific configuration. Such further setup is handled by server_init().
408 pthread_mutexattr_t attr
;
412 s
= calloc(1, sizeof *s
);
414 NOTIFY_ERROR("calloc(%zu, %zu): %s", (size_t)1, sizeof *s
, strerror(errno
));
418 s
->base
= event_base_new();
420 NOTIFY_ERROR("%s:%s", "event_base_new", "failed");
421 goto err_free_server
;
424 s
->evdns_base
= evdns_base_new(s
->base
, 1);
425 if (!s
->evdns_base
) {
426 NOTIFY_ERROR("%s:%s", "evdns_base", "failed");
427 goto err_free_event_base
;
430 if (workqueue_init(&s
->workqueue
, server_worker_context_destroy_
, 0)) {
431 NOTIFY_ERROR("%s:%s", "workqueue_init", "failed");
432 goto err_free_evdns_base
;
435 if ( (r
= pthread_mutexattr_init(&attr
)) ) {
436 NOTIFY_ERROR("%s:%s", "pthread_mutexattr_init", strerror(r
));
437 goto err_fini_workqueue
;
440 if ( (r
= pthread_mutex_init(&s
->listeners_mutex
, &attr
)) ) {
441 NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r
));
442 goto err_destroy_attr
;
445 if (connections_init(&s
->connections
) < 0) {
446 NOTIFY_ERROR("%s:%s", "connections_init", "&s->connections failed");
447 goto err_destroy_mutex
;
450 s
->L
= luaL_newstate();
452 NOTIFY_ERROR("%s:%s", "luaL_newstate", "failed");
453 goto err_free_connections
;
456 if ( (r
= pthread_mutexattr_destroy(&attr
)) ) {
457 NOTIFY_ERROR("%s:%s", "pthread_mutexattr_destroy", strerror(r
));
465 err_free_connections
:
466 connections_fini(&s
->connections
);
468 pthread_mutex_destroy(&s
->listeners_mutex
);
470 pthread_mutexattr_destroy(&attr
);
472 workqueue_fini(&s
->workqueue
, true);
474 evdns_base_free(s
->evdns_base
, 0);
476 event_base_free(s
->base
);
483 * Looking at top of lua stack as string, parse as address and port,
484 * then add as listener to server.
487 server_init_listener_directive_a_string_(lua_State
*L
, struct server_listener_convenience_
*c_data
)
491 l_str
= (char *)lua_tostring(L
, -1);
492 if (string_to_addrinfo_call(l_str
, 0, server_add_listener_addrinfo_
, c_data
)) {
493 NOTIFY_ERROR("failed to add '%s' value '%s'", c_data
->type
, l_str
? l_str
: "[unknown]");
495 NOTIFY_INFO("listening on %s (%s)", l_str
, c_data
->type
);
499 * Using configuration key provided in c_data->type, create all listeners.
502 server_init_listener_directive_(lua_State
*L
, struct server_listener_convenience_
*c_data
)
504 lua_getglobal(L
, "server_config");
505 lua_getfield(L
, -1, c_data
->type
);
506 if (lua_istable(L
, -1)) {
508 while (lua_next(L
, -2) != 0) {
509 if (lua_isstring(L
, -1)) {
510 server_init_listener_directive_a_string_(L
, c_data
);
512 char *l_str
= (char *)lua_tostring(L
, -2); /* table key name */
513 NOTIFY_ERROR("could not fathom value for '%s' at index '%s'", c_data
->type
, l_str
? l_str
: "[unknown]");
517 } else if (lua_isstring(L
, -1)) {
518 server_init_listener_directive_a_string_(L
, c_data
);
520 NOTIFY_ERROR("could not fathom '%s' value", c_data
->type
);
526 * Return an allocated copy of value of global lua variable.
529 get_lua_config_string_(lua_State
*L
, const char *key
, char **value
, size_t *value_len
)
538 lua_getglobal(L
, "server_config");
539 lua_getfield(L
, -1, key
);
541 if (!lua_isstring(L
, -1)) {
542 NOTIFY_ERROR("'%s' needs to be %s, but is %s", key
, "string", lua_typename(L
, lua_type(L
, -1)));
546 l_str
= lua_tolstring(L
, -1, &l_str_len
);
548 NOTIFY_ERROR("'%s' value invalid", key
);
553 *value_len
= l_str_len
;
555 l_str_len
+= 1; // \0
556 *value
= malloc(l_str_len
);
558 NOTIFY_ERROR("%s:%s", "malloc", strerror(errno
));
562 memcpy(*value
, l_str
, l_str_len
);
569 get_lua_config_integer_(lua_State
*L
, const char *key
, int *value
)
575 lua_getglobal(L
, "server_config");
576 lua_getfield(L
, -1, key
);
578 if (!lua_isinteger(L
, -1)) {
579 NOTIFY_ERROR("'%s' needs to be %s, but is %s", key
, "integer", lua_typename(L
, lua_type(L
, -1)));
583 *value
= (int)lua_tointeger(L
, -1);
590 * Load or reload the ssl context, with cert and settings.
593 server_refresh_ssl_context(lua_State
*L
, struct server
*s
)
595 SSL_CTX
*new_ssl_ctx
, *old_ssl_ctx
;
598 new_ssl_ctx
= SSL_CTX_new(SSLv23_server_method());
600 NOTIFY_ERROR("%s:%s", "SSL_CTX_new", ERR_error_string(ERR_get_error(), NULL
));
605 if (get_lua_config_string_(L
, "ssl_keyfile", &l_str
, NULL
)) {
608 if (SSL_CTX_use_PrivateKey_file(new_ssl_ctx
, l_str
, SSL_FILETYPE_PEM
) < 1) {
609 NOTIFY_ERROR("%s file '%s': %s", "key", l_str
, ERR_error_string(ERR_get_error(), NULL
));
615 if (get_lua_config_string_(L
, "ssl_certfile", &l_str
, NULL
)) {
618 if (SSL_CTX_use_certificate_file(new_ssl_ctx
, l_str
, SSL_FILETYPE_PEM
) < 1) {
619 NOTIFY_ERROR("%s file '%s': %s", "certificate", l_str
, ERR_error_string(ERR_get_error(), NULL
));
625 old_ssl_ctx
= s
->ssl_ctx
;
626 s
->ssl_ctx
= new_ssl_ctx
;
629 SSL_CTX_free(old_ssl_ctx
);
637 * Example heartbeat worker job.
638 * N.B. server as data instead of connection.
641 server_heartbeat_worker_(void *data
, void *context UNUSED
, size_t id UNUSED
)
643 struct server
*s
= data
;
644 struct connection
**clist
, **clist_iter
;
648 clist
= connections_all_as_array(&(s
->connections
), NULL
);
653 message
= strdup("[heartbeat]\n");
655 NOTIFY_ERROR("%s:%s", "strdup", strerror(errno
));
659 r
= connection_multi_send(clist
, message
, strlen(message
));
661 NOTIFY_ERROR("could not broadcast heartbeat");
663 NOTIFY_DEBUG("broadcast heartbeat to %d connection%s", r
, (r
> 1) ? "s" : "");
667 while (*clist_iter
) {
668 connection_free(*clist_iter
);
676 * Periodic heartbeat handler.
679 server_heartbeat_event_(evutil_socket_t fd UNUSED
, short what UNUSED
, void *arg
)
681 struct server
*server
= arg
;
683 if (workqueue_add(&server
->workqueue
, server_heartbeat_worker_
, server
)) {
684 NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
690 * Configure the server based on the provided lua file.
693 server_init(struct server
*s
, char *conf_file
)
695 struct server_listener_convenience_ c_data
;
698 /* Set up initial server state, in order to process configuration file. */
699 server_lua_openlibs_(s
->L
);
701 /* Create config table, which will be used as global env for loading config file. */
703 lua_setglobal(s
->L
, "server_config");
706 if (luaL_loadfile(s
->L
, conf_file
)) {
707 NOTIFY_ERROR("could not %s configuration file '%s': %s",
710 lua_tostring(s
->L
, -1));
713 /* server_config table becomes global env for executing config file */
714 lua_getglobal(s
->L
, "server_config");
715 lua_setupvalue(s
->L
, -2, 1);
717 if (lua_pcall(s
->L
, 0, 0, 0) != LUA_OK
) {
718 NOTIFY_ERROR("could not %s configuration file '%s': %s",
721 lua_tostring(s
->L
, -1));
725 // FIXME: let db fetch its own options
726 struct database_options db_options
= { 0 };
727 if (get_lua_config_string_(s
->L
, "db_file", &db_options
.db_file
, NULL
)) {
731 if (db_init(&s
->db
, &db_options
)) {
732 NOTIFY_ERROR("%s:%s", "db_init", "failed");
733 free(db_options
.db_file
);
736 free(db_options
.db_file
);
738 if (server_refresh_ssl_context(s
->L
, s
)) {
742 if (get_lua_config_integer_(s
->L
, "processing_threads", &l_int
)) {
745 /* FIXME: define reasonable limits */
746 if (l_int
< 1 || l_int
> 64) {
747 NOTIFY_ERROR("'%s' is not a reasonable value", "processing_threads");
750 // FIXME: while l_int != workqueue_workers()
752 if (server_init_add_thread_(s
)) {
753 NOTIFY_ERROR("could not start processing thread");
760 c_data
.cb
= server_accept_conn_event_
;
761 c_data
.type
= "listen";
762 server_init_listener_directive_(s
->L
, &c_data
);
764 c_data
.cb
= server_accept_ssl_conn_event_
;
765 c_data
.type
= "listen_ssl";
766 server_init_listener_directive_(s
->L
, &c_data
);
769 struct timeval _seconds
= {30, 0};
771 /* FIXME: instead of persist, reset from event */
772 ev
= event_new(s
->base
, -1, EV_TIMEOUT
| EV_PERSIST
, server_heartbeat_event_
, s
);
773 event_add(ev
, &_seconds
);
776 NOTIFY_INFO("server starting...\n"
778 "\tlibevent %s [%s]\n"
782 "lemu", VERSION_STR
, VERSION_DATE
,
783 event_get_version(), event_base_get_method(s
->base
),
784 db_engine(), db_version(),
785 OpenSSL_version(OPENSSL_VERSION
),
788 event_base_dispatch(s
->base
);
794 server_free(struct server
*s
)
798 while (s
->listeners_used
> 0) {
800 if (s
->listeners
[s
->listeners_used
]) {
801 evconnlistener_free(s
->listeners
[s
->listeners_used
]);
802 s
->listeners
[s
->listeners_used
] = NULL
;
804 NOTIFY_DEBUG("freed listener %zu", s
->listeners_used
);
809 workqueue_fini(&s
->workqueue
, false);
812 SSL_CTX_free(s
->ssl_ctx
);
817 evdns_base_free(s
->evdns_base
, 0);
818 s
->evdns_base
= NULL
;
822 event_base_free(s
->base
);
826 if ( (r
= pthread_mutex_destroy(&s
->connections
.mutex
)) ) {
827 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r
));
830 if ( (r
= pthread_mutex_destroy(&s
->listeners_mutex
)) ) {
831 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r
));