X-Git-Url: http://git.squeep.com/?p=lemu;a=blobdiff_plain;f=connections.c;fp=connections.c;h=fa35f791531029e401bab4f33445e7212c8796d8;hp=0000000000000000000000000000000000000000;hb=3c54afe11e890fc476cc4730226be1c45f8a04cb;hpb=29235d4c1f0b11bd2efcad262eaae70383228293 diff --git a/connections.c b/connections.c new file mode 100644 index 0000000..fa35f79 --- /dev/null +++ b/connections.c @@ -0,0 +1,927 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#ifndef NI_MAXHOST +# define NI_MAXHOST 1025 +#endif +#ifndef NI_MAXSERV +# define NI_MAXSERV 32 +#endif + +#include // needs _GNU_SOURCE for RUSAGE_THREAD + +#include +#include +#include +#include + +#include + +#include "common.h" +#include "notify.h" +#include "connections.h" +#include "command.h" +#include "server.h" + +/** + * A reference-counted data buffer. + * Used internally here for sending out one message to multiple connections, without + * creating a copy for each connection. + * Frees data pointer when the last reference is removed. + * A connection thread increases the reference count as it sends the message to each + * destination connection, while the libevent thread decrements as it completes each + * transmission. + */ +struct rc_data_ { + pthread_mutex_t mutex; + size_t reference_count; + size_t data_len; + char *data; +}; + +/** + * Allocates, initializes, and returns a new struct rc_data_ entity. + */ +static struct rc_data_ * +rc_data_new_(char *data, size_t len) +{ + struct rc_data_ *rc; + int r; + + if (!data) { + NOTIFY_DEBUG("data:%p len:%zu", data, len); + return NULL; + } + + rc = malloc(sizeof *rc); + if (!rc) { + NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno)); + return NULL; + } + + if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + free(rc); + return NULL; + } + + rc->reference_count = 1; + rc->data_len = len; + rc->data = data; + + return rc; +} + +/** + * Increments the reference count of a struct rc_data_ entity. + */ +static void +rc_data_ref_inc_(struct rc_data_ *rc) +{ + int r; + + if ( (r = pthread_mutex_lock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + rc->reference_count += 1; + + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } +} + +/** + * Decrements the reference count of a struct rc_data_ entity + * and frees everything if there are no more references. + */ +static void +rc_data_free_(struct rc_data_ *rc) +{ + int r; + + if ( (r = pthread_mutex_lock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + rc->reference_count -= 1; + if (rc->reference_count > 0) { + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + return; + } + + free(rc->data); + rc->data = NULL; + if ( (r = pthread_mutex_unlock(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&rc->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + memset(rc, 0, sizeof *rc); + + free(rc); +} + +/** + * Wrapper for rc_data_free_() of the proper callback type to be used by + * evbuffer_add_reference() in connection_multi_send(). + */ +static void +rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg) +{ + struct rc_data_ *r = arg; + + rc_data_free_(r); +} + +/** + * Initializes a struct connections entity. + */ +int +connections_init(struct connections *cs) +{ + int r; + + cs->count = 0; + TAILQ_INIT(&cs->head); + + if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + return -1; + } + + return 0; +} + +/** + * Cleanup a struct connections. + */ +void +connections_fini(struct connections *cs) +{ + struct connection *c1, *c2; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("cs:%p", cs); +#endif + + /* free any connection entities */ + c1 = TAILQ_FIRST(&cs->head); + while (c1 != NULL) { + c2 = TAILQ_NEXT(c1, tailq_entry); + connection_free(c1); + c1 = c2; + } + TAILQ_INIT(&cs->head); + cs->count = 0; + + if ( (r = pthread_mutex_destroy(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } +} + + +/** + * Inserts a connection at the end of connections list. + */ +void +connections_append(struct connection *c) +{ + struct connections *cs = &c->server->connections; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p", c); +#endif + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry); + cs->count += 1; + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Removes a connection from its struct connections list. + */ +void +connections_remove(struct connection *c) +{ + struct connections *cs = &c->server->connections; + int r; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p", c); +#endif + + if ( !c || !cs) { + NOTIFY_DEBUG("c:%p connections:%p", c, cs); + return; + } + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + TAILQ_REMOVE(&cs->head, c, tailq_entry); + cs->count -= 1; + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Allocates and returns a null-terminated array of pointers to all connection entities in + * connections list, minus the 'c_exclude' connection, if set. + * Used as a simple implementation of broadcasting a message. + * This increments the reference count of each connection added to the array + * so they need to individually be freed before freeing the array. + */ +struct connection ** +connections_all_as_array(struct connections *cs, struct connection *c_exclude) +{ + struct connection **all; + struct connection *c; + size_t i = 0; + int r; + + if ( (r = pthread_mutex_lock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return NULL; + } + + all = calloc(cs->count + 1, sizeof *all); + if (!all) { + NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno)); + goto err_unlock; + } + + TAILQ_FOREACH(c, &cs->head, tailq_entry) { + if (c != c_exclude + && c->state < CONNECTION_STATE_WANT_CLOSE + && c->state > CONNECTION_STATE_INIT) { + connection_inc_ref(c); + all[i++] = c; + } + } + + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + while (i) { + i--; + connection_free(c); + } + free(all); + return NULL; + } + + return all; + +err_unlock: + if ( (r = pthread_mutex_unlock(&cs->mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + return NULL; +} + + +/** + * Wrap ev timeout event to re-resolve hostname. + */ +static void +connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg) +{ + struct connection *c = arg; + + NOTIFY_DEBUG("retrying hostname resolution"); + + c->dns_retry_ev = NULL; + c->dns_retries += 1; + connection_resolve_hostname(c); +} + +/** + * A callback used by connection_init to populate the reverse-lookup of a connection's + * client_address. +**/ +static void +connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx) +{ + struct connection *c = ctx; + char *old_client_address = c->client_address; + + if (result != DNS_ERR_NONE) { + NOTIFY_ERROR("Error resolving: %s", + evdns_err_to_string(result) ); + c->evdns_request = NULL; + + // TODO: configurable + // if (server_get_config_integer(c->server, "dns_retries_max")) { + // if (server_get_config_integer(c->server, "dns_retry_seconds")) { + + if (c->dns_retries < 4) { + struct timeval _seconds = {30, 0}; + c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c); + event_add(c->dns_retry_ev, &_seconds); + }; + return; + } + + switch (type) { + case DNS_PTR: + if (count < 1) { + return; + } + if (count > 1) { + NOTIFY_DEBUG("multiple records returned, using first"); + } + c->client_address = strdup(((char **)addresses)[0]); + NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c); + break; + + default: + NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type); + break; + } + c->evdns_request = NULL; + + if (old_client_address) { + free(old_client_address); + } +} + + +/** + * Perform reverse lookup of connection address. + */ +void +connection_resolve_hostname(struct connection *c) +{ + if (c->evdns_request) { + NOTIFY_DEBUG("resolution already in progress"); + return; + } + + /* does libevent's evdns have a getnameinfo style call? */ + + switch (c->sa->sa_family) { + case AF_INET: + c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c); + break; + + case AF_INET6: + c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c); + break; + + default: + NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family); + } + if (!c->evdns_request) { + NOTIFY_DEBUG("could not submit PTR lookup request"); + } +} + + +/** + * Populates a connection with initial information and sets reference count. + */ +int +connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags) +{ + char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; + int r; + uuid_rc_t rc; + + if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r)); + goto err; + } + + if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) { + NOTIFY_ERROR("%s:%s", "pthead_mutex_init", strerror(r)); + goto err_rc_mutex_destroy; + } + + rc = uuid_create(&c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc)); + goto err_commands_mutex_destroy; + } + + rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc)); + goto err_uuid_destroy; + } + + c->state = CONNECTION_STATE_INIT; + c->bev = bev; + c->flags = flags; + c->server = s; + + /* render an IP from a sockaddr */ + r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); + if (r) { + NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r)); + strncpy(hbuf, "(unknown)", sizeof hbuf); + strncpy(sbuf, "(?)", sizeof sbuf); + } + c->client_address = strdup(hbuf); + if (!c->client_address) { + NOTIFY_ERROR("%s:%s", "strdup", strerror(errno)); + goto err_uuid_destroy; + } + + /* Past this point, errors are non-fatal. */ + c->reference_count = 1; + + /* Now try to resolve a name from client ip, in background. */ + c->sa = sock; + c->sa_len = socklen; + c->dns_retries = 0; + c->dns_retry_ev = NULL; + connection_resolve_hostname(c); + + /* FIXME: temporary name of connection for POC */ + static int current_connection_number; + r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++); + if ((size_t)r >= sizeof c->name) { + NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__); + } + +#if DARWIN + /* Darwin systems <10.12 lack clock_gettime (!) */ + if (gettimeofday(&c->connect_time, NULL)) { + NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); + } + NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec)); +#else + if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } + NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec)); +#endif /* DARWIN */ + + return 0; + +err_uuid_destroy: + rc = uuid_destroy(c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); + } +err_commands_mutex_destroy: + if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); + } +err_rc_mutex_destroy: + if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r)); + } +err: + return -1; +} + + +/** + * Process a connection's commands as long as there are commands to process. + * FIXME: track how much time has been spend consuming commands, re-queue + * processor job when over some threshold considering pending workqueue + */ +static void +connection_process_queue_(void *data, void *context, size_t id) +{ + struct connection *c = data; + struct server_worker_context *ctx = context; + struct command *command; + struct rusage ru_start, ru_end; + struct timespec ts_start, ts_current; + int r; + + if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) { + NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno)); + } + while (1) { + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + command = STAILQ_FIRST(&c->commands_head); + if (command) { + STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry); + } + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } + + if (command == NULL) { + break; + } + + if ( (r = getrusage(RUSAGE_THREAD, &ru_start)) ) { + NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); + } + + command_parse(c, context, id, command); + + if ( (r = getrusage(RUSAGE_THREAD, &ru_end)) ) { + NOTIFY_ERROR("%s:%s", "getrusage", strerror(r)); + } + + /* track how much time was spent procesing */ + connection_accounting_increment(c, &ru_start, &ru_end); + + command_free(command); + + if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } + + } + connection_free(c); + + // Done processing for a connection, force GC cycle to release additional c references. + lua_gc(ctx->L, LUA_GCCOLLECT); +} + + +/** + * Adds a command to a connection's list of things to process. + */ +int +connection_command_enqueue(struct connection *c, struct command *command) +{ + bool was_empty; + int r; + + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + command_free(command); + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return -1; + } + + was_empty = STAILQ_EMPTY(&c->commands_head) ? true : false; + if (was_empty == true) { + STAILQ_INSERT_HEAD(&c->commands_head, command, stailq_entry); + } else { + STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry); + } + +#if DARWIN + /* Darwin systems <10.12 lack clock_gettime (!) */ + if (gettimeofday(&c->last_received_time, NULL)) { + NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno)); + } +#else + if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) { + NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno)); + } +#endif /* DARWIN */ + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return -1; + } + + /* if there were already queued commands, a processor should already be scheduled */ + if (was_empty == true) { + connection_inc_ref(c); + if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) { + NOTIFY_ERROR("%s:%s", "workqueue_add", "failed"); + return -1; + } + } + + return 0; +} + + +/** + * Add the utime and stime values to a connection's totals. + */ +void +connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end) +{ + struct timeval utime_diff, stime_diff; + + timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime); + timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime); + + timeval_increment(&c->utime, &utime_diff); + timeval_increment(&c->stime, &stime_diff); +} + +/** + * Locks the output buffer, for sending multiple + * contiguous messages without possible interleaving. + */ +void +connection_lock_output(struct connection *c) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + + evbuffer_lock(output); +} + +/** + * Unlocks the output buffer. + */ +void +connection_unlock_output(struct connection *c) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + + evbuffer_unlock(output); +} + +/** + * Sends a text string to a connection. + */ +int +connection_printf(struct connection *c, const char *fmt, ...) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + va_list ap; + int r; + + va_start(ap, fmt); + r = evbuffer_add_vprintf(output, fmt, ap); + va_end(ap); + + return r; +} + + +/** + * Sends a text string to a connection. + */ +int +connection_vprintf(struct connection *c, const char *fmt, va_list ap) +{ + struct evbuffer *output = bufferevent_get_output(c->bev); + int r; + + r = evbuffer_add_vprintf(output, fmt, ap); + + return r; +} + + +/** + * Send one data buffer to multiple connections. + * Data buffer must be malloced, and will be freed when last connection + * has finished with it. + * Returns number of items sent, or negative on error. + */ +int +connection_multi_send(struct connection **clist, char *data, size_t len) +{ + struct connection **clist_iter; + struct rc_data_ *rc; + int r = 0; + + if (!clist) { + NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__); + return -1; + } + + /* Track our data so it can be freed once all connections are done with it. */ + rc = rc_data_new_(data, len); + if (rc == NULL) { + NOTIFY_ERROR("rc_data_new_('%s', %zu) failed", data, len); + return -1; + } + + clist_iter = clist; + while (*clist_iter) { + struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev); + + /* FIXME: if connection is still valid... */ + + /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */ + rc_data_ref_inc_(rc); + if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) { + NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed"); + rc_data_free_(rc); + } else { + r++; + } + clist_iter++; + } + + /* FIXME: clang also claims this is use-after-free, same as above. */ + rc_data_free_(rc); + + return r; +} + + +int +connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...) +{ + struct connection **clist; + char *message = NULL; + ssize_t message_len = 0; + va_list ap; + int r; + + va_start(ap, fmt); + message_len = vsnprintf(message, message_len, fmt, ap); + va_end(ap); + if (message_len < 0) { + NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); + return -1; + } + message_len += 1; + message = malloc(message_len); + if (!message) { + NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno)); + return -1; + } + va_start(ap, fmt); + message_len = vsnprintf(message, message_len, fmt, ap); + va_end(ap); + if (message_len < 0) { + NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len); + free(message); + return -1; + } + +#ifdef DEBUG_SILLY + NOTIFY_DEBUG("message_len:%zu message:%s", message_len, message); +#endif + + clist = connections_all_as_array(&sender->server->connections, exclude_sender == true ? sender : NULL); + r = connection_multi_send(clist, message, message_len); + if (r < 0) { + NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed"); + } + for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) { + connection_free(*clist_iter); + } + free(clist); + + return r; +} + +/** + * + */ +void +connection_inc_ref(struct connection *c) +{ + int r; + + if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + c->reference_count += 1; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); +#endif + + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + return; + } +} + +/** + * Frees a connection (and all data owned by a connection) if there are no more references to it. + */ +void +connection_free(struct connection *c) +{ + struct evbuffer *tmp; + evutil_socket_t fd; + int r; + + if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + return; + } + + c->reference_count -= 1; + +#if DEBUG_SILLY + NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count); +#endif + + if (c->reference_count > 0) { + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + + return; + } + + NOTIFY_DEBUG("c:%p freeing", c); + + connections_remove(c); + + bufferevent_disable(c->bev, EV_READ|EV_WRITE); + tmp = bufferevent_get_output(c->bev); + evbuffer_drain(tmp, evbuffer_get_length(tmp)); + tmp = bufferevent_get_input(c->bev); + evbuffer_drain(tmp, evbuffer_get_length(tmp)); + + fd = bufferevent_getfd(c->bev); + if (fd != -1) { + EVUTIL_CLOSESOCKET(fd); + bufferevent_setfd(c->bev, -1); + } + + if (c->dns_retry_ev) { + if (event_del(c->dns_retry_ev)) { + NOTIFY_ERROR("%s:%s", "event_del", strerror(errno)); + } + c->dns_retry_ev = NULL; + } + + if (c->evdns_request) { + evdns_cancel_request(c->server->evdns_base, c->evdns_request); + c->evdns_request = NULL; + } + + if (c->client_address) { + free(c->client_address); + c->client_address = NULL; + } + + bufferevent_free(c->bev); + + if (c->uuid) { + uuid_rc_t rc; + + rc = uuid_destroy(c->uuid); + if (rc != UUID_RC_OK) { + NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc)); + } + c->uuid = NULL; + } + + /* empty out the command queue */ + if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r)); + } + + struct command *c1, *c2; + c1 = STAILQ_FIRST(&c->commands_head); + while (c1 != NULL) { + c2 = STAILQ_NEXT(c1, stailq_entry); + free(c1); + c1 = c2; + } + STAILQ_INIT(&c->commands_head); + + if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r)); + } + if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) { + NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r)); + } + + memset(c, 0, sizeof *c); + + free(c); +}