#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #ifndef NI_MAXHOST # define NI_MAXHOST 1025 #endif #ifndef NI_MAXSERV # define NI_MAXSERV 32 #endif #include // needs _GNU_SOURCE for RUSAGE_THREAD #include #include #include #include #include #include "common.h" #include "notify.h" #include "connections.h" #include "command.h" #include "server.h" /** * A reference-counted data buffer. * Used internally here for sending out one message to multiple connections, without * creating a copy for each connection. * Frees data pointer when the last reference is removed. * A connection thread increases the reference count as it sends the message to each * destination connection, while the libevent thread decrements as it completes each * transmission. */ struct rc_data_ { pthread_mutex_t mutex; size_t reference_count; size_t data_len; char *data; }; /** * Allocates, initializes, and returns a new struct rc_data_ entity. */ static struct rc_data_ * rc_data_new_(char *data, size_t len) { struct rc_data_ *rc; int r; if (!data) { NOTIFY_DEBUG("data:%p len:%zu", data, len); return NULL; } rc = malloc(sizeof *rc); if (!rc) { NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno)); return NULL; } if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); free(rc); return NULL; } rc->reference_count = 1; rc->data_len = len; rc->data = data; return rc; } /** * Increments the reference count of a struct rc_data_ entity. */ static void rc_data_ref_inc_(struct rc_data_ *rc) { int r; if ( (r = pthread_mutex_lock(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } rc->reference_count += 1; if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } } /** * Decrements the reference count of a struct rc_data_ entity * and frees everything if there are no more references. */ static void rc_data_free_(struct rc_data_ *rc) { int r; if ( (r = pthread_mutex_lock(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } rc->reference_count -= 1; if (rc->reference_count > 0) { if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } return; } free(rc->data); rc->data = NULL; if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } if ( (r = pthread_mutex_destroy(&rc->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } memset(rc, 0, sizeof *rc); free(rc); } /** * Wrapper for rc_data_free_() of the proper callback type to be used by * evbuffer_add_reference() in connection_multi_send(). */ static void rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg) { struct rc_data_ *r = arg; rc_data_free_(r); } /** * Initializes a struct connections entity. */ int connections_init(struct connections *cs) { int r; cs->count = 0; TAILQ_INIT(&cs->head); if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); return -1; } return 0; } /** * Cleanup a struct connections. */ void connections_fini(struct connections *cs) { struct connection *c1, *c2; int r; #if DEBUG_SILLY NOTIFY_DEBUG("cs:%p", cs); #endif /* free any connection entities */ c1 = TAILQ_FIRST(&cs->head); while (c1 != NULL) { c2 = TAILQ_NEXT(c1, tailq_entry); connection_free(c1); c1 = c2; } TAILQ_INIT(&cs->head); cs->count = 0; if ( (r = pthread_mutex_destroy(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } } /** * Inserts a connection at the end of connections list. */ void connections_append(struct connection *c) { struct connections *cs = &c->server->connections; int r; #if DEBUG_SILLY NOTIFY_DEBUG("c:%p", c); #endif if ( (r = pthread_mutex_lock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry); cs->count += 1; if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); return; } } /** * Removes a connection from its struct connections list. */ void connections_remove(struct connection *c) { struct connections *cs = &c->server->connections; int r; #if DEBUG_SILLY NOTIFY_DEBUG("c:%p", c); #endif if ( !c || !cs) { NOTIFY_DEBUG("c:%p connections:%p", c, cs); return; } if ( (r = pthread_mutex_lock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } TAILQ_REMOVE(&cs->head, c, tailq_entry); cs->count -= 1; if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); return; } } /** * Allocates and returns a null-terminated array of pointers to all connection entities in * connections list, minus the 'c_exclude' connection, if set. * Used as a simple implementation of broadcasting a message. * This increments the reference count of each connection added to the array * so they need to individually be freed before freeing the array. */ struct connection ** connections_all_as_array(struct connections *cs, struct connection *c_exclude) { struct connection **all; struct connection *c; size_t i = 0; int r; if ( (r = pthread_mutex_lock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return NULL; } all = calloc(cs->count + 1, sizeof *all); if (!all) { NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno)); goto err_unlock; } TAILQ_FOREACH(c, &cs->head, tailq_entry) { if (c != c_exclude && c->state < CONNECTION_STATE_WANT_CLOSE && c->state > CONNECTION_STATE_INIT) { connection_inc_ref(c); all[i++] = c; } } if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); while (i) { i--; connection_free(c); } free(all); return NULL; } return all; err_unlock: if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } return NULL; } /** * Wrap ev timeout event to re-resolve hostname. */ static void connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg) { struct connection *c = arg; NOTIFY_DEBUG("retrying hostname resolution"); c->dns_retry_ev = NULL; c->dns_retries += 1; connection_resolve_hostname(c); } /** * A callback used by connection_init to populate the reverse-lookup of a connection's * client_address. **/ static void connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx) { struct connection *c = ctx; char *old_client_address = c->client_address; if (result != DNS_ERR_NONE) { NOTIFY_ERROR("Error resolving: %s", evdns_err_to_string(result) ); c->evdns_request = NULL; // TODO: configurable // if (server_get_config_integer(c->server, "dns_retries_max")) { // if (server_get_config_integer(c->server, "dns_retry_seconds")) { if (c->dns_retries < 4) { struct timeval _seconds = {30, 0}; c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c); event_add(c->dns_retry_ev, &_seconds); }; return; } switch (type) { case DNS_PTR: if (count < 1) { return; } if (count > 1) { NOTIFY_DEBUG("multiple records returned, using first"); } c->client_address = strdup(((char **)addresses)[0]); NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c); break; default: NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type); break; } c->evdns_request = NULL; if (old_client_address) { free(old_client_address); } } /** * Perform reverse lookup of connection address. */ void connection_resolve_hostname(struct connection *c) { if (c->evdns_request) { NOTIFY_DEBUG("resolution already in progress"); return; } /* does libevent's evdns have a getnameinfo style call? */ switch (c->sa->sa_family) { case AF_INET: c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c); break; case AF_INET6: c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c); break; default: NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family); } if (!c->evdns_request) { NOTIFY_DEBUG("could not submit PTR lookup request"); } } /** * Populates a connection with initial information and sets reference count. */ int connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; int r; uuid_rc_t rc; if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); goto err; } if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) { NOTIFY_ERROR("%s:%s", "pthead_mutex_init", strerror(r)); goto err_rc_mutex_destroy; } rc = uuid_create(&c->uuid); if (rc != UUID_RC_OK) { NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc)); goto err_commands_mutex_destroy; } rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC); if (rc != UUID_RC_OK) { NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc)); goto err_uuid_destroy; } c->state = CONNECTION_STATE_INIT; c->bev = bev; c->flags = flags; c->server = s; /* render an IP from a sockaddr */ r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); if (r) { NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r)); strncpy(hbuf, "(unknown)", sizeof hbuf); strncpy(sbuf, "(?)", sizeof sbuf); } c->client_address = strdup(hbuf); if (!c->client_address) { NOTIFY_ERROR("%s:%s", "strdup", strerror(errno)); goto err_uuid_destroy; } /* Past this point, errors are non-fatal. */ c->reference_count = 1; /* Now try to resolve a name from client ip, in background. */ c->sa = sock; c->sa_len = socklen; c->dns_retries = 0; c->dns_retry_ev = NULL; connection_resolve_hostname(c); /* FIXME: temporary name of connection for POC */ static int current_connection_number; r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++); if ((size_t)r >= sizeof c->name) { NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__); } #if DARWIN /* Darwin systems <10.12 lack clock_gettime (!) */ if (gettimeofday(&c->connect_time, NULL)) { NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); } NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec)); #else if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) { NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); } NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec)); #endif /* DARWIN */ return 0; err_uuid_destroy: rc = uuid_destroy(c->uuid); if (rc != UUID_RC_OK) { NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); } err_commands_mutex_destroy: if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); } err_rc_mutex_destroy: if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); } err: return -1; } /** * Process a connection's commands as long as there are commands to process. * FIXME: track how much time has been spend consuming commands, re-queue * processor job when over some threshold considering pending workqueue */ static void connection_process_queue_(void *data, void *context, size_t id) { struct connection *c = data; struct server_worker_context *ctx = context; struct command *command; struct rusage ru_start, ru_end; struct timespec ts_start, ts_current; int r; if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) { NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno)); } while (1) { if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } command = STAILQ_FIRST(&c->commands_head); if (command) { STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry); } if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); return; } if (command == NULL) { break; } if ( (r = getrusage(RUSAGE_THREAD, &ru_start)) ) { NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); } command_parse(c, context, id, command); if ( (r = getrusage(RUSAGE_THREAD, &ru_end)) ) { NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); } /* track how much time was spent procesing */ connection_accounting_increment(c, &ru_start, &ru_end); command_free(command); if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) { NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); } } connection_free(c); // Done processing for a connection, force GC cycle to release additional c references. lua_gc(ctx->L, LUA_GCCOLLECT); } /** * Adds a command to a connection's list of things to process. */ int connection_command_enqueue(struct connection *c, struct command *command) { bool was_empty; int r; if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { command_free(command); NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return -1; } was_empty = STAILQ_EMPTY(&c->commands_head) ? true : false; if (was_empty == true) { STAILQ_INSERT_HEAD(&c->commands_head, command, stailq_entry); } else { STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry); } #if DARWIN /* Darwin systems <10.12 lack clock_gettime (!) */ if (gettimeofday(&c->last_received_time, NULL)) { NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); } #else if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) { NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); } #endif /* DARWIN */ if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); return -1; } /* if there were already queued commands, a processor should already be scheduled */ if (was_empty == true) { connection_inc_ref(c); if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) { NOTIFY_ERROR("%s:%s", "workqueue_add", "failed"); return -1; } } return 0; } /** * Add the utime and stime values to a connection's totals. */ void connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end) { struct timeval utime_diff, stime_diff; timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime); timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime); timeval_increment(&c->utime, &utime_diff); timeval_increment(&c->stime, &stime_diff); } /** * Locks the output buffer, for sending multiple * contiguous messages without possible interleaving. */ void connection_lock_output(struct connection *c) { struct evbuffer *output = bufferevent_get_output(c->bev); evbuffer_lock(output); } /** * Unlocks the output buffer. */ void connection_unlock_output(struct connection *c) { struct evbuffer *output = bufferevent_get_output(c->bev); evbuffer_unlock(output); } /** * Sends a text string to a connection. */ int connection_printf(struct connection *c, const char *fmt, ...) { struct evbuffer *output = bufferevent_get_output(c->bev); va_list ap; int r; va_start(ap, fmt); r = evbuffer_add_vprintf(output, fmt, ap); va_end(ap); return r; } /** * Sends a text string to a connection. */ int connection_vprintf(struct connection *c, const char *fmt, va_list ap) { struct evbuffer *output = bufferevent_get_output(c->bev); int r; r = evbuffer_add_vprintf(output, fmt, ap); return r; } /** * Send one data buffer to multiple connections. * Data buffer must be malloced, and will be freed when last connection * has finished with it. * Returns number of items sent, or negative on error. */ int connection_multi_send(struct connection **clist, char *data, size_t len) { struct connection **clist_iter; struct rc_data_ *rc; int r = 0; if (!clist) { NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__); return -1; } /* Track our data so it can be freed once all connections are done with it. */ rc = rc_data_new_(data, len); if (rc == NULL) { NOTIFY_ERROR("rc_data_new_('%s', %zu) failed", data, len); return -1; } clist_iter = clist; while (*clist_iter) { struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev); /* FIXME: if connection is still valid... */ /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */ rc_data_ref_inc_(rc); if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) { NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed"); rc_data_free_(rc); } else { r++; } clist_iter++; } /* FIXME: clang also claims this is use-after-free, same as above. */ rc_data_free_(rc); return r; } int connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...) { struct connection **clist; char *message = NULL; ssize_t message_len = 0; va_list ap; int r; va_start(ap, fmt); message_len = vsnprintf(message, message_len, fmt, ap); va_end(ap); if (message_len < 0) { NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); return -1; } message_len += 1; message = malloc(message_len); if (!message) { NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno)); return -1; } va_start(ap, fmt); message_len = vsnprintf(message, message_len, fmt, ap); va_end(ap); if (message_len < 0) { NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); free(message); return -1; } #ifdef DEBUG_SILLY NOTIFY_DEBUG("message_len:%zu message:%s", message_len, message); #endif clist = connections_all_as_array(&sender->server->connections, exclude_sender == true ? sender : NULL); r = connection_multi_send(clist, message, message_len); if (r < 0) { NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed"); } for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) { connection_free(*clist_iter); } free(clist); return r; } /** * */ void connection_inc_ref(struct connection *c) { int r; if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } c->reference_count += 1; #if DEBUG_SILLY NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); #endif if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); return; } } /** * Frees a connection (and all data owned by a connection) if there are no more references to it. */ void connection_free(struct connection *c) { struct evbuffer *tmp; evutil_socket_t fd; int r; if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); return; } c->reference_count -= 1; #if DEBUG_SILLY NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); #endif if (c->reference_count > 0) { if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } return; } NOTIFY_DEBUG("c:%p freeing", c); connections_remove(c); bufferevent_disable(c->bev, EV_READ|EV_WRITE); tmp = bufferevent_get_output(c->bev); evbuffer_drain(tmp, evbuffer_get_length(tmp)); tmp = bufferevent_get_input(c->bev); evbuffer_drain(tmp, evbuffer_get_length(tmp)); fd = bufferevent_getfd(c->bev); if (fd != -1) { EVUTIL_CLOSESOCKET(fd); bufferevent_setfd(c->bev, -1); } if (c->dns_retry_ev) { if (event_del(c->dns_retry_ev)) { NOTIFY_ERROR("%s:%s", "event_del", strerror(errno)); } c->dns_retry_ev = NULL; } if (c->evdns_request) { evdns_cancel_request(c->server->evdns_base, c->evdns_request); c->evdns_request = NULL; } if (c->client_address) { free(c->client_address); c->client_address = NULL; } bufferevent_free(c->bev); if (c->uuid) { uuid_rc_t rc; rc = uuid_destroy(c->uuid); if (rc != UUID_RC_OK) { NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); } c->uuid = NULL; } /* empty out the command queue */ if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); } struct command *c1, *c2; c1 = STAILQ_FIRST(&c->commands_head); while (c1 != NULL) { c2 = STAILQ_NEXT(c1, stailq_entry); free(c1); c1 = c2; } STAILQ_INIT(&c->commands_head); if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); } if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); } memset(c, 0, sizeof *c); free(c); }