rough framework
[lemu] / connections.c
1 #define _GNU_SOURCE
2
3 #include <stdio.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <stdarg.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <pthread.h>
10
11 #include <sys/time.h>
12 #include <sys/types.h>
13 #include <sys/socket.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #ifndef NI_MAXHOST
17 # define NI_MAXHOST 1025
18 #endif
19 #ifndef NI_MAXSERV
20 # define NI_MAXSERV 32
21 #endif
22
23 #include <sys/resource.h> // needs _GNU_SOURCE for RUSAGE_THREAD
24
25 #include <event2/bufferevent.h>
26 #include <event2/buffer.h>
27 #include <event2/event.h>
28 #include <event2/util.h>
29
30 #include <ossp/uuid.h>
31
32 #include "common.h"
33 #include "notify.h"
34 #include "connections.h"
35 #include "command.h"
36 #include "server.h"
37
38 /**
39 * A reference-counted data buffer.
40 * Used internally here for sending out one message to multiple connections, without
41 * creating a copy for each connection.
42 * Frees data pointer when the last reference is removed.
43 * A connection thread increases the reference count as it sends the message to each
44 * destination connection, while the libevent thread decrements as it completes each
45 * transmission.
46 */
47 struct rc_data_ {
48 pthread_mutex_t mutex;
49 size_t reference_count;
50 size_t data_len;
51 char *data;
52 };
53
54 /**
55 * Allocates, initializes, and returns a new struct rc_data_ entity.
56 */
57 static struct rc_data_ *
58 rc_data_new_(char *data, size_t len)
59 {
60 struct rc_data_ *rc;
61 int r;
62
63 if (!data) {
64 NOTIFY_DEBUG("data:%p len:%zu", data, len);
65 return NULL;
66 }
67
68 rc = malloc(sizeof *rc);
69 if (!rc) {
70 NOTIFY_ERROR("malloc(%zu): %s", sizeof *rc, strerror(errno));
71 return NULL;
72 }
73
74 if ( (r = pthread_mutex_init(&rc->mutex, NULL)) ) {
75 NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
76 free(rc);
77 return NULL;
78 }
79
80 rc->reference_count = 1;
81 rc->data_len = len;
82 rc->data = data;
83
84 return rc;
85 }
86
87 /**
88 * Increments the reference count of a struct rc_data_ entity.
89 */
90 static void
91 rc_data_ref_inc_(struct rc_data_ *rc)
92 {
93 int r;
94
95 if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
96 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
97 return;
98 }
99
100 rc->reference_count += 1;
101
102 if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
103 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
104 }
105 }
106
107 /**
108 * Decrements the reference count of a struct rc_data_ entity
109 * and frees everything if there are no more references.
110 */
111 static void
112 rc_data_free_(struct rc_data_ *rc)
113 {
114 int r;
115
116 if ( (r = pthread_mutex_lock(&rc->mutex)) ) {
117 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
118 return;
119 }
120 rc->reference_count -= 1;
121 if (rc->reference_count > 0) {
122 if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
123 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
124 }
125 return;
126 }
127
128 free(rc->data);
129 rc->data = NULL;
130 if ( (r = pthread_mutex_unlock(&rc->mutex)) ) {
131 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
132 }
133 if ( (r = pthread_mutex_destroy(&rc->mutex)) ) {
134 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
135 }
136
137 memset(rc, 0, sizeof *rc);
138
139 free(rc);
140 }
141
142 /**
143 * Wrapper for rc_data_free_() of the proper callback type to be used by
144 * evbuffer_add_reference() in connection_multi_send().
145 */
146 static void
147 rc_cleanup_cb_(const void *data UNUSED, size_t len UNUSED, void *arg)
148 {
149 struct rc_data_ *r = arg;
150
151 rc_data_free_(r);
152 }
153
154 /**
155 * Initializes a struct connections entity.
156 */
157 int
158 connections_init(struct connections *cs)
159 {
160 int r;
161
162 cs->count = 0;
163 TAILQ_INIT(&cs->head);
164
165 if ( (r = pthread_mutex_init(&cs->mutex, NULL)) ) {
166 NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
167 return -1;
168 }
169
170 return 0;
171 }
172
173 /**
174 * Cleanup a struct connections.
175 */
176 void
177 connections_fini(struct connections *cs)
178 {
179 struct connection *c1, *c2;
180 int r;
181
182 #if DEBUG_SILLY
183 NOTIFY_DEBUG("cs:%p", cs);
184 #endif
185
186 /* free any connection entities */
187 c1 = TAILQ_FIRST(&cs->head);
188 while (c1 != NULL) {
189 c2 = TAILQ_NEXT(c1, tailq_entry);
190 connection_free(c1);
191 c1 = c2;
192 }
193 TAILQ_INIT(&cs->head);
194 cs->count = 0;
195
196 if ( (r = pthread_mutex_destroy(&cs->mutex)) ) {
197 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
198 }
199 }
200
201
202 /**
203 * Inserts a connection at the end of connections list.
204 */
205 void
206 connections_append(struct connection *c)
207 {
208 struct connections *cs = &c->server->connections;
209 int r;
210
211 #if DEBUG_SILLY
212 NOTIFY_DEBUG("c:%p", c);
213 #endif
214
215 if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
216 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
217 return;
218 }
219
220 TAILQ_INSERT_TAIL(&cs->head, c, tailq_entry);
221 cs->count += 1;
222
223 if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
224 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
225 return;
226 }
227 }
228
229 /**
230 * Removes a connection from its struct connections list.
231 */
232 void
233 connections_remove(struct connection *c)
234 {
235 struct connections *cs = &c->server->connections;
236 int r;
237
238 #if DEBUG_SILLY
239 NOTIFY_DEBUG("c:%p", c);
240 #endif
241
242 if ( !c || !cs) {
243 NOTIFY_DEBUG("c:%p connections:%p", c, cs);
244 return;
245 }
246
247 if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
248 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
249 return;
250 }
251
252 TAILQ_REMOVE(&cs->head, c, tailq_entry);
253 cs->count -= 1;
254
255 if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
256 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
257 return;
258 }
259 }
260
261 /**
262 * Allocates and returns a null-terminated array of pointers to all connection entities in
263 * connections list, minus the 'c_exclude' connection, if set.
264 * Used as a simple implementation of broadcasting a message.
265 * This increments the reference count of each connection added to the array
266 * so they need to individually be freed before freeing the array.
267 */
268 struct connection **
269 connections_all_as_array(struct connections *cs, struct connection *c_exclude)
270 {
271 struct connection **all;
272 struct connection *c;
273 size_t i = 0;
274 int r;
275
276 if ( (r = pthread_mutex_lock(&cs->mutex)) ) {
277 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
278 return NULL;
279 }
280
281 all = calloc(cs->count + 1, sizeof *all);
282 if (!all) {
283 NOTIFY_ERROR("calloc(%zu, %zu): %s", cs->count + 1, sizeof *all, strerror(errno));
284 goto err_unlock;
285 }
286
287 TAILQ_FOREACH(c, &cs->head, tailq_entry) {
288 if (c != c_exclude
289 && c->state < CONNECTION_STATE_WANT_CLOSE
290 && c->state > CONNECTION_STATE_INIT) {
291 connection_inc_ref(c);
292 all[i++] = c;
293 }
294 }
295
296 if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
297 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
298 while (i) {
299 i--;
300 connection_free(c);
301 }
302 free(all);
303 return NULL;
304 }
305
306 return all;
307
308 err_unlock:
309 if ( (r = pthread_mutex_unlock(&cs->mutex)) ) {
310 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
311 }
312 return NULL;
313 }
314
315
316 /**
317 * Wrap ev timeout event to re-resolve hostname.
318 */
319 static void
320 connection_resolve_retry_event_(evutil_socket_t fd UNUSED, short what UNUSED, void *arg)
321 {
322 struct connection *c = arg;
323
324 NOTIFY_DEBUG("retrying hostname resolution");
325
326 c->dns_retry_ev = NULL;
327 c->dns_retries += 1;
328 connection_resolve_hostname(c);
329 }
330
331 /**
332 * A callback used by connection_init to populate the reverse-lookup of a connection's
333 * client_address.
334 **/
335 static void
336 connection_reverse_dns_cb_(int result, char type, int count, int ttl UNUSED, void *addresses, void *ctx)
337 {
338 struct connection *c = ctx;
339 char *old_client_address = c->client_address;
340
341 if (result != DNS_ERR_NONE) {
342 NOTIFY_ERROR("Error resolving: %s",
343 evdns_err_to_string(result) );
344 c->evdns_request = NULL;
345
346 // TODO: configurable
347 // if (server_get_config_integer(c->server, "dns_retries_max")) {
348 // if (server_get_config_integer(c->server, "dns_retry_seconds")) {
349
350 if (c->dns_retries < 4) {
351 struct timeval _seconds = {30, 0};
352 c->dns_retry_ev = event_new(c->server->base, -1, EV_TIMEOUT, connection_resolve_retry_event_, c);
353 event_add(c->dns_retry_ev, &_seconds);
354 };
355 return;
356 }
357
358 switch (type) {
359 case DNS_PTR:
360 if (count < 1) {
361 return;
362 }
363 if (count > 1) {
364 NOTIFY_DEBUG("multiple records returned, using first");
365 }
366 c->client_address = strdup(((char **)addresses)[0]);
367 NOTIFY_DEBUG("resolved [%s] as %s for '%s' (%p)", old_client_address, c->client_address, c->name, c);
368 break;
369
370 default:
371 NOTIFY_DEBUG("reverse lookup of [%s] returned dns type %d", c->client_address, type);
372 break;
373 }
374 c->evdns_request = NULL;
375
376 if (old_client_address) {
377 free(old_client_address);
378 }
379 }
380
381
382 /**
383 * Perform reverse lookup of connection address.
384 */
385 void
386 connection_resolve_hostname(struct connection *c)
387 {
388 if (c->evdns_request) {
389 NOTIFY_DEBUG("resolution already in progress");
390 return;
391 }
392
393 /* does libevent's evdns have a getnameinfo style call? */
394
395 switch (c->sa->sa_family) {
396 case AF_INET:
397 c->evdns_request = evdns_base_resolve_reverse(c->server->evdns_base, &((struct sockaddr_in *)c->sa)->sin_addr, 0, connection_reverse_dns_cb_, c);
398 break;
399
400 case AF_INET6:
401 c->evdns_request = evdns_base_resolve_reverse_ipv6(c->server->evdns_base, &((struct sockaddr_in6 *)c->sa)->sin6_addr, 0, connection_reverse_dns_cb_, c);
402 break;
403
404 default:
405 NOTIFY_DEBUG("unhandled address family %u", c->sa->sa_family);
406 }
407 if (!c->evdns_request) {
408 NOTIFY_DEBUG("could not submit PTR lookup request");
409 }
410 }
411
412
413 /**
414 * Populates a connection with initial information and sets reference count.
415 */
416 int
417 connection_init(struct server *s, struct connection *c, struct bufferevent *bev, struct sockaddr *sock, int socklen, connection_flags_t flags)
418 {
419 char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
420 int r;
421 uuid_rc_t rc;
422
423 if ( (r = pthread_mutex_init(&c->rc_mutex, NULL)) ) {
424 NOTIFY_ERROR("%s:%s", "pthread_mutex_init", strerror(r));
425 goto err;
426 }
427
428 if ( (r = pthread_mutex_init(&c->commands_mutex, NULL)) ) {
429 NOTIFY_ERROR("%s:%s", "pthead_mutex_init", strerror(r));
430 goto err_rc_mutex_destroy;
431 }
432
433 rc = uuid_create(&c->uuid);
434 if (rc != UUID_RC_OK) {
435 NOTIFY_ERROR("%s:%s", "uuid_create", uuid_error(rc));
436 goto err_commands_mutex_destroy;
437 }
438
439 rc = uuid_make(c->uuid, UUID_MAKE_V1|UUID_MAKE_MC);
440 if (rc != UUID_RC_OK) {
441 NOTIFY_ERROR("%s:%s", "uuid_make", uuid_error(rc));
442 goto err_uuid_destroy;
443 }
444
445 c->state = CONNECTION_STATE_INIT;
446 c->bev = bev;
447 c->flags = flags;
448 c->server = s;
449
450 /* render an IP from a sockaddr */
451 r = getnameinfo(sock, socklen, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV);
452 if (r) {
453 NOTIFY_ERROR("%s:%s", "getnameinfo", gai_strerror(r));
454 strncpy(hbuf, "(unknown)", sizeof hbuf);
455 strncpy(sbuf, "(?)", sizeof sbuf);
456 }
457 c->client_address = strdup(hbuf);
458 if (!c->client_address) {
459 NOTIFY_ERROR("%s:%s", "strdup", strerror(errno));
460 goto err_uuid_destroy;
461 }
462
463 /* Past this point, errors are non-fatal. */
464 c->reference_count = 1;
465
466 /* Now try to resolve a name from client ip, in background. */
467 c->sa = sock;
468 c->sa_len = socklen;
469 c->dns_retries = 0;
470 c->dns_retry_ev = NULL;
471 connection_resolve_hostname(c);
472
473 /* FIXME: temporary name of connection for POC */
474 static int current_connection_number;
475 r = snprintf((char *)c->name, sizeof(c->name), "conn %d", current_connection_number++);
476 if ((size_t)r >= sizeof c->name) {
477 NOTIFY_ERROR("buffer truncated [%s:%d]", __FILE__, __LINE__);
478 }
479
480 #if DARWIN
481 /* Darwin systems <10.12 lack clock_gettime (!) */
482 if (gettimeofday(&c->connect_time, NULL)) {
483 NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
484 }
485 NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_time.tv_sec));
486 #else
487 if (clock_gettime(CLOCK_REALTIME, &c->connect_timespec)) {
488 NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
489 }
490 NOTIFY_DEBUG("c:%p [%s] from %s at %.24s", c, c->name, c->client_address, ctime(&c->connect_timespec.tv_sec));
491 #endif /* DARWIN */
492
493 return 0;
494
495 err_uuid_destroy:
496 rc = uuid_destroy(c->uuid);
497 if (rc != UUID_RC_OK) {
498 NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
499 }
500 err_commands_mutex_destroy:
501 if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
502 NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r));
503 }
504 err_rc_mutex_destroy:
505 if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
506 NOTIFY_ERROR("%s:%s", "pthread_destroy", strerror(r));
507 }
508 err:
509 return -1;
510 }
511
512
513 /**
514 * Process a connection's commands as long as there are commands to process.
515 * FIXME: track how much time has been spend consuming commands, re-queue
516 * processor job when over some threshold considering pending workqueue
517 */
518 static void
519 connection_process_queue_(void *data, void *context, size_t id)
520 {
521 struct connection *c = data;
522 struct server_worker_context *ctx = context;
523 struct command *command;
524 struct rusage ru_start, ru_end;
525 struct timespec ts_start, ts_current;
526 int r;
527
528 if (clock_gettime(CLOCK_MONOTONIC, &ts_start) < 0) {
529 NOTIFY_DEBUG("%s:%s", "clock_gettime", strerror(errno));
530 }
531 while (1) {
532 if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
533 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
534 return;
535 }
536
537 command = STAILQ_FIRST(&c->commands_head);
538 if (command) {
539 STAILQ_REMOVE_HEAD(&c->commands_head, stailq_entry);
540 }
541
542 if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
543 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
544 return;
545 }
546
547 if (command == NULL) {
548 break;
549 }
550
551 if ( (r = getrusage(RUSAGE_THREAD, &ru_start)) ) {
552 NOTIFY_ERROR("%s:%s", "getrusage", strerror(r));
553 }
554
555 command_parse(c, context, id, command);
556
557 if ( (r = getrusage(RUSAGE_THREAD, &ru_end)) ) {
558 NOTIFY_ERROR("%s:%s", "getrusage", strerror(r));
559 }
560
561 /* track how much time was spent procesing */
562 connection_accounting_increment(c, &ru_start, &ru_end);
563
564 command_free(command);
565
566 if (clock_gettime(CLOCK_MONOTONIC, &ts_current) < 0) {
567 NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
568 }
569
570 }
571 connection_free(c);
572
573 // Done processing for a connection, force GC cycle to release additional c references.
574 lua_gc(ctx->L, LUA_GCCOLLECT);
575 }
576
577
578 /**
579 * Adds a command to a connection's list of things to process.
580 */
581 int
582 connection_command_enqueue(struct connection *c, struct command *command)
583 {
584 bool was_empty;
585 int r;
586
587 if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
588 command_free(command);
589 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
590 return -1;
591 }
592
593 was_empty = STAILQ_EMPTY(&c->commands_head) ? true : false;
594 if (was_empty == true) {
595 STAILQ_INSERT_HEAD(&c->commands_head, command, stailq_entry);
596 } else {
597 STAILQ_INSERT_TAIL(&c->commands_head, command, stailq_entry);
598 }
599
600 #if DARWIN
601 /* Darwin systems <10.12 lack clock_gettime (!) */
602 if (gettimeofday(&c->last_received_time, NULL)) {
603 NOTIFY_ERROR("%s:%s", "gettimeofday", strerror(errno));
604 }
605 #else
606 if (clock_gettime(CLOCK_REALTIME, &c->last_received_timespec)) {
607 NOTIFY_ERROR("%s:%s", "clock_gettime", strerror(errno));
608 }
609 #endif /* DARWIN */
610
611 if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
612 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
613 return -1;
614 }
615
616 /* if there were already queued commands, a processor should already be scheduled */
617 if (was_empty == true) {
618 connection_inc_ref(c);
619 if (workqueue_add(&c->server->workqueue, connection_process_queue_, c)) {
620 NOTIFY_ERROR("%s:%s", "workqueue_add", "failed");
621 return -1;
622 }
623 }
624
625 return 0;
626 }
627
628
629 /**
630 * Add the utime and stime values to a connection's totals.
631 */
632 void
633 connection_accounting_increment(struct connection *c, struct rusage *ru_start, struct rusage *ru_end)
634 {
635 struct timeval utime_diff, stime_diff;
636
637 timeval_diff(&utime_diff, &ru_end->ru_utime, &ru_start->ru_utime);
638 timeval_diff(&stime_diff, &ru_end->ru_stime, &ru_start->ru_stime);
639
640 timeval_increment(&c->utime, &utime_diff);
641 timeval_increment(&c->stime, &stime_diff);
642 }
643
644 /**
645 * Locks the output buffer, for sending multiple
646 * contiguous messages without possible interleaving.
647 */
648 void
649 connection_lock_output(struct connection *c)
650 {
651 struct evbuffer *output = bufferevent_get_output(c->bev);
652
653 evbuffer_lock(output);
654 }
655
656 /**
657 * Unlocks the output buffer.
658 */
659 void
660 connection_unlock_output(struct connection *c)
661 {
662 struct evbuffer *output = bufferevent_get_output(c->bev);
663
664 evbuffer_unlock(output);
665 }
666
667 /**
668 * Sends a text string to a connection.
669 */
670 int
671 connection_printf(struct connection *c, const char *fmt, ...)
672 {
673 struct evbuffer *output = bufferevent_get_output(c->bev);
674 va_list ap;
675 int r;
676
677 va_start(ap, fmt);
678 r = evbuffer_add_vprintf(output, fmt, ap);
679 va_end(ap);
680
681 return r;
682 }
683
684
685 /**
686 * Sends a text string to a connection.
687 */
688 int
689 connection_vprintf(struct connection *c, const char *fmt, va_list ap)
690 {
691 struct evbuffer *output = bufferevent_get_output(c->bev);
692 int r;
693
694 r = evbuffer_add_vprintf(output, fmt, ap);
695
696 return r;
697 }
698
699
700 /**
701 * Send one data buffer to multiple connections.
702 * Data buffer must be malloced, and will be freed when last connection
703 * has finished with it.
704 * Returns number of items sent, or negative on error.
705 */
706 int
707 connection_multi_send(struct connection **clist, char *data, size_t len)
708 {
709 struct connection **clist_iter;
710 struct rc_data_ *rc;
711 int r = 0;
712
713 if (!clist) {
714 NOTIFY_DEBUG("null clist [%s:%d]", __FILE__, __LINE__);
715 return -1;
716 }
717
718 /* Track our data so it can be freed once all connections are done with it. */
719 rc = rc_data_new_(data, len);
720 if (rc == NULL) {
721 NOTIFY_ERROR("rc_data_new_('%s', %zu) failed", data, len);
722 return -1;
723 }
724
725 clist_iter = clist;
726 while (*clist_iter) {
727 struct evbuffer *output = bufferevent_get_output((*clist_iter)->bev);
728
729 /* FIXME: if connection is still valid... */
730
731 /* FIXME: clang claims this is a use-after-free if evbuffer_add_reference fails, but I don't yet believe it. */
732 rc_data_ref_inc_(rc);
733 if (evbuffer_add_reference(output, rc->data, rc->data_len, rc_cleanup_cb_, rc)) {
734 NOTIFY_ERROR("%s:%s", "evbuffer_add_reference", "failed");
735 rc_data_free_(rc);
736 } else {
737 r++;
738 }
739 clist_iter++;
740 }
741
742 /* FIXME: clang also claims this is use-after-free, same as above. */
743 rc_data_free_(rc);
744
745 return r;
746 }
747
748
749 int
750 connection_printf_broadcast(struct connection *sender, bool exclude_sender, const char *fmt, ...)
751 {
752 struct connection **clist;
753 char *message = NULL;
754 ssize_t message_len = 0;
755 va_list ap;
756 int r;
757
758 va_start(ap, fmt);
759 message_len = vsnprintf(message, message_len, fmt, ap);
760 va_end(ap);
761 if (message_len < 0) {
762 NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
763 return -1;
764 }
765 message_len += 1;
766 message = malloc(message_len);
767 if (!message) {
768 NOTIFY_ERROR("%s(%zd):%s", "malloc", message_len, strerror(errno));
769 return -1;
770 }
771 va_start(ap, fmt);
772 message_len = vsnprintf(message, message_len, fmt, ap);
773 va_end(ap);
774 if (message_len < 0) {
775 NOTIFY_ERROR("%s:%s (%zd)", "vsnprintf", strerror(errno), message_len);
776 free(message);
777 return -1;
778 }
779
780 #ifdef DEBUG_SILLY
781 NOTIFY_DEBUG("message_len:%zu message:%s", message_len, message);
782 #endif
783
784 clist = connections_all_as_array(&sender->server->connections, exclude_sender == true ? sender : NULL);
785 r = connection_multi_send(clist, message, message_len);
786 if (r < 0) {
787 NOTIFY_ERROR("%s:%s", "connection_multi_send", "failed");
788 }
789 for (struct connection **clist_iter = clist; *clist_iter; clist_iter++) {
790 connection_free(*clist_iter);
791 }
792 free(clist);
793
794 return r;
795 }
796
797 /**
798 *
799 */
800 void
801 connection_inc_ref(struct connection *c)
802 {
803 int r;
804
805 if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
806 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
807 return;
808 }
809
810 c->reference_count += 1;
811
812 #if DEBUG_SILLY
813 NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
814 #endif
815
816 if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
817 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
818 return;
819 }
820 }
821
822 /**
823 * Frees a connection (and all data owned by a connection) if there are no more references to it.
824 */
825 void
826 connection_free(struct connection *c)
827 {
828 struct evbuffer *tmp;
829 evutil_socket_t fd;
830 int r;
831
832 if ( (r = pthread_mutex_lock(&c->rc_mutex)) ) {
833 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
834 return;
835 }
836
837 c->reference_count -= 1;
838
839 #if DEBUG_SILLY
840 NOTIFY_DEBUG("c:%p new reference_count:%zu", c, c->reference_count);
841 #endif
842
843 if (c->reference_count > 0) {
844 if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
845 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
846 }
847
848 return;
849 }
850
851 NOTIFY_DEBUG("c:%p freeing", c);
852
853 connections_remove(c);
854
855 bufferevent_disable(c->bev, EV_READ|EV_WRITE);
856 tmp = bufferevent_get_output(c->bev);
857 evbuffer_drain(tmp, evbuffer_get_length(tmp));
858 tmp = bufferevent_get_input(c->bev);
859 evbuffer_drain(tmp, evbuffer_get_length(tmp));
860
861 fd = bufferevent_getfd(c->bev);
862 if (fd != -1) {
863 EVUTIL_CLOSESOCKET(fd);
864 bufferevent_setfd(c->bev, -1);
865 }
866
867 if (c->dns_retry_ev) {
868 if (event_del(c->dns_retry_ev)) {
869 NOTIFY_ERROR("%s:%s", "event_del", strerror(errno));
870 }
871 c->dns_retry_ev = NULL;
872 }
873
874 if (c->evdns_request) {
875 evdns_cancel_request(c->server->evdns_base, c->evdns_request);
876 c->evdns_request = NULL;
877 }
878
879 if (c->client_address) {
880 free(c->client_address);
881 c->client_address = NULL;
882 }
883
884 bufferevent_free(c->bev);
885
886 if (c->uuid) {
887 uuid_rc_t rc;
888
889 rc = uuid_destroy(c->uuid);
890 if (rc != UUID_RC_OK) {
891 NOTIFY_ERROR("%s:%s", "uuid_destroy", uuid_error(rc));
892 }
893 c->uuid = NULL;
894 }
895
896 /* empty out the command queue */
897 if ( (r = pthread_mutex_lock(&c->commands_mutex)) ) {
898 NOTIFY_ERROR("%s:%s", "pthread_mutex_lock", strerror(r));
899 }
900
901 struct command *c1, *c2;
902 c1 = STAILQ_FIRST(&c->commands_head);
903 while (c1 != NULL) {
904 c2 = STAILQ_NEXT(c1, stailq_entry);
905 free(c1);
906 c1 = c2;
907 }
908 STAILQ_INIT(&c->commands_head);
909
910 if ( (r = pthread_mutex_unlock(&c->commands_mutex)) ) {
911 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
912 }
913 if ( (r = pthread_mutex_destroy(&c->commands_mutex)) ) {
914 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
915 }
916
917 if ( (r = pthread_mutex_unlock(&c->rc_mutex)) ) {
918 NOTIFY_ERROR("%s:%s", "pthread_mutex_unlock", strerror(r));
919 }
920 if ( (r = pthread_mutex_destroy(&c->rc_mutex)) ) {
921 NOTIFY_ERROR("%s:%s", "pthread_mutex_destroy", strerror(r));
922 }
923
924 memset(c, 0, sizeof *c);
925
926 free(c);
927 }