rough framework
diff --git a/connections.c b/connections.c
new file mode 100644
index 0000000..fa35f79
--- /dev/null
+++ b/connections.c
@@ -0,0 +1,927 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <string.h>
+#include <errno.h>
+#include <stdbool.h>
+#include <time.h> /* clock_gettime(), ctime() */
+#include <pthread.h>
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#ifndef NI_MAXHOST
+#      define NI_MAXHOST 1025
+#endif
+#ifndef NI_MAXSERV
+#      define NI_MAXSERV 32
+#endif
+
+#include <sys/resource.h> // needs _GNU_SOURCE for RUSAGE_THREAD
+
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+#include <event2/dns.h> /* evdns_base_resolve_reverse(), DNS_PTR, etc. */
+#include <event2/event.h>
+#include <event2/util.h>
+
+#include <ossp/uuid.h>
+
+#include "common.h"
+#include "notify.h"
+#include "connections.h"
+#include "command.h"
+#include "server.h"
+
+/**
+ *  A reference-counted data buffer.
+ *  Used internally here for sending out one message to multiple connections, without
+ *  creating a copy for each connection.
+ *  Frees data pointer when the last reference is removed.
+ *  A connection thread increases the reference count as it sends the message to each
+ *  destination connection, while the libevent thread decrements as it completes each
+ *  transmission.
+ */
+struct rc_data_ {
+       pthread_mutex_t mutex;
+       size_t reference_count;
+       size_t data_len;
+       char *data;
+};
+
+/**
+ *  Allocates, initializes, and returns a new struct rc_data_ entity.
+ */
+static struct rc_data_ *
+rc_data_new_(char *data, size_t len)
+{
+       struct rc_data_ *rc;
+       int r;
+
+       if (!data) {
+               NOTIFY_DEBUG("data:%p len:%zu", data, len);
+               return NULL;
+       }
+
+       rc = malloc(sizeof *rc);
+       if (!rc) {
+               NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno));
+               return NULL;
+       }
+
+       if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               free(rc);
+               return NULL;
+       }
+
+       rc->reference_count = 1;
+       rc->data_len = len;
+       rc->data = data;
+
+       return rc;
+}
+
+/**
+ *  Increments the reference count of a struct rc_data_ entity.
+ */
+static void
+rc_data_ref_inc_(struct rc_data_ *rc)
+{
+       int r;
+
+       if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+
+       rc->reference_count += 1;
+
+       if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+}
+
+/**
+ *  Decrements the reference count of a struct rc_data_ entity
+ *  and frees everything if there are no more references.
+ */
+static void
+rc_data_free_(struct rc_data_ *rc)
+{
+       int r;
+
+       if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+       rc->reference_count -= 1;
+       if (rc->reference_count > 0) {
+               if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+                       NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               }
+               return;
+       }
+
+       free(rc->data);
+       rc->data = NULL;
+       if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+       if ( (r = pthread_mutex_destroy(&rc->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       memset(rc, 0, sizeof *rc);
+
+       free(rc);
+}
+
+/**
+ *  Wrapper for rc_data_free_() of the proper callback type to be used by
+ *  evbuffer_add_reference() in connection_multi_send().
+ */
+static void
+rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg)
+{
+       struct rc_data_ *r = arg;
+
+       rc_data_free_(r);
+}
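+
+/*
+ * Lifecycle sketch for the reference-counted buffer above (comment only, not
+ * compiled): the sending thread holds the initial reference from
+ * rc_data_new_(), takes one more per destination with rc_data_ref_inc_(), and
+ * every completed (or failed) transmission plus the sender's own handle drops
+ * one with rc_data_free_(); whichever thread performs the last drop frees both
+ * the data and the tracking structure.
+ *
+ *   char *msg = strdup("hello\r\n");
+ *   struct rc_data_ *rc = rc_data_new_(msg, strlen(msg));  // refcount == 1, owns msg
+ *   rc_data_ref_inc_(rc);                                  // +1 for one destination
+ *   evbuffer_add_reference(out, rc->data, rc->data_len, rc_cleanup_cb_, rc);
+ *   rc_data_free_(rc);                                     // sender drops its own reference
+ */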
+
+/**
+ *  Initializes a struct connections entity.
+ */
+int
+connections_init(struct connections *cs)
+{
+       int r;
+
+       cs->count = 0;
+       TAILQ_INIT(&cs->head);
+
+       if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               return -1;
+       }
+
+       return 0;
+}
+
+/**
+ *  Cleans up a struct connections, releasing any remaining connection entries.
+ */
+void
+connections_fini(struct connections *cs)
+{
+       struct connection *c1, *c2;
+       int r;
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("cs:%p", cs);
+#endif
+
+       /* free any connection entities */
+       c1 = TAILQ_FIRST(&cs->head);
+       while (c1 != NULL) {
+               c2 = TAILQ_NEXT(c1, tailq_entry);
+               connection_free(c1);
+               c1 = c2;
+       }
+       TAILQ_INIT(&cs->head);
+       cs->count = 0;
+
+       if ( (r = pthread_mutex_destroy(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+}
+
+
+/**
+ *  Inserts a connection at the end of connections list.
+ */
+void
+connections_append(struct connection *c)
+{
+       struct connections *cs = &c->server->connections;
+       int r;
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("c:%p", c);
+#endif
+
+       if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+
+       TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry);
+       cs->count += 1;
+
+       if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               return;
+       }
+}
+
+/**
+ *  Removes a connection from its struct connections list.
+ */
+void
+connections_remove(struct connection *c)
+{
+       struct connections *cs;
+       int r;
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("c:%p", c);
+#endif
+
+       if (!c || !c->server) {
+               NOTIFY_DEBUG("c:%p server:%p", c, c ? c->server : NULL);
+               return;
+       }
+       cs = &c->server->connections;
+
+       if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+
+       TAILQ_REMOVE(&cs->head, c, tailq_entry);
+       cs->count -= 1;
+
+       if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               return;
+       }
+}
+
+/**
+ *  Allocates and returns a null-terminated array of pointers to all connection entities in
+ *  connections list, minus the 'c_exclude' connection, if set.
+ *  Used as a simple implementation of broadcasting a message.
+ *  This increments the reference count of each connection added to the array,
+ *  so each entry must be released with connection_free() before the array itself is freed.
+ */
+struct connection **
+connections_all_as_array(struct connections *cs, struct connection *c_exclude)
+{
+       struct connection **all;
+       struct connection *c;
+       size_t i = 0;
+       int r;
+
+       if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return NULL;
+       }
+
+       all = calloc(cs->count + 1, sizeof *all);
+       if (!all) {
+               NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno));
+               goto err_unlock;
+       }
+
+       TAILQ_FOREACH(c, &cs->head, tailq_entry) {
+               if (c != c_exclude
+               &&  c->state < CONNECTION_STATE_WANT_CLOSE
+               &&  c->state > CONNECTION_STATE_INIT) {
+                       connection_inc_ref(c);
+                       all[i++] = c;
+               }
+       }
+
+       if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               while (i) {
+                       i--;
+                       connection_free(all[i]);
+               }
+               free(all);
+               return NULL;
+       }
+
+       return all;
+
+err_unlock:
+       if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+       return NULL;
+}
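+
+/*
+ * Ownership sketch for connections_all_as_array() (comment only): each entry
+ * carries an extra reference, so a typical caller releases every entry and
+ * then the array itself, e.g.:
+ *
+ *   struct connection **all = connections_all_as_array(cs, NULL);
+ *   if (all) {
+ *           for (struct connection **it = all; *it; it++) {
+ *                   // ... queue data to *it ...
+ *                   connection_free(*it);
+ *           }
+ *           free(all);
+ *   }
+ */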
+
+
+/**
+ *  Timeout event callback that retries hostname resolution.
+ */
+static void
+connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg)
+{
+       struct connection *c = arg;
+
+       NOTIFY_DEBUG("retrying hostname resolution");
+
+       event_free(c->dns_retry_ev); /* the one-shot retry timer has fired; release it */
+       c->dns_retry_ev = NULL;
+       c->dns_retries += 1;
+       connection_resolve_hostname(c);
+}
+
+/**
+ *  Callback used by connection_resolve_hostname() to populate the reverse-lookup
+ *  of a connection's client_address.
+ */
+static void
+connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx)
+{
+       struct connection *c = ctx;
+       char *old_client_address = c->client_address;
+
+       if (result != DNS_ERR_NONE) {
+               NOTIFY_ERROR("Error resolving: %s",
+                            evdns_err_to_string(result) );
+               c->evdns_request = NULL;
+
+               // TODO: configurable
+               // if (server_get_config_integer(c->server, "dns_retries_max")) {
+               // if (server_get_config_integer(c->server, "dns_retry_seconds")) {
+
+               if (c->dns_retries < 4) {
+                       struct timeval _seconds = {30, 0};
+                       c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c);
+                       if (!c->dns_retry_ev
+                       ||  event_add(c->dns_retry_ev, &_seconds)) {
+                               NOTIFY_ERROR("%s:%s", "event_new/event_add", "failed");
+                       }
+               }
+               return;
+       }
+
+       switch (type) {
+               case DNS_PTR:
+                       if (count < 1) {
+                               break;
+                       }
+                       if (count > 1) {
+                               NOTIFY_DEBUG("multiple records returned, using first");
+                       }
+                       c->client_address = strdup(((char **)addresses)[0]);
+                       if (!c->client_address) {
+                               NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
+                               c->client_address = old_client_address;
+                               break;
+                       }
+                       NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c);
+                       break;
+
+               default:
+                       NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type);
+                       break;
+       }
+       c->evdns_request = NULL;
+
+       /* Only free the previous address if it was actually replaced. */
+       if (c->client_address != old_client_address) {
+               free(old_client_address);
+       }
+}
+
+
+/**
+ * Perform reverse lookup of connection address.
+ */
+void
+connection_resolve_hostname(struct connection *c)
+{
+       if (c->evdns_request) {
+               NOTIFY_DEBUG("resolution already in progress");
+               return;
+       }
+
+       /* does libevent's evdns have a getnameinfo style call? */
+
+       switch (c->sa->sa_family) {
+       case AF_INET:
+               c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c);
+               break;
+
+       case AF_INET6:
+               c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c);
+               break;
+
+       default:
+               NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family);
+       }
+       if (!c->evdns_request) {
+               NOTIFY_DEBUG("could not submit PTR lookup request");
+       }
+}
+
+
+/**
+ *  Populates a connection with initial information and sets reference count.
+ */
+int
+connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags)
+{
+       char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
+       int r;
+       uuid_rc_t rc;
+
+       if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err;
+       }
+
+       if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+               goto err_rc_mutex_destroy;
+       }
+
+       rc = uuid_create(&c->uuid);
+       if (rc != UUID_RC_OK) {
+               NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc));
+               goto err_commands_mutex_destroy;
+       }
+
+       rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC);
+       if (rc != UUID_RC_OK) {
+               NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc));
+               goto err_uuid_destroy;
+       }
+
+       c->state = CONNECTION_STATE_INIT;
+       c->bev = bev;
+       c->flags = flags;
+       c->server = s;
+
+       /* render an IP from a sockaddr */
+       r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV);
+       if (r) {
+               NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r));
+               strncpy(hbuf, "(unknown)", sizeof hbuf);
+               strncpy(sbuf, "(?)", sizeof sbuf);
+       }
+       c->client_address = strdup(hbuf);
+       if (!c->client_address) {
+               NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
+               goto err_uuid_destroy;
+       }
+
+       /* Past this point, errors are non-fatal. */
+       c->reference_count = 1;
+
+       /* Now try to resolve a name from client ip, in background. */
+       c->sa = sock;
+       c->sa_len = socklen;
+       c->dns_retries = 0;
+       c->dns_retry_ev = NULL;
+       connection_resolve_hostname(c);
+
+       /* FIXME: temporary name of connection for POC */
+       static int current_connection_number;
+       r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++);
+       if (r < 0 || (size_t)r >= sizeof c->name) {
+               NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__);
+       }
+
+#if DARWIN
+       /* Darwin systems <10.12 lack clock_gettime (!) */
+       if (gettimeofday(&c->connect_time, NULL)) {
+               NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
+       }
+       NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec));
+#else
+       if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) {
+               NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+       }
+       NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec));
+#endif /* DARWIN */
+
+       return 0;
+
+err_uuid_destroy:
+       rc = uuid_destroy(c->uuid);
+       if (rc != UUID_RC_OK) {
+               NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
+       }
+err_commands_mutex_destroy:
+       if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+err_rc_mutex_destroy:
+       if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+err:
+       return -1;
+}
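+
+/*
+ * Rough sketch of how a listener's accept callback might assemble a connection
+ * (comment only; the callback context, allocation, and bufferevent options
+ * shown here are assumptions, not part of this file):
+ *
+ *   struct bufferevent *bev = bufferevent_socket_new(s->base, fd, BEV_OPT_THREADSAFE);
+ *   struct connection *c = calloc(1, sizeof *c);
+ *   if (bev && c && connection_init(s, c, bev, sa, socklen, 0) == 0) {
+ *           connections_append(c);
+ *           bufferevent_enable(bev, EV_READ | EV_WRITE);
+ *   }
+ */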
+
+
+/**
+ * Process a connection's commands as long as there are commands to process.
+ * FIXME: track how much time has been spent consuming commands, re-queue
+ *        processor job when over some threshold considering pending workqueue
+ */
+static void
+connection_process_queue_(void *data, void *context, size_t id)
+{
+       struct connection *c = data;
+       struct server_worker_context *ctx = context;
+       struct command *command;
+       struct rusage ru_start, ru_end;
+       struct timespec ts_start, ts_current;
+       int r;
+
+       if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) {
+               NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno));
+       }
+       while (1) {
+               if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+                       NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+                       return;
+               }
+
+               command = STAILQ_FIRST(&c->commands_head);
+               if (command) {
+                       STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry);
+               }
+
+               if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+                       NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+                       return;
+               }
+
+               if (command == NULL) {
+                       break;
+               }
+
+               if (getrusage(RUSAGE_THREAD, &ru_start) < 0) {
+                       NOTIFY_ERROR("%s:%s", "getrusage", strerror(errno));
+               }
+
+               command_parse(c, context, id, command);
+
+               if (getrusage(RUSAGE_THREAD, &ru_end) < 0) {
+                       NOTIFY_ERROR("%s:%s", "getrusage", strerror(errno));
+               }
+
+               /* track how much time was spent processing */
+               connection_accounting_increment(c, &ru_start, &ru_end);
+
+               command_free(command);
+
+               if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) {
+                       NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+               }
+
+       }
+       connection_free(c);
+
+       // Done processing for a connection, force GC cycle to release additional c references.
+       lua_gc(ctx->L, LUA_GCCOLLECT);
+}
+
+
+/**
+ *  Adds a command to a connection's list of things to process.
+ */
+int
+connection_command_enqueue(struct connection *c, struct command *command)
+{
+       bool was_empty;
+       int r;
+
+       if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+               command_free(command);
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return -1;
+       }
+
+       was_empty = STAILQ_EMPTY(&c->commands_head);
+       STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry);
+
+#if DARWIN
+       /* Darwin systems <10.12 lack clock_gettime (!) */
+       if (gettimeofday(&c->last_received_time, NULL)) {
+               NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
+       }
+#else
+       if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) {
+               NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+       }
+#endif /* DARWIN */
+
+       if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               return -1;
+       }
+
+       /* if there were already queued commands, a processor should already be scheduled */
+       if (was_empty) {
+               connection_inc_ref(c);
+               if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) {
+                       NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
+                       connection_free(c); /* drop the reference taken for the processor that will never run */
+                       return -1;
+               }
+       }
+
+       return 0;
+}
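+
+/*
+ * Producer-side sketch (comment only): a bufferevent read callback would parse
+ * input into a struct command and hand it off here.  command_from_line() is an
+ * illustrative name, not an API defined in this file:
+ *
+ *   struct command *cmd = command_from_line(line, len);
+ *   if (cmd && connection_command_enqueue(c, cmd)) {
+ *           NOTIFY_ERROR("%s:%s", "connection_command_enqueue", "failed");
+ *   }
+ *
+ * connection_command_enqueue() takes ownership of the command on every path
+ * (it is either queued or freed), so the caller must not free it afterwards.
+ */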
+
+
+/**
+ *  Add the utime and stime values to a connection's totals.
+ */
+void
+connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end)
+{
+       struct timeval utime_diff, stime_diff;
+
+       timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime);
+       timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime);
+
+       timeval_increment(&c->utime, &utime_diff);
+       timeval_increment(&c->stime, &stime_diff);
+}
+
+/**
+ *  Locks the output buffer, for sending multiple
+ *  contiguous messages without possible interleaving.
+ */
+void
+connection_lock_output(struct connection *c)
+{
+       struct evbuffer *output = bufferevent_get_output(c->bev);
+
+       evbuffer_lock(output);
+}
+
+/**
+ *  Unlocks the output buffer.
+ */
+void
+connection_unlock_output(struct connection *c)
+{
+       struct evbuffer *output = bufferevent_get_output(c->bev);
+
+       evbuffer_unlock(output);
+}
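+
+/*
+ * Usage sketch (comment only): to emit several lines as one uninterleaved
+ * unit, bracket the writes with the output lock:
+ *
+ *   connection_lock_output(c);
+ *   connection_printf(c, "lines: %zu\r\n", count);
+ *   connection_printf(c, "status: %s\r\n", status);
+ *   connection_unlock_output(c);
+ *
+ * (count and status are placeholders; the lock only prevents other writers of
+ * the same bufferevent from interleaving, it does not flush anything.)
+ */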
+
+/**
+ *  Sends a text string to a connection.
+ */
+int
+connection_printf(struct connection *c, const char *fmt, ...)
+{
+       struct evbuffer *output = bufferevent_get_output(c->bev);
+       va_list ap;
+       int r;
+
+       va_start(ap, fmt);
+       r = evbuffer_add_vprintf(output, fmt, ap);
+       va_end(ap);
+
+       return r;
+}
+
+
+/**
+ *  Sends a text string to a connection.
+ */
+int
+connection_vprintf(struct connection *c, const char *fmt, va_list ap)
+{
+       struct evbuffer *output = bufferevent_get_output(c->bev);
+       int r;
+
+       r = evbuffer_add_vprintf(output, fmt, ap);
+
+       return r;
+}
+
+
+/**
+ *  Sends one data buffer to multiple connections.
+ *  The data buffer must be heap-allocated; this function takes ownership of it,
+ *  freeing it after the last connection has finished with it (or on error).
+ *  Returns the number of connections the buffer was queued to, or negative on error.
+ */
+int
+connection_multi_send(struct connection **clist, char *data, size_t len)
+{
+       struct connection **clist_iter;
+       struct rc_data_ *rc;
+       int r = 0;
+
+       if (!clist) {
+               NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__);
+               free(data); /* ownership was transferred to us */
+               return -1;
+       }
+
+       /* Track our data so it can be freed once all connections are done with it. */
+       rc = rc_data_new_(data, len);
+       if (rc == NULL) {
+               NOTIFY_ERROR("%s(%zu): %s", "rc_data_new_", len, "failed");
+               free(data);
+               return -1;
+       }
+
+       clist_iter = clist;
+       while (*clist_iter) {
+               struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev);
+
+               /* FIXME: if connection is still valid... */
+
+               /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */
+               rc_data_ref_inc_(rc);
+               if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) {
+                       NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed");
+                       rc_data_free_(rc);
+               } else {
+                       r++;
+               }
+               clist_iter++;
+       }
+
+       /* FIXME: clang also claims this is use-after-free, same as above. */
+       rc_data_free_(rc);
+
+       return r;
+}
+
+
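+/**
+ *  Formats a message and sends it to every active connection,
+ *  optionally excluding the sender.
+ */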
+int
+connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...)
+{
+       struct connection **clist;
+       char *message = NULL;
+       ssize_t message_len = 0;
+       va_list ap;
+       int r;
+
+       va_start(ap, fmt);
+       message_len = vsnprintf(message, message_len, fmt, ap);
+       va_end(ap);
+       if (message_len < 0) {
+               NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
+               return -1;
+       }
+       message_len += 1;
+       message = malloc(message_len);
+       if (!message) {
+               NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno));
+               return -1;
+       }
+       va_start(ap, fmt);
+       message_len = vsnprintf(message, message_len, fmt, ap);
+       va_end(ap);
+       if (message_len < 0) {
+               NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
+               free(message);
+               return -1;
+       }
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("message_len:%zd message:%s", message_len, message);
+#endif
+
+       clist = connections_all_as_array(&sender->server->connections, exclude_sender ? sender : NULL);
+       r = connection_multi_send(clist, message, message_len);
+       if (r < 0) {
+               NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed");
+       }
+       if (clist) {
+               for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) {
+                       connection_free(*clist_iter);
+               }
+               free(clist);
+       }
+
+       return r;
+}
+
+/**
+ *  Increments the reference count of a connection.
+ */
+void
+connection_inc_ref(struct connection *c)
+{
+       int r;
+
+       if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+
+       c->reference_count += 1;
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
+#endif
+
+       if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               return;
+       }
+}
+
+/**
+ *  Frees a connection (and all data owned by a connection) if there are no more references to it.
+ */
+void
+connection_free(struct connection *c)
+{
+       struct evbuffer *tmp;
+       evutil_socket_t fd;
+       int r;
+
+       if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+               return;
+       }
+
+       c->reference_count -= 1;
+
+#if DEBUG_SILLY
+       NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
+#endif
+
+       if (c->reference_count > 0) {
+               if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+                       NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+               }
+
+               return;
+       }
+
+       NOTIFY_DEBUG("c:%p freeing", c);
+
+       connections_remove(c);
+
+       bufferevent_disable(c->bev, EV_READ|EV_WRITE);
+       tmp = bufferevent_get_output(c->bev);
+       evbuffer_drain(tmp, evbuffer_get_length(tmp));
+       tmp = bufferevent_get_input(c->bev);
+       evbuffer_drain(tmp, evbuffer_get_length(tmp));
+
+       fd = bufferevent_getfd(c->bev);
+       if (fd != -1) {
+               EVUTIL_CLOSESOCKET(fd);
+               bufferevent_setfd(c->bev, -1);
+       }
+
+       if (c->dns_retry_ev) {
+               event_free(c->dns_retry_ev); /* also removes any pending retry timeout */
+               c->dns_retry_ev = NULL;
+       }
+
+       if (c->evdns_request) {
+               evdns_cancel_request(c->server->evdns_base, c->evdns_request);
+               c->evdns_request = NULL;
+       }
+
+       if (c->client_address) {
+               free(c->client_address);
+               c->client_address = NULL;
+       }
+
+       bufferevent_free(c->bev);
+
+       if (c->uuid) {
+               uuid_rc_t rc;
+
+               rc = uuid_destroy(c->uuid);
+               if (rc != UUID_RC_OK) {
+                       NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
+               }
+               c->uuid = NULL;
+       }
+
+       /* empty out the command queue */
+       if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+       }
+
+       struct command *c1, *c2;
+       c1 = STAILQ_FIRST(&c->commands_head);
+       while (c1 != NULL) {
+               c2 = STAILQ_NEXT(c1, stailq_entry);
+               command_free(c1);
+               c1 = c2;
+       }
+       STAILQ_INIT(&c->commands_head);
+
+       if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+       if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+       }
+       if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
+               NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+       }
+
+       memset(c, 0, sizeof *c);
+
+       free(c);
+}