--- /dev/null
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "reservoir.h"
+#include "notify.h"
+#include "buf.h"
+#include "randomness.h"
+#include "test_suite.h"
+
+reservoir_t reservoir_new(size_t sz) {
+ reservoir_t reservoir;
+
+ reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
+ if (reservoir == NULL) {
+ NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
+ return NULL;
+ }
+ reservoir->reservoir_sz = sz;
+ reservoir->reservoir_used = 0;
+ reservoir->reservoir_tally = 0;
+ memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
+
+ return reservoir;
+}
+
+int reservoir_grow(reservoir_t *preservoir, size_t growby) {
+ assert(preservoir != NULL);
+
+ if (growby) {
+ void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir);
+ if (tmp_ptr == NULL) {
+ NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
+ return -1;
+ }
+ *preservoir = tmp_ptr;
+ (*preservoir)->reservoir_sz += growby;
+ memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used);
+ }
+
+ D_RESERVOIR(*preservoir);
+
+ return 0;
+}
+
+void reservoir_remember(reservoir_t reservoir, buf_t buf) {
+ buf_t old_buf;
+
+ assert(reservoir != NULL);
+
+ D_BUF("reserving ", buf);
+
+ if (reservoir->reservoir_sz > 0) {
+ unsigned long randomness;
+
+ if (reservoir->reservoir_used < reservoir->reservoir_sz) {
+ /* there are still unused slots, fill them up without discarding anything */
+ /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
+ randomness = randomness_upto_inclusive(reservoir->reservoir_used);
+
+ assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */
+
+ NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used);
+ reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness];
+ old_buf = NULL; /* no old entry to discard */
+ reservoir->reservoir_used += 1;
+ } else {
+ randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1);
+ old_buf = reservoir->reservoir[randomness];
+ }
+ NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
+ reservoir->reservoir[randomness] = buf;
+ } else {
+ /* can't add anything to a zero-size reservoir, so just dispose of new item */
+ old_buf = buf;
+ }
+
+ reservoir->reservoir_tally += 1;
+
+ if (old_buf != NULL) {
+ D_BUF("FREEING ", old_buf);
+ memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
+ free(old_buf);
+ }
+
+ D_RESERVOIR(reservoir);
+}
+
+int reservoir_write(int fd, reservoir_t reservoir, char delim) {
+ ssize_t len;
+ size_t i;
+ struct iovec iov[2];
+
+ iov[1].iov_base = &delim;
+ iov[1].iov_len = sizeof delim;
+
+ assert(reservoir != NULL);
+ D_RESERVOIR(reservoir);
+
+ for (i = 0; i < reservoir->reservoir_sz; i++) {
+ if (reservoir->reservoir[i]) {
+ iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
+ iov[0].iov_len = reservoir->reservoir[i]->buf_used;
+ } else {
+ iov[0].iov_base = NULL;
+ iov[0].iov_len = 0;
+ }
+
+ do {
+ len = writev(fd, iov, sizeof iov / sizeof *iov);
+ } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+ if (len < 0) {
+ NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+#define META_BUF_SZ 128
+int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) {
+ char buf[META_BUF_SZ];
+ int metalen;
+ ssize_t len;
+
+ metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c",
+ reservoir->reservoir_sz, delim,
+ reservoir->reservoir_used, delim,
+ reservoir->reservoir_tally, delim,
+ samples, delim);
+ if ((size_t)metalen >= sizeof buf) {
+ NOTIFY_ERROR("out of buffer");
+ return -1;
+ }
+ do {
+ len = write(fd, buf, metalen);
+ } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+ if (len < metalen) {
+ if (len < 0) {
+ NOTIFY_ERROR("%s:%s", "write", strerror(errno));
+ }
+ return -1;
+ }
+
+ return 0;
+}
+
+#ifdef TEST
+
+void *test_suite_data;
+
+int test_suite_pre(void *suite_data) {
+ (void)suite_data;
+ if (randomness_init(NULL)) {
+ return -1;
+ }
+ return 0;
+}
+
+int test_suite_post(void *suite_data) {
+ (void)suite_data;
+ return 0;
+}
+
+test_t test_suite[] = {
+ { NULL, NULL, NULL, NULL, NULL },
+};
+
+#endif /* TEST */