--- /dev/null
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdarg.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+#include <sys/resource.h> // needs _GNU_SOURCE for RUSAGE_THREAD
+
+#include <event2/bufferevent.h>
+#include <event2/buffer.h>
+#include <event2/event.h>
+#include <event2/util.h>
+
+#include <ossp/uuid.h>
+
+#include "common.h"
+#include "notify.h"
+#include "connections.h"
+#include "command.h"
+#include "server.h"
+
+/**
+ * A reference-counted data buffer.
+ * Used internally here for sending out one message to multiple connections, without
+ * creating a copy for each connection.
+ * Frees data pointer when the last reference is removed.
+ * A connection thread increases the reference count as it sends the message to each
+ * destination connection, while the libevent thread decrements as it completes each
+ * transmission.
+ */
+struct rc_data_ {
+ pthread_mutex_t mutex;
+ size_t reference_count;
+ size_t data_len;
+ char *data;
+};
+
+/**
+ * Allocates, initializes, and returns a new struct rc_data_ entity.
+ */
+static struct rc_data_ *
+rc_data_new_(char *data, size_t len)
+{
+ struct rc_data_ *rc;
+ int r;
+
+ if (!data) {
+ NOTIFY_DEBUG("data:%p len:%zu", data, len);
+ return NULL;
+ }
+
+ rc = malloc(sizeof *rc);
+ if (!rc) {
+ NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno));
+ return NULL;
+ }
+
+ if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ free(rc);
+ return NULL;
+ }
+
+ rc->reference_count = 1;
+ rc->data_len = len;
+ rc->data = data;
+
+ return rc;
+}
+
+/**
+ * Increments the reference count of a struct rc_data_ entity.
+ */
+static void
+rc_data_ref_inc_(struct rc_data_ *rc)
+{
+ int r;
+
+ if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ rc->reference_count += 1;
+
+ if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+}
+
+/**
+ * Decrements the reference count of a struct rc_data_ entity
+ * and frees everything if there are no more references.
+ */
+static void
+rc_data_free_(struct rc_data_ *rc)
+{
+ int r;
+
+ if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+ rc->reference_count -= 1;
+ if (rc->reference_count > 0) {
+ if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ return;
+ }
+
+ free(rc->data);
+ rc->data = NULL;
+ if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ if ( (r = pthread_mutex_destroy(&rc->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+
+ memset(rc, 0, sizeof *rc);
+
+ free(rc);
+}
+
+/**
+ * Wrapper for rc_data_free_() of the proper callback type to be used by
+ * evbuffer_add_reference() in connection_multi_send().
+ */
+static void
+rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg)
+{
+ struct rc_data_ *r = arg;
+
+ rc_data_free_(r);
+}
+
+/**
+ * Initializes a struct connections entity.
+ */
+int
+connections_init(struct connections *cs)
+{
+ int r;
+
+ cs->count = 0;
+ TAILQ_INIT(&cs->head);
+
+ if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * Cleanup a struct connections.
+ */
+void
+connections_fini(struct connections *cs)
+{
+ struct connection *c1, *c2;
+ int r;
+
+#if DEBUG_SILLY
+ NOTIFY_DEBUG("cs:%p", cs);
+#endif
+
+ /* free any connection entities */
+ c1 = TAILQ_FIRST(&cs->head);
+ while (c1 != NULL) {
+ c2 = TAILQ_NEXT(c1, tailq_entry);
+ connection_free(c1);
+ c1 = c2;
+ }
+ TAILQ_INIT(&cs->head);
+ cs->count = 0;
+
+ if ( (r = pthread_mutex_destroy(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+}
+
+
+/**
+ * Inserts a connection at the end of connections list.
+ */
+void
+connections_append(struct connection *c)
+{
+ struct connections *cs = &c->server->connections;
+ int r;
+
+#if DEBUG_SILLY
+ NOTIFY_DEBUG("c:%p", c);
+#endif
+
+ if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry);
+ cs->count += 1;
+
+ if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ return;
+ }
+}
+
+/**
+ * Removes a connection from its struct connections list.
+ */
+void
+connections_remove(struct connection *c)
+{
+ struct connections *cs = &c->server->connections;
+ int r;
+
+#if DEBUG_SILLY
+ NOTIFY_DEBUG("c:%p", c);
+#endif
+
+ if ( !c || !cs) {
+ NOTIFY_DEBUG("c:%p connections:%p", c, cs);
+ return;
+ }
+
+ if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ TAILQ_REMOVE(&cs->head, c, tailq_entry);
+ cs->count -= 1;
+
+ if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ return;
+ }
+}
+
+/**
+ * Allocates and returns a null-terminated array of pointers to all connection entities in
+ * connections list, minus the 'c_exclude' connection, if set.
+ * Used as a simple implementation of broadcasting a message.
+ * This increments the reference count of each connection added to the array
+ * so they need to individually be freed before freeing the array.
+ */
+struct connection **
+connections_all_as_array(struct connections *cs, struct connection *c_exclude)
+{
+ struct connection **all;
+ struct connection *c;
+ size_t i = 0;
+ int r;
+
+ if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return NULL;
+ }
+
+ all = calloc(cs->count + 1, sizeof *all);
+ if (!all) {
+ NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno));
+ goto err_unlock;
+ }
+
+ TAILQ_FOREACH(c, &cs->head, tailq_entry) {
+ if (c != c_exclude
+ && c->state < CONNECTION_STATE_WANT_CLOSE
+ && c->state > CONNECTION_STATE_INIT) {
+ connection_inc_ref(c);
+ all[i++] = c;
+ }
+ }
+
+ if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ while (i) {
+ i--;
+ connection_free(c);
+ }
+ free(all);
+ return NULL;
+ }
+
+ return all;
+
+err_unlock:
+ if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ return NULL;
+}
+
+
+/**
+ * Wrap ev timeout event to re-resolve hostname.
+ */
+static void
+connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg)
+{
+ struct connection *c = arg;
+
+ NOTIFY_DEBUG("retrying hostname resolution");
+
+ c->dns_retry_ev = NULL;
+ c->dns_retries += 1;
+ connection_resolve_hostname(c);
+}
+
+/**
+ * A callback used by connection_init to populate the reverse-lookup of a connection's
+ * client_address.
+**/
+static void
+connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx)
+{
+ struct connection *c = ctx;
+ char *old_client_address = c->client_address;
+
+ if (result != DNS_ERR_NONE) {
+ NOTIFY_ERROR("Error resolving: %s",
+ evdns_err_to_string(result) );
+ c->evdns_request = NULL;
+
+ // TODO: configurable
+ // if (server_get_config_integer(c->server, "dns_retries_max")) {
+ // if (server_get_config_integer(c->server, "dns_retry_seconds")) {
+
+ if (c->dns_retries < 4) {
+ struct timeval _seconds = {30, 0};
+ c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c);
+ event_add(c->dns_retry_ev, &_seconds);
+ };
+ return;
+ }
+
+ switch (type) {
+ case DNS_PTR:
+ if (count < 1) {
+ return;
+ }
+ if (count > 1) {
+ NOTIFY_DEBUG("multiple records returned, using first");
+ }
+ c->client_address = strdup(((char **)addresses)[0]);
+ NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c);
+ break;
+
+ default:
+ NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type);
+ break;
+ }
+ c->evdns_request = NULL;
+
+ if (old_client_address) {
+ free(old_client_address);
+ }
+}
+
+
+/**
+ * Perform reverse lookup of connection address.
+ */
+void
+connection_resolve_hostname(struct connection *c)
+{
+ if (c->evdns_request) {
+ NOTIFY_DEBUG("resolution already in progress");
+ return;
+ }
+
+ /* does libevent's evdns have a getnameinfo style call? */
+
+ switch (c->sa->sa_family) {
+ case AF_INET:
+ c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c);
+ break;
+
+ case AF_INET6:
+ c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c);
+ break;
+
+ default:
+ NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family);
+ }
+ if (!c->evdns_request) {
+ NOTIFY_DEBUG("could not submit PTR lookup request");
+ }
+}
+
+
+/**
+ * Populates a connection with initial information and sets reference count.
+ */
+int
+connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags)
+{
+ char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
+ int r;
+ uuid_rc_t rc;
+
+ if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
+ goto err;
+ }
+
+ if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) {
+ NOTIFY_ERROR("%s:%s", "pthead_mutex_init", strerror(r));
+ goto err_rc_mutex_destroy;
+ }
+
+ rc = uuid_create(&c->uuid);
+ if (rc != UUID_RC_OK) {
+ NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc));
+ goto err_commands_mutex_destroy;
+ }
+
+ rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC);
+ if (rc != UUID_RC_OK) {
+ NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc));
+ goto err_uuid_destroy;
+ }
+
+ c->state = CONNECTION_STATE_INIT;
+ c->bev = bev;
+ c->flags = flags;
+ c->server = s;
+
+ /* render an IP from a sockaddr */
+ r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV);
+ if (r) {
+ NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r));
+ strncpy(hbuf, "(unknown)", sizeof hbuf);
+ strncpy(sbuf, "(?)", sizeof sbuf);
+ }
+ c->client_address = strdup(hbuf);
+ if (!c->client_address) {
+ NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
+ goto err_uuid_destroy;
+ }
+
+ /* Past this point, errors are non-fatal. */
+ c->reference_count = 1;
+
+ /* Now try to resolve a name from client ip, in background. */
+ c->sa = sock;
+ c->sa_len = socklen;
+ c->dns_retries = 0;
+ c->dns_retry_ev = NULL;
+ connection_resolve_hostname(c);
+
+ /* FIXME: temporary name of connection for POC */
+ static int current_connection_number;
+ r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++);
+ if ((size_t)r >= sizeof c->name) {
+ NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__);
+ }
+
+#if DARWIN
+ /* Darwin systems <10.12 lack clock_gettime (!) */
+ if (gettimeofday(&c->connect_time, NULL)) {
+ NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
+ }
+ NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec));
+#else
+ if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) {
+ NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+ }
+ NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec));
+#endif /* DARWIN */
+
+ return 0;
+
+err_uuid_destroy:
+ rc = uuid_destroy(c->uuid);
+ if (rc != UUID_RC_OK) {
+ NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
+ }
+err_commands_mutex_destroy:
+ if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r));
+ }
+err_rc_mutex_destroy:
+ if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r));
+ }
+err:
+ return -1;
+}
+
+
+/**
+ * Process a connection's commands as long as there are commands to process.
+ * FIXME: track how much time has been spend consuming commands, re-queue
+ * processor job when over some threshold considering pending workqueue
+ */
+static void
+connection_process_queue_(void *data, void *context, size_t id)
+{
+ struct connection *c = data;
+ struct server_worker_context *ctx = context;
+ struct command *command;
+ struct rusage ru_start, ru_end;
+ struct timespec ts_start, ts_current;
+ int r;
+
+ if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) {
+ NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno));
+ }
+ while (1) {
+ if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ command = STAILQ_FIRST(&c->commands_head);
+ if (command) {
+ STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry);
+ }
+
+ if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ return;
+ }
+
+ if (command == NULL) {
+ break;
+ }
+
+ if ( (r = getrusage(RUSAGE_THREAD, &ru_start)) ) {
+ NOTIFY_ERROR("%s:%s", "getrusage", strerror(r));
+ }
+
+ command_parse(c, context, id, command);
+
+ if ( (r = getrusage(RUSAGE_THREAD, &ru_end)) ) {
+ NOTIFY_ERROR("%s:%s", "getrusage", strerror(r));
+ }
+
+ /* track how much time was spent procesing */
+ connection_accounting_increment(c, &ru_start, &ru_end);
+
+ command_free(command);
+
+ if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) {
+ NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+ }
+
+ }
+ connection_free(c);
+
+ // Done processing for a connection, force GC cycle to release additional c references.
+ lua_gc(ctx->L, LUA_GCCOLLECT);
+}
+
+
+/**
+ * Adds a command to a connection's list of things to process.
+ */
+int
+connection_command_enqueue(struct connection *c, struct command *command)
+{
+ bool was_empty;
+ int r;
+
+ if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+ command_free(command);
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return -1;
+ }
+
+ was_empty = STAILQ_EMPTY(&c->commands_head) ? true : false;
+ if (was_empty == true) {
+ STAILQ_INSERT_HEAD(&c->commands_head, command, stailq_entry);
+ } else {
+ STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry);
+ }
+
+#if DARWIN
+ /* Darwin systems <10.12 lack clock_gettime (!) */
+ if (gettimeofday(&c->last_received_time, NULL)) {
+ NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
+ }
+#else
+ if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) {
+ NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
+ }
+#endif /* DARWIN */
+
+ if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ return -1;
+ }
+
+ /* if there were already queued commands, a processor should already be scheduled */
+ if (was_empty == true) {
+ connection_inc_ref(c);
+ if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) {
+ NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+/**
+ * Add the utime and stime values to a connection's totals.
+ */
+void
+connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end)
+{
+ struct timeval utime_diff, stime_diff;
+
+ timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime);
+ timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime);
+
+ timeval_increment(&c->utime, &utime_diff);
+ timeval_increment(&c->stime, &stime_diff);
+}
+
+/**
+ * Locks the output buffer, for sending multiple
+ * contiguous messages without possible interleaving.
+ */
+void
+connection_lock_output(struct connection *c)
+{
+ struct evbuffer *output = bufferevent_get_output(c->bev);
+
+ evbuffer_lock(output);
+}
+
+/**
+ * Unlocks the output buffer.
+ */
+void
+connection_unlock_output(struct connection *c)
+{
+ struct evbuffer *output = bufferevent_get_output(c->bev);
+
+ evbuffer_unlock(output);
+}
+
+/**
+ * Sends a text string to a connection.
+ */
+int
+connection_printf(struct connection *c, const char *fmt, ...)
+{
+ struct evbuffer *output = bufferevent_get_output(c->bev);
+ va_list ap;
+ int r;
+
+ va_start(ap, fmt);
+ r = evbuffer_add_vprintf(output, fmt, ap);
+ va_end(ap);
+
+ return r;
+}
+
+
+/**
+ * Sends a text string to a connection.
+ */
+int
+connection_vprintf(struct connection *c, const char *fmt, va_list ap)
+{
+ struct evbuffer *output = bufferevent_get_output(c->bev);
+ int r;
+
+ r = evbuffer_add_vprintf(output, fmt, ap);
+
+ return r;
+}
+
+
+/**
+ * Send one data buffer to multiple connections.
+ * Data buffer must be malloced, and will be freed when last connection
+ * has finished with it.
+ * Returns number of items sent, or negative on error.
+ */
+int
+connection_multi_send(struct connection **clist, char *data, size_t len)
+{
+ struct connection **clist_iter;
+ struct rc_data_ *rc;
+ int r = 0;
+
+ if (!clist) {
+ NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__);
+ return -1;
+ }
+
+ /* Track our data so it can be freed once all connections are done with it. */
+ rc = rc_data_new_(data, len);
+ if (rc == NULL) {
+ NOTIFY_ERROR("rc_data_new_('%s', %zu) failed", data, len);
+ return -1;
+ }
+
+ clist_iter = clist;
+ while (*clist_iter) {
+ struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev);
+
+ /* FIXME: if connection is still valid... */
+
+ /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */
+ rc_data_ref_inc_(rc);
+ if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) {
+ NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed");
+ rc_data_free_(rc);
+ } else {
+ r++;
+ }
+ clist_iter++;
+ }
+
+ /* FIXME: clang also claims this is use-after-free, same as above. */
+ rc_data_free_(rc);
+
+ return r;
+}
+
+
+int
+connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...)
+{
+ struct connection **clist;
+ char *message = NULL;
+ ssize_t message_len = 0;
+ va_list ap;
+ int r;
+
+ va_start(ap, fmt);
+ message_len = vsnprintf(message, message_len, fmt, ap);
+ va_end(ap);
+ if (message_len < 0) {
+ NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
+ return -1;
+ }
+ message_len += 1;
+ message = malloc(message_len);
+ if (!message) {
+ NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno));
+ return -1;
+ }
+ va_start(ap, fmt);
+ message_len = vsnprintf(message, message_len, fmt, ap);
+ va_end(ap);
+ if (message_len < 0) {
+ NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
+ free(message);
+ return -1;
+ }
+
+#ifdef DEBUG_SILLY
+ NOTIFY_DEBUG("message_len:%zu message:%s", message_len, message);
+#endif
+
+ clist = connections_all_as_array(&sender->server->connections, exclude_sender == true ? sender : NULL);
+ r = connection_multi_send(clist, message, message_len);
+ if (r < 0) {
+ NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed");
+ }
+ for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) {
+ connection_free(*clist_iter);
+ }
+ free(clist);
+
+ return r;
+}
+
+/**
+ *
+ */
+void
+connection_inc_ref(struct connection *c)
+{
+ int r;
+
+ if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ c->reference_count += 1;
+
+#if DEBUG_SILLY
+ NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
+#endif
+
+ if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ return;
+ }
+}
+
+/**
+ * Frees a connection (and all data owned by a connection) if there are no more references to it.
+ */
+void
+connection_free(struct connection *c)
+{
+ struct evbuffer *tmp;
+ evutil_socket_t fd;
+ int r;
+
+ if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ return;
+ }
+
+ c->reference_count -= 1;
+
+#if DEBUG_SILLY
+ NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
+#endif
+
+ if (c->reference_count > 0) {
+ if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+
+ return;
+ }
+
+ NOTIFY_DEBUG("c:%p freeing", c);
+
+ connections_remove(c);
+
+ bufferevent_disable(c->bev, EV_READ|EV_WRITE);
+ tmp = bufferevent_get_output(c->bev);
+ evbuffer_drain(tmp, evbuffer_get_length(tmp));
+ tmp = bufferevent_get_input(c->bev);
+ evbuffer_drain(tmp, evbuffer_get_length(tmp));
+
+ fd = bufferevent_getfd(c->bev);
+ if (fd != -1) {
+ EVUTIL_CLOSESOCKET(fd);
+ bufferevent_setfd(c->bev, -1);
+ }
+
+ if (c->dns_retry_ev) {
+ if (event_del(c->dns_retry_ev)) {
+ NOTIFY_ERROR("%s:%s", "event_del", strerror(errno));
+ }
+ c->dns_retry_ev = NULL;
+ }
+
+ if (c->evdns_request) {
+ evdns_cancel_request(c->server->evdns_base, c->evdns_request);
+ c->evdns_request = NULL;
+ }
+
+ if (c->client_address) {
+ free(c->client_address);
+ c->client_address = NULL;
+ }
+
+ bufferevent_free(c->bev);
+
+ if (c->uuid) {
+ uuid_rc_t rc;
+
+ rc = uuid_destroy(c->uuid);
+ if (rc != UUID_RC_OK) {
+ NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
+ }
+ c->uuid = NULL;
+ }
+
+ /* empty out the command queue */
+ if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
+ }
+
+ struct command *c1, *c2;
+ c1 = STAILQ_FIRST(&c->commands_head);
+ while (c1 != NULL) {
+ c2 = STAILQ_NEXT(c1, stailq_entry);
+ free(c1);
+ c1 = c2;
+ }
+ STAILQ_INIT(&c->commands_head);
+
+ if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+
+ if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
+ }
+ if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
+ NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
+ }
+
+ memset(c, 0, sizeof *c);
+
+ free(c);
+}