Merge branch 'release/release-1.2'
[reservoir_sample] / reservoir.c
diff --git a/reservoir.c b/reservoir.c
new file mode 100644 (file)
index 0000000..1878203
--- /dev/null
@@ -0,0 +1,174 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "reservoir.h"
+#include "notify.h"
+#include "buf.h"
+#include "randomness.h"
+#include "test_suite.h"
+
+reservoir_t reservoir_new(size_t sz) {
+       reservoir_t reservoir;
+
+       reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
+       if (reservoir == NULL) {
+               NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
+               return NULL;
+       }
+       reservoir->reservoir_sz = sz;
+       reservoir->reservoir_used = 0;
+       reservoir->reservoir_tally = 0;
+       memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
+
+       return reservoir;
+}
+
+int reservoir_grow(reservoir_t *preservoir, size_t growby) {
+       assert(preservoir != NULL);
+
+       if (growby) {
+               void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir);
+               if (tmp_ptr == NULL) {
+                       NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
+                       return -1;
+               }
+               *preservoir = tmp_ptr;
+               (*preservoir)->reservoir_sz += growby;
+               memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used);
+       }
+
+       D_RESERVOIR(*preservoir);
+
+       return 0;
+}
+
+void reservoir_remember(reservoir_t reservoir, buf_t buf) {
+       buf_t old_buf;
+
+       assert(reservoir != NULL);
+
+       D_BUF("reserving ", buf);
+
+       if (reservoir->reservoir_sz > 0) {
+               unsigned long randomness;
+
+               if (reservoir->reservoir_used < reservoir->reservoir_sz) {
+                       /* there are still unused slots, fill them up without discarding anything */
+                       /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
+                       randomness = randomness_upto_inclusive(reservoir->reservoir_used);
+
+                       assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */
+
+                       NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used);
+                       reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness];
+                       old_buf = NULL; /* no old entry to discard */
+                       reservoir->reservoir_used += 1;
+               } else {
+                       randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1);
+                       old_buf = reservoir->reservoir[randomness];
+               }
+               NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
+               reservoir->reservoir[randomness] = buf;
+       } else {
+               /* can't add anything to a zero-size reservoir, so just dispose of new item */
+               old_buf = buf;
+       }
+
+       reservoir->reservoir_tally += 1;
+
+       if (old_buf != NULL) {
+               D_BUF("FREEING ", old_buf);
+               memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
+               free(old_buf);
+       }
+
+       D_RESERVOIR(reservoir);
+}
+
+int reservoir_write(int fd, reservoir_t reservoir, char delim) {
+       ssize_t len;
+       size_t i;
+       struct iovec iov[2];
+
+       iov[1].iov_base = &delim;
+       iov[1].iov_len = sizeof delim;
+
+       assert(reservoir != NULL);
+       D_RESERVOIR(reservoir);
+
+       for (i = 0; i < reservoir->reservoir_sz; i++) {
+               if (reservoir->reservoir[i]) {
+                       iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
+                       iov[0].iov_len = reservoir->reservoir[i]->buf_used;
+               } else {
+                       iov[0].iov_base = NULL;
+                       iov[0].iov_len = 0;
+               }
+
+               do {
+                       len = writev(fd, iov, sizeof iov / sizeof *iov);
+               } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+               if (len < 0) {
+                       NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+#define META_BUF_SZ 128
+int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) {
+       char buf[META_BUF_SZ];
+       int metalen;
+       ssize_t len;
+
+       metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c",
+                          reservoir->reservoir_sz, delim,
+                          reservoir->reservoir_used, delim,
+                          reservoir->reservoir_tally, delim,
+                          samples, delim);
+       if ((size_t)metalen >= sizeof buf) {
+               NOTIFY_ERROR("out of buffer");
+               return -1;
+       }
+       do {
+               len = write(fd, buf, metalen);
+       } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+       if (len < metalen) {
+               if (len < 0) {
+                       NOTIFY_ERROR("%s:%s", "write", strerror(errno));
+               }
+               return -1;
+       }
+
+       return 0;
+}
+
+#ifdef TEST
+
+void *test_suite_data;
+
+int test_suite_pre(void *suite_data) {
+       (void)suite_data;
+       if (randomness_init(NULL)) {
+               return -1;
+       }
+       return 0;
+}
+
+int test_suite_post(void *suite_data) {
+       (void)suite_data;
+       return 0;
+}
+
+test_t test_suite[] = {
+       { NULL, NULL, NULL, NULL, NULL },
+};
+
+#endif /* TEST */