Split code into modules, handle USR1, minor fixes.
authorJustin Wind <justin.wind@gmail.com>
Thu, 21 Feb 2013 01:33:57 +0000 (17:33 -0800)
committerJustin Wind <justin.wind@gmail.com>
Fri, 22 Feb 2013 02:30:14 +0000 (18:30 -0800)
Dump current reservoir buffers eventually after receiving USR1 signal.
buf_t, reservoir_t, and randomness now each have their own respective modules.
Reads and writes are now EINTR safe.
Randomness source now defaults to system PRNG.
Reservoirs might now be growable -- implmented but untested.

Makefile
README.md [new file with mode: 0644]
buf.c [new file with mode: 0644]
buf.h [new file with mode: 0644]
randomness.c [new file with mode: 0644]
randomness.h [new file with mode: 0644]
reservoir.c [new file with mode: 0644]
reservoir.h [new file with mode: 0644]
reservoir_sample.c

index 8f8f4b7daf8ae73eb0f7055a7145336894334491..405e0337e375fe2003d988e481109329f0286384 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -13,8 +13,8 @@ MAKEDEPEND = $(CC) -MM
 
 TARGETS = reservoir_sample
 TEST_DIR = test
-TESTS = $(addprefix $(TEST_DIR)/, reservoir_sample_test)
-SOURCES = reservoir_sample.c notify.c
+TESTS = $(addprefix $(TEST_DIR)/, buf_test reservoir_test)
+SOURCES = reservoir_sample.c notify.c buf.c randomness.c reservoir.c
 OBJECTS = $(SOURCES:.c=.o)
 TEST_OBJECTS = $(TESTS:=.o) test_suite.o
 
@@ -43,8 +43,10 @@ check:       test
 $(TEST_DIR)/%_test.o:  %.c
        $(CC) $(CFLAGS) $(CPPFLAGS) -DTEST -c -o $@ $<
 
-$(TEST_DIR)/reservoir_sample_test:     %: %.o test_suite.o notify.o
+$(TEST_DIR)/buf_test:  %: %.o test_suite.o notify.o
 
-reservoir_sample:      %: %.o notify.o
+$(TEST_DIR)/reservoir_test:    %: %.o test_suite.o notify.o buf.o randomness.o
+
+reservoir_sample:      %: %.o notify.o buf.o randomness.o reservoir.o
 
 -include .depend
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..0033712
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+# reservoir_sample #
+This is a small unixy command-line utility which will harvest an evenly-weighted random sampling from its input stream.
+
+## what ##
+While this sort of collation function can easily be implemented in a few lines of shell, here is a standalone utility specifically for the task, with a few frills.
+
+## who ##
+Justin Wind <justin.wind@gmail.com>
+
+## license ##
+cite or blame, as apropos
+corrections appreciated
+no warranties implied
diff --git a/buf.c b/buf.c
new file mode 100644 (file)
index 0000000..7a94079
--- /dev/null
+++ b/buf.c
@@ -0,0 +1,243 @@
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "buf.h"
+#include "notify.h"
+#include "test_suite.h"
+
+inline
+buf_t buf_new(size_t sz) {
+       buf_t buf = malloc(sz + sizeof *buf);
+       if (buf) {
+               buf->buf_sz = sz;
+               buf->buf_start = buf->buf_used = 0;
+               memset(buf->buf, 0, sz);
+       }
+       return buf;
+}
+
+inline
+void buf_rebase(buf_t buf) {
+       if (buf->buf_start == 0)
+               return;
+       memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
+       buf->buf_start = 0;
+}
+
+inline
+int buf_makeroom(buf_t *pbuf, size_t roomfor) {
+       size_t new_sz;
+       void *tmp_ptr;
+
+       assert(pbuf != NULL);
+
+       if (*pbuf == NULL) {
+               *pbuf = buf_new(roomfor);
+               if (*pbuf == NULL) {
+                       return -1;
+               }
+       }
+
+       buf_rebase(*pbuf);
+
+       if (BUF_ROOM(*pbuf) >= roomfor)
+               return 0;
+
+       new_sz = (*pbuf)->buf_used + roomfor;
+       tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf);
+       if (tmp_ptr == NULL) {
+               NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
+               return -1;
+       }
+       *pbuf = tmp_ptr;
+       (*pbuf)->buf_sz = new_sz;
+
+       return 0;
+}
+
+inline
+int buf_range_dup_or_append(buf_t src, size_t src_offset, size_t n, buf_t *pdst) {
+       assert(src != NULL);
+       assert(pdst != NULL);
+       assert(src_offset + n <= src->buf_used);
+
+       if (buf_makeroom(pdst, n)) {
+               return -1;
+       }
+
+       memcpy((*pdst)->buf + (*pdst)->buf_used, src->buf + src->buf_start + src_offset, n);
+       (*pdst)->buf_used += n;
+
+       return 0;
+}
+
+/*  Room for improvement:
+               Depending on ratio of flensed portion to buffer use, try to do less
+       copying-around of buffer ranges, by swapping buf pointers instead.
+*/
+ssize_t buf_flense(buf_t *psrc, size_t src_offset, int delimiter, buf_t *pdst) {
+       const size_t delimiter_len = 1;
+       size_t i;
+
+       assert(psrc != NULL);
+       assert(src_offset <= (*psrc)->buf_used);
+
+       NOTIFY_DEBUG("src_offset:%zu", src_offset);
+       D_BUF("src ", *psrc);
+       D_BUF("dst ", pdst ? *pdst : NULL);
+
+       for (i = src_offset; i < (*psrc)->buf_used; i++) {
+               if ((*psrc)->buf[(*psrc)->buf_start + i] == delimiter) {
+
+                       if (pdst != NULL) {
+                               if (buf_range_dup_or_append((*psrc), 0, i, pdst)) {
+                                       return -1;
+                               }
+                       }
+
+                       (*psrc)->buf_start += i + delimiter_len;
+                       (*psrc)->buf_used -= i + delimiter_len;
+
+                       D_BUF("src ", *psrc);
+                       D_BUF("dst ", pdst ? *pdst : NULL);
+                       return i + delimiter_len;
+               }
+       }
+
+       return 0;
+}
+
+#ifdef TEST
+
+static const char buf_flense_testdata1_[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
+static const char buf_flense_testdata2_[] = "\n\nfoo\nbar\n\n";
+
+struct buf_flense_expected_result_ {
+       ssize_t r;
+       const char *buf;
+} buf_flense_expected1[] = {
+       { 1 + 1, "a" },
+       { 2 + 1, "bc" },
+       { 3 + 1, "def" },
+       { 4 + 1, "ghij" },
+       { 5 + 1, "klmno" },
+       { 3 + 1, "pqr" },
+       { 1 + 1, "s" },
+       { 3 + 1, "tuv" },
+       { 5 + 1, "wxyz0" },
+       { 4 + 1, "1234" },
+       { 3 + 1, "567" },
+       { 2 + 1, "89" },
+       { 0, "0leftovers" },
+}, buf_flense_expected2[] = {
+       { 0 + 1, "" },
+       { 0 + 1, "" },
+       { 3 + 1, "foo" },
+       { 3 + 1, "bar" },
+       { 0 + 1, "" },
+       { 0, "" },
+};
+
+struct test_buf_flense_data_ {
+       const char *src;
+       const struct buf_flense_expected_result_ *expected;
+} test_buf_flense_data1 = {
+       .src = buf_flense_testdata1_,
+       .expected = buf_flense_expected1,
+}, test_buf_flense_data2 = {
+       .src = buf_flense_testdata2_,
+       .expected = buf_flense_expected2,
+};
+
+static
+int test_buf_flense_(void *test_arg, void *suite_arg) {
+       (void)suite_arg;
+       struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg;
+       const char testdata_len = strlen(test_data->src);
+       const struct buf_flense_expected_result_ *next_result = test_data->expected;
+       int retval = 0;
+       buf_t src;
+
+       TEST_INFO("initializing src buffer with %zu characters", testdata_len);
+       src = buf_new(testdata_len);
+       if (src == NULL) {
+               TEST_ERROR("%s:%s", "new_buf_", "failed");
+               return -1;
+       }
+       memcpy(src->buf, test_data->src, testdata_len);
+       src->buf_used += testdata_len;
+
+       D_BUF("src ", src);
+
+       for (;;) {
+               ssize_t r;
+               buf_t dst = NULL;
+
+               r = buf_flense(&src, 0, '\n', &dst);
+               if (r != next_result->r) {
+                       TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
+                       retval = -1;
+               }
+               if (r == 0) {
+                       TEST_INFO("finished flensing");
+                       break;
+               }
+
+               if (strlen(next_result->buf) > dst->buf_used) {
+                       TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
+                       D_BUF("dst ", dst);
+                       retval = -1;
+               } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) {
+                       TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
+                       D_BUF("dst ", dst);
+                       retval = -1;
+               }
+
+               TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start);
+
+               memset(dst, 0, dst->buf_sz + sizeof *dst);
+               free(dst);
+               dst = NULL;
+
+               next_result++;
+       }
+       if (strlen(next_result->buf) > src->buf_used) {
+               TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
+               D_BUF("src ", src);
+               retval = -1;
+       } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) {
+               TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
+               D_BUF("src ", src);
+               retval = -1;
+       }
+
+       TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start);
+
+       memset(src, 0, src->buf_sz + sizeof *src);
+       free(src);
+       src = NULL;
+
+       return retval;
+}
+
+void *test_suite_data;
+
+int test_suite_pre(void *suite_data) {
+       (void)suite_data;
+       return 0;
+}
+
+int test_suite_post(void *suite_data) {
+       (void)suite_data;
+       return 0;
+}
+
+test_t test_suite[] = {
+       { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
+       { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
+       { NULL, NULL, NULL, NULL, NULL },
+};
+
+#endif /* TEST */
diff --git a/buf.h b/buf.h
new file mode 100644 (file)
index 0000000..a47f3ba
--- /dev/null
+++ b/buf.h
@@ -0,0 +1,73 @@
+#ifndef BUF_H_B87OZOFU
+#define BUF_H_B87OZOFU
+
+#include <stdlib.h>
+
+#ifndef NDEBUG
+#include "notify.h"
+#endif /* NDEBUG */
+
+/* A simple sliding-window byte buffer. */
+
+typedef struct buf_ {
+       size_t buf_sz;
+       size_t buf_start;
+       size_t buf_used;
+       unsigned char buf[];
+} *buf_t;
+#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
+
+#ifndef NDEBUG
+#define D_BUF(__pre__,__b__,...) do {\
+       if ( (__b__) == NULL )\
+               NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\
+       else {\
+               NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
+                            ## __VA_ARGS__,\
+                            (__b__),\
+                            (__b__)->buf_sz,\
+                            (__b__)->buf_start,\
+                            (__b__)->buf_used,\
+                            BUF_ROOM((__b__)),\
+                            (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\
+               assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\
+       }\
+} while (0)
+#else /* NDEBUG */
+#define D_BUF(...)
+#endif /* NDEBUG */
+
+/*  buf_new
+       Allocate and return a new buf_t capable of holding #sz bytes.
+*/
+buf_t buf_new(size_t sz);
+
+/*  buf_rebase
+       Reclaim any free space from the start of #buf, preserving active content.
+*/
+void buf_rebase(buf_t buf);
+
+/* buf_makeroom
+               Assures that the buf pointed to by #pbuf has at space to hold #roomfor more
+       bytes.
+*/
+int buf_makeroom(buf_t *pbuf, size_t roomfor);
+
+/*  buf_range_dup_or_append
+               Starting at the #src_offset byte of #src, appends the following #n bytes to
+       the buffer pointed to by #dstp, which will be re/allocated if needed.
+*/
+int buf_range_dup_or_append(buf_t src, size_t src_offset, size_t n, buf_t *pdst);
+
+/*  buf_flense
+               Starting after #src_offset characters, scan through the buffer pointed
+       to by #psrc, stopping at the first byte matching #delimiter, whereupon, if
+       #pdst is not NULL, all the bytes previous to #delimiter are appended onto
+       the buffer pointed to by *#pdst.  The buffer pointed to by #psrc is then
+       trimmed to only contain the bytes following #delimiter.  The delimiter byte
+       is discarded.
+               Returns the number of characters flensed from #src.
+*/
+ssize_t buf_flense(buf_t *psrc, size_t src_offset, int delimiter, buf_t *pdst);
+
+#endif /* BUF_H_B87OZOFU */
diff --git a/randomness.c b/randomness.c
new file mode 100644 (file)
index 0000000..90b375a
--- /dev/null
@@ -0,0 +1,67 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "randomness.h"
+#include "notify.h"
+
+static int randomness_fd_ = -1;
+
+int randomness_init(const char *filename) {
+       if (filename != NULL) {
+               randomness_fd_ = open(filename, O_RDONLY);
+               if (randomness_fd_ == -1) {
+                       NOTIFY_ERROR("%s('%s'):%s", "open", filename, strerror(errno));
+                       return -1;
+               }
+               NOTIFY_DEBUG("reading randomness from '%s', fd:%d", filename, randomness_fd_);
+       } else {
+               if (randomness_fd_ != -1) {
+                       close(randomness_fd_);
+                       randomness_fd_ = -1;
+               }
+               srand48(time(NULL) ^ getpid());
+               NOTIFY_DEBUG("reading randomness from system PRNG");
+       }
+
+       return 0;
+}
+
+/*
+       Room for improvement: constrain bits of randomness consumed, based on #limit
+       Also maybe read chunks of randomness at a time
+ */
+unsigned long randomness_upto_inclusive(unsigned long limit) {
+       unsigned long randomness;
+
+       if (limit == 0)
+               return 0;
+
+       if (randomness_fd_ != -1) {
+               ssize_t len;
+
+               do {
+                       len = read(randomness_fd_, &randomness, sizeof randomness);
+               } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+               if (len == sizeof randomness) {
+                       randomness %= limit + 1;
+                       NOTIFY_DEBUG("randomness:%lu", randomness);
+                       return randomness;
+               }
+               NOTIFY_ERROR("%s(%d, %zu):%zd:%s",
+                            "read", randomness_fd_, sizeof randomness, len,
+                            (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read" )
+                           );
+       }
+
+       /* fall back to pseudo-randomness if read failed */
+       randomness = mrand48();
+       randomness %= limit + 1;
+
+       NOTIFY_DEBUG("randomness:%lu", randomness);
+       return randomness;
+}
\ No newline at end of file
diff --git a/randomness.h b/randomness.h
new file mode 100644 (file)
index 0000000..24d7653
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef RANDOMNESS_H_4LC721CM
+#define RANDOMNESS_H_4LC721CM
+
+/*
+       Wrapper for wrangling random values.
+*/
+
+/*     rand_init
+       Prepare to read randomness from #filename.
+       If filename is NULL, use system pseudorandom generator.
+*/
+int randomness_init(const char *filename);
+
+/*     randomness_upto_inclusive
+       Return a random number from zero up through #limit.
+*/
+unsigned long randomness_upto_inclusive(unsigned long limit);
+
+#endif /* RANDOMNESS_H_4LC721CM */
diff --git a/reservoir.c b/reservoir.c
new file mode 100644 (file)
index 0000000..1878203
--- /dev/null
@@ -0,0 +1,174 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "reservoir.h"
+#include "notify.h"
+#include "buf.h"
+#include "randomness.h"
+#include "test_suite.h"
+
+reservoir_t reservoir_new(size_t sz) {
+       reservoir_t reservoir;
+
+       reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
+       if (reservoir == NULL) {
+               NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
+               return NULL;
+       }
+       reservoir->reservoir_sz = sz;
+       reservoir->reservoir_used = 0;
+       reservoir->reservoir_tally = 0;
+       memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
+
+       return reservoir;
+}
+
+int reservoir_grow(reservoir_t *preservoir, size_t growby) {
+       assert(preservoir != NULL);
+
+       if (growby) {
+               void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir);
+               if (tmp_ptr == NULL) {
+                       NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
+                       return -1;
+               }
+               *preservoir = tmp_ptr;
+               (*preservoir)->reservoir_sz += growby;
+               memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used);
+       }
+
+       D_RESERVOIR(*preservoir);
+
+       return 0;
+}
+
+void reservoir_remember(reservoir_t reservoir, buf_t buf) {
+       buf_t old_buf;
+
+       assert(reservoir != NULL);
+
+       D_BUF("reserving ", buf);
+
+       if (reservoir->reservoir_sz > 0) {
+               unsigned long randomness;
+
+               if (reservoir->reservoir_used < reservoir->reservoir_sz) {
+                       /* there are still unused slots, fill them up without discarding anything */
+                       /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
+                       randomness = randomness_upto_inclusive(reservoir->reservoir_used);
+
+                       assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */
+
+                       NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used);
+                       reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness];
+                       old_buf = NULL; /* no old entry to discard */
+                       reservoir->reservoir_used += 1;
+               } else {
+                       randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1);
+                       old_buf = reservoir->reservoir[randomness];
+               }
+               NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
+               reservoir->reservoir[randomness] = buf;
+       } else {
+               /* can't add anything to a zero-size reservoir, so just dispose of new item */
+               old_buf = buf;
+       }
+
+       reservoir->reservoir_tally += 1;
+
+       if (old_buf != NULL) {
+               D_BUF("FREEING ", old_buf);
+               memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
+               free(old_buf);
+       }
+
+       D_RESERVOIR(reservoir);
+}
+
+int reservoir_write(int fd, reservoir_t reservoir, char delim) {
+       ssize_t len;
+       size_t i;
+       struct iovec iov[2];
+
+       iov[1].iov_base = &delim;
+       iov[1].iov_len = sizeof delim;
+
+       assert(reservoir != NULL);
+       D_RESERVOIR(reservoir);
+
+       for (i = 0; i < reservoir->reservoir_sz; i++) {
+               if (reservoir->reservoir[i]) {
+                       iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
+                       iov[0].iov_len = reservoir->reservoir[i]->buf_used;
+               } else {
+                       iov[0].iov_base = NULL;
+                       iov[0].iov_len = 0;
+               }
+
+               do {
+                       len = writev(fd, iov, sizeof iov / sizeof *iov);
+               } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+               if (len < 0) {
+                       NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+#define META_BUF_SZ 128
+int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) {
+       char buf[META_BUF_SZ];
+       int metalen;
+       ssize_t len;
+
+       metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c",
+                          reservoir->reservoir_sz, delim,
+                          reservoir->reservoir_used, delim,
+                          reservoir->reservoir_tally, delim,
+                          samples, delim);
+       if ((size_t)metalen >= sizeof buf) {
+               NOTIFY_ERROR("out of buffer");
+               return -1;
+       }
+       do {
+               len = write(fd, buf, metalen);
+       } while (len == -1 && (errno == EINTR || errno == EAGAIN));
+       if (len < metalen) {
+               if (len < 0) {
+                       NOTIFY_ERROR("%s:%s", "write", strerror(errno));
+               }
+               return -1;
+       }
+
+       return 0;
+}
+
+#ifdef TEST
+
+void *test_suite_data;
+
+int test_suite_pre(void *suite_data) {
+       (void)suite_data;
+       if (randomness_init(NULL)) {
+               return -1;
+       }
+       return 0;
+}
+
+int test_suite_post(void *suite_data) {
+       (void)suite_data;
+       return 0;
+}
+
+test_t test_suite[] = {
+       { NULL, NULL, NULL, NULL, NULL },
+};
+
+#endif /* TEST */
diff --git a/reservoir.h b/reservoir.h
new file mode 100644 (file)
index 0000000..c056ac5
--- /dev/null
@@ -0,0 +1,61 @@
+#ifndef RESERVOIR_H_MZSA8WJ3
+#define RESERVOIR_H_MZSA8WJ3
+
+#include <stdlib.h>
+
+#include "buf.h"
+
+#ifndef NDEBUG
+#include "notify.h"
+#endif /* NDEBUG */
+
+/* A pool of buf_t */
+
+typedef struct reservoir_ {
+       size_t reservoir_sz; /* allocated slots */
+       size_t reservoir_used; /* slots filled */
+       unsigned long reservoir_tally; /* total number of items remembered */
+       buf_t reservoir[];
+} *reservoir_t;
+
+#ifndef NDEBUG
+#define D_RESERVOIR(__r__) do {\
+       size_t i;\
+               NOTIFY_DEBUG("reservoir:%p sz: %zu used:%zu tally:%zu", (__r__), (__r__)->reservoir_sz, (__r__)->reservoir_used, (__r__)->reservoir_tally);\
+               for (i = 0; i < (__r__)->reservoir_sz; i++) {\
+                       D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
+               }\
+} while (0)
+#else /* NDEBUG */
+#define D_RESERVOIR(...)
+#endif /* NDEBUG */
+
+/*     reservoir_new
+       Allocate and return a new reservoir capable of holding #sz bufs.
+*/
+reservoir_t reservoir_new(size_t sz);
+
+/*     reservoir_grow
+       Increase the storage capacity of the reservoir pointed to by #preservoir
+       by #growby bufs.
+*/
+int reservoir_grow(reservoir_t *preservoir, size_t growby);
+
+/*     reservoir_remember
+               Remember #buf, forgetting another buf at random if the reservoir
+               pointed to by #preservoir is already full to capacity.
+*/
+void reservoir_remember(reservoir_t reservoir, buf_t buf);
+
+/*     reservoir_write
+       Write the contents of the bufs within #reservoir to #fd, each with a
+       trailing #delim.
+*/
+int reservoir_write(int fd, reservoir_t reservoir, char delim);
+
+/*     reservoir_write_meta
+       Write metadata of #reservoir and #samples to fd.
+*/
+int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim);
+
+#endif /* RESERVOIR_H_MZSA8WJ3 */
index 167b5c9531aab1960d0d7fdc35a7daecc3bc52aa..674506250024495fdd8d50bd40899f80985b667e 100644 (file)
@@ -1,22 +1,32 @@
 /*  reservoir_sample.c
-        This generates a randomized subset of its input, by means of reservoir-
+        This collates a randomized subset of its input, by means of reservoir-
     sampling, and a Fisher-Yates shuffle.
 */
 
+/*  To do:
+               Allow user-supplied function (in relation to entry count) to define
+               grow-rate option, rather than static count.  JIT would be nice there.
+
+               The entire grow-rate thing is not very well thought-through, will bias
+               the results, and likely not very useful as currently implemented.
+*/
+
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
 #include <errno.h>
 #include <stdio.h>
 #include <fcntl.h>
-#include <sys/uio.h>
-#include <time.h>
+#include <limits.h>
+#include <signal.h>
 #include <sysexits.h>
 #include <assert.h>
 
 #include "version.h"
 #include "notify.h"
-#include "test_suite.h"
+#include "buf.h"
+#include "randomness.h"
+#include "reservoir.h"
 
 static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
 
@@ -24,63 +34,27 @@ static struct options_ {
        unsigned int verbosity;
        size_t read_buf_sz;
        size_t reservoir_sz;
+       unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */
+       unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */
        char delim;
        char delim_out;
        char *rand_file;
+       int status_fd;
 } options_ = {
        .verbosity = 0,
        .read_buf_sz = 8192,
        .reservoir_sz = 1,
+       .reservoir_grow_per = 0,
        .delim = '\n',
        .delim_out = '\n',
-       .rand_file = "/dev/random",
+       .rand_file = NULL,
+       .status_fd = STDERR_FILENO,
 };
 
-static int rand_fd_ = -1;
-
-/* a simple little sliding-window buffer */
-typedef struct buf_ {
-       size_t buf_sz;
-       size_t buf_start;
-       size_t buf_used;
-       unsigned char buf[];
-} *buf_t;
-#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
-
-typedef struct reservoir_ {
-       size_t reservoir_insertions; /* how many items have ever been added to reservoir */
-       size_t reservoir_sz;
-       buf_t reservoir[];
-} *reservoir_t;
-
-#ifndef NDEBUG
-#define D_BUF(__pre__,__b__,...) do {\
-       if ( (__b__) == NULL )\
-               NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\
-       else {\
-               NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
-                            ## __VA_ARGS__,\
-                            (__b__),\
-                            (__b__)->buf_sz,\
-                            (__b__)->buf_start,\
-                            (__b__)->buf_used,\
-                            BUF_ROOM((__b__)),\
-                            (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\
-               assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\
-       }\
-} while (0)
-#define D_RESERVOIR(__r__) do {\
-       size_t i;\
-       for (i = 0; i < (__r__)->reservoir_sz; i++) {\
-               D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
-       }\
-       NOTIFY_DEBUG("  insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\
-} while (0)
-#else
-#define D_RESERVOIR(...)
-#define D_BUF(...)
-#endif
-
+/*     #status_requested_ will be set whenever a view of the current sample
+       reservoir is desired.
+*/
+static int status_requested_ = 0;
 
 static
 void usage_(const char *prog, unsigned int full) {
@@ -99,380 +73,94 @@ void usage_(const char *prog, unsigned int full) {
 
        if (full) {
                fprintf(f, "\nOptions:\n"
-                          "\t-n <num> -- returns <num> samples [default: %zu]\n"
-                          "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
-                          "\t-r <file> -- read randomness from <file> [default: '%s']\n"
-                          "\t-b <bytes> -- read buffer size [default: %zu]\n"
-                          "\t-v -- increase verbosity\n"
-                          "\t-h -- this screen\n",
+                          "\t-n <num>   -- returns <num> samples [default: %zu]\n"
+                          "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
+                          "\t-r <file>  -- read randomness from <file> [default: '%s']\n"
+                          "\t-b <bytes> -- set input buffer size [default: %zu]\n"
+                          "\t-i <num>   -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
+                          "\t-h <num>   -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n "
+                          "\t-s <file>  -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
+                          "\t-v         -- increase verbosity\n"
+                          "\t-h         -- this screen\n",
                        options_.reservoir_sz,
                        options_.delim,
-                       options_.rand_file,
-                       options_.read_buf_sz);
+                       options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)",
+                       options_.read_buf_sz,
+                       options_.reservoir_grow_double_per,
+                       options_.reservoir_grow_per);
 
                fprintf(f, "\n%78s\n", src_id_);
        }
        fflush(f);
 }
 
-
-/*  rand_upto_inclusive_
-    Room for improvement: constrain bits of randomness consumed, based on #limit
-    also maybe read chunks of randomness at a time
- */
-static
-unsigned long rand_upto_inclusive_(unsigned long limit) {
-       unsigned long randomness;
-
-       if (limit == 0)
-               return 0;
-
-       if (rand_fd_ != -1) {
-               ssize_t len;
-
-               len = read(rand_fd_, &randomness, sizeof randomness);
-               if (len == sizeof randomness) {
-                       randomness %= limit + 1;
-
-                       return randomness;
-               }
-               NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
-       }
-
-       /* fall back to dumb randomness */
-       randomness = mrand48();
-       randomness %= limit + 1;
-
-       return randomness;
-}
-
-
-static
-int rand_init_(char *rand_file) {
-       srand48(time(NULL) ^ getpid());
-       if (rand_file) {
-               rand_fd_ = open(rand_file, O_RDONLY);
-               if (rand_fd_ == -1) {
-                       NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno));
-                       return -1;
-               }
-       }
-       return 0;
-}
-
-
-static
-buf_t buf_new_(size_t sz) {
-       buf_t buf = malloc(sz + sizeof *buf);
-       if (buf) {
-               buf->buf_sz = sz;
-               buf->buf_start = buf->buf_used = 0;
-               memset(buf->buf, 0, sz);
-       }
-       return buf;
-}
-
-
-static
-void buf_rebase_(buf_t buf) {
-       if (buf->buf_start == 0)
-               return;
-       memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
-       buf->buf_start = 0;
-}
-
-
-static
-int buf_makeroom_(buf_t *pbuf, size_t roomfor) {
-       size_t new_sz;
-       void *tmp_ptr;
-
-       assert(pbuf != NULL);
-
-       if (*pbuf == NULL) {
-               *pbuf = buf_new_(roomfor);
-               if (*pbuf == NULL) {
-                       return -1;
-               }
-       }
-
-       buf_rebase_(*pbuf);
-
-       if (BUF_ROOM(*pbuf) >= roomfor)
-               return 0;
-
-       new_sz = (*pbuf)->buf_used + roomfor;
-       tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf);
-       if (tmp_ptr == NULL) {
-               NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
-               return -1;
-       }
-       *pbuf = tmp_ptr;
-       (*pbuf)->buf_sz = new_sz;
-
-       return 0;
-}
-
-
-static
-int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) {
-       assert(src != NULL);
-       assert(dst != NULL);
-       assert(src_skip + n <= src->buf_used);
-
-       if (buf_makeroom_(dst, n)) {
-               return -1;
-       }
-
-       memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n);
-       (*dst)->buf_used += n;
-
-       return 0;
-}
-
-
-/*  buf_flense_
-        Starting after #src_offset characters, scan through #src, stopping at
-    the first character matching #delimiter, whereupon all the characters
-    leading up to #delimiter are copied into *#dst if #dst is not NULL.  #src
-    becomes the characters following #delimiter.
-        Returns the number of characters flensed from #src.
-
-    Room for improvement:
-        If flensed segment is more than half the buffer, copy remainder of src
-    into dst, then return src, leaving dst in its place.
-*/
-static
-ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) {
-       const size_t delimiter_len = 1;
-       size_t i;
-
-       assert(src != NULL);
-       assert(src_offset <= (*src)->buf_used);
-
-       NOTIFY_DEBUG("src_offset:%zu", src_offset);
-       D_BUF("src ", *src);
-       D_BUF("dst ", dst ? *dst : NULL);
-
-       for (i = src_offset; i < (*src)->buf_used; i++) {
-               if ((*src)->buf[(*src)->buf_start + i] == delimiter) {
-
-                       if (dst != NULL) {
-                               if (buf_range_dup_or_append_((*src), 0, i, dst)) {
-                                       return -1;
-                               }
-                       }
-
-                       (*src)->buf_start += i + delimiter_len;
-                       (*src)->buf_used -= i + delimiter_len;
-
-                       D_BUF("src ", *src);
-                       D_BUF("dst ", dst ? *dst : NULL);
-                       return i + delimiter_len;
-               }
-       }
-
-       return 0;
-}
-
-#ifdef TEST
-
-static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
-static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n";
-
-struct buf_flense_expected_result_ {
-       ssize_t r;
-       const char *buf;
-} buf_flense_expected1[] = {
-       { 1 + 1, "a" },
-       { 2 + 1, "bc" },
-       { 3 + 1, "def" },
-       { 4 + 1, "ghij" },
-       { 5 + 1, "klmno" },
-       { 3 + 1, "pqr" },
-       { 1 + 1, "s" },
-       { 3 + 1, "tuv" },
-       { 5 + 1, "wxyz0" },
-       { 4 + 1, "1234" },
-       { 3 + 1, "567" },
-       { 2 + 1, "89" },
-       { 0, "0leftovers" },
-}, buf_flense_expected2[] = {
-       { 0 + 1, "" },
-       { 0 + 1, "" },
-       { 3 + 1, "foo" },
-       { 3 + 1, "bar" },
-       { 0 + 1, "" },
-       { 0, "" },
-};
-
-struct test_buf_flense_data_ {
-       const char *src;
-       const struct buf_flense_expected_result_ *expected;
-} test_buf_flense_data1 = {
-       .src = buf_flense_testdata1,
-       .expected = buf_flense_expected1,
-}, test_buf_flense_data2 = {
-       .src = buf_flense_testdata2,
-       .expected = buf_flense_expected2,
-};
-
-static int test_buf_flense_(void *test_arg, void *suite_arg) {
-       (void)suite_arg;
-       struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg;
-       const char testdata_len = strlen(test_data->src);
-       const struct buf_flense_expected_result_ *next_result = test_data->expected;
-       int retval = 0;
-       buf_t src;
-
-       TEST_INFO("initializing src buffer with %zu characters", testdata_len);
-       src = buf_new_(testdata_len);
-       if (src == NULL) {
-               TEST_ERROR("%s:%s", "new_buf_", "failed");
-               return -1;
-       }
-       memcpy(src->buf, test_data->src, testdata_len);
-       src->buf_used += testdata_len;
-
-       D_BUF("src ", src);
-
-       for (;;) {
-               ssize_t r;
-               buf_t dst = NULL;
-
-               r = buf_flense_(&src, 0, '\n', &dst);
-               if (r != next_result->r) {
-                       TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
-                       retval = -1;
-               }
-               if (r == 0) {
-                       TEST_INFO("finished flensing");
-                       break;
-               }
-
-               if (strlen(next_result->buf) > dst->buf_used) {
-                       TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
-                       D_BUF("dst ", dst);
-                       retval = -1;
-               } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) {
-                       TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
-                       D_BUF("dst ", dst);
-                       retval = -1;
-               }
-
-               TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start);
-
-               memset(dst, 0, dst->buf_sz + sizeof *dst);
-               free(dst);
-               dst = NULL;
-
-               next_result++;
-       }
-       if (strlen(next_result->buf) > src->buf_used) {
-               TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
-               D_BUF("src ", src);
-               retval = -1;
-       } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) {
-               TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
-               D_BUF("src ", src);
-               retval = -1;
-       }
-
-       TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start);
-
-       memset(src, 0, src->buf_sz + sizeof *src);
-       free(src);
-       src = NULL;
-
-       return retval;
-}
-#endif /* TEST */
-
-
-/*  reservoir_remember_
-        Remember #line, forgetting a line if more than #num_lines have already
-    been remembered.  Remembers them in random order.
+/*     request_snapshot_
+       Signal handler to be bound to USR1.
+       Upon receiving a signal, take note that a snapshot of the current state
+       has been requested.
 */
 static
-void reservoir_remember_(reservoir_t reservoir, buf_t buf) {
-       buf_t old_buf;
-
-       assert(reservoir != NULL);
-
-       D_BUF("reserving ", buf);
-
-       if (reservoir->reservoir_sz > 0) {
-               unsigned long randomness;
-
-               if (reservoir->reservoir_insertions < reservoir->reservoir_sz) {
-                       /* there are still unused slots, fill them up without discarding anything */
-                       /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
-                       randomness = rand_upto_inclusive_(reservoir->reservoir_insertions);
-
-                       assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */
-
-                       NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions);
-                       reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness];
-                       old_buf = NULL; /* no old entry to discard */
-               } else {
-                       randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
-                       old_buf = reservoir->reservoir[randomness];
-               }
-               NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
-               reservoir->reservoir[randomness] = buf;
-       } else {
-               /* can't add anything to a zero-size reservoir, so just dispose of new item */
-               old_buf = buf;
-       }
-
-       reservoir->reservoir_insertions += 1;
-
-       if (old_buf != NULL) {
-               D_BUF("FREEING ", old_buf);
-               memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
-               free(old_buf);
-       }
-
-       D_RESERVOIR(reservoir);
+void request_snapshot_(int signum) {
+       (void)signum;
+       status_requested_ = 1;
 }
 
-
-/*  reservoir_read_
+/*  accumulate_input_
         Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
     accumulator buffer.  For each #delimiter character found in what was just
     read, occasionally remember the preceeding characters.
 */
 static
-int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) {
+int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) {
        buf_t read_buf, new_buf = NULL;
-       size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
+       size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */
        ssize_t len;
+       unsigned long grow_count = 0;
+       unsigned long grow_grow_count = 0;
 
        assert(read_block_sz > 0);
-       assert(num_lines != NULL);
-       assert(reservoir != NULL);
+       assert(psamples != NULL);
+       assert(preservoir != NULL);
+       assert(*preservoir != NULL);
 
        if (fd < 0) {
                return -1;
        }
 
-       read_buf = buf_new_(read_block_sz);
+       read_buf = buf_new(read_block_sz);
        if (read_buf == NULL) {
                return -1;
        }
-       line_scanned = 0;
+       bytes_scanned = 0;
 
+       /* begin accumulating */
        for (;;) {
                NOTIFY_DEBUG("read loop\n\n");
 
-               if (buf_makeroom_(&read_buf, read_block_sz)) {
+               /* make sure there's enough room in our input buffer for a full read() */
+               if (buf_makeroom(&read_buf, read_block_sz)) {
                        free(read_buf);
                        free(new_buf);
                        return -1;
                }
 
-               NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf));
-               len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf));
+               NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf));
+               do {
+                       len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf));
+
+                       /* a signal may have interrupted read(), deal with that before
+                          doing anything else */
+                       if (status_requested_) {
+                               status_requested_ = 0;
+                               NOTIFY_DEBUG("dumping reservoir due to signal");
+                               if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) {
+                                       NOTIFY_ERROR("failed to output current reservoir contents");
+                               }
+                               if (options_.verbosity) {
+                                       reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out);
+                               }
+                       }
+               } while (len == -1 && (errno == EINTR || errno == EAGAIN));
                if (len < 0) {
                        NOTIFY_ERROR("%s:%s", "read", strerror(errno));
                        free(read_buf);
@@ -492,22 +180,46 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
 
                        NOTIFY_DEBUG("len:%zd", len);
 
+                       if (options_.reservoir_grow_per
+                       &&  grow_count >= options_.reservoir_grow_per) {
+                               NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1);
+                               grow_count = 0;
+                               if (reservoir_grow(preservoir, 1)) {
+                                       NOTIFY_ERROR("failed to increase reservoir size");
+                                       free(read_buf);
+                                       return -1;
+                               }
+                       }
+
+                       if (options_.reservoir_grow_double_per
+                       &&  grow_grow_count >= options_.reservoir_grow_double_per) {
+                               if (grow_count > ULONG_MAX / 2) {
+                                       /* would overflow, never grow again */
+                                       grow_count = 0;
+                                       grow_grow_count = 0;
+                                       NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
+                               } else {
+                                       NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2);
+                                       grow_count *= 2;
+                               }
+                       }
+
                        /* determine if we want to save the next buffer */
                        if (new_buf == NULL) {
                                /* if new_buf is not null, we already want to save the next one.. */
                                /* otherwise, save if we've read in fewer lines than the reservoir holds */
-                               /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
+                               /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
 
-                               if (*num_lines < reservoir->reservoir_sz) {
-                                       NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz);
+                               if (*psamples < (*preservoir)->reservoir_sz) {
+                                       NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz);
                                } else {
-                                       NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz);
+                                       NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz);
                                }
 
-                               if (*num_lines < reservoir->reservoir_sz
-                               ||  rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) {
-                                       NOTIFY_DEBUG("next buffer will be reserved..");
-                                       new_buf = buf_new_(0);
+                               if (*psamples < (*preservoir)->reservoir_sz
+                               ||  randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) {
+                                       NOTIFY_DEBUG("next buffer will be remembered..");
+                                       new_buf = buf_new(0);
                                        if (new_buf == NULL) {
                                                free(read_buf);
                                                return -1;
@@ -517,7 +229,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
                                }
                        }
 
-                       len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL);
+                       len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL);
                        if (len_flensed < 0) {
                                free(read_buf);
                                free(new_buf);
@@ -526,23 +238,25 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
                        if (len_flensed == 0) {
                                /* no delimiter found yet, stop parsing and read more */
                                NOTIFY_DEBUG("no delim found after %zd", len);
-                               line_scanned = len;
-                               buf_rebase_(read_buf);
+                               bytes_scanned = len;
+                               buf_rebase(read_buf);
                                break;
                        }
 
                        len -= len_flensed;
-                       line_scanned = 0;
+                       bytes_scanned = 0;
 
                        D_BUF("read_buf: ", read_buf);
 
                        if (new_buf != NULL) {
                                D_BUF("parsed complete line: ", new_buf);
-                               reservoir_remember_(reservoir, new_buf);
+                               reservoir_remember((*preservoir), new_buf);
                                new_buf = NULL;
                        }
 
-                       *num_lines += 1;
+                       *psamples += 1;
+                       grow_count += 1;
+                       grow_grow_count += 1;
 
                }
        }
@@ -552,12 +266,12 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
                D_BUF("leftovers: ", read_buf);
 
                if (new_buf != NULL
-               ||  *num_lines < reservoir->reservoir_sz) {
-                       reservoir_remember_(reservoir, read_buf);
+               ||  *psamples < (*preservoir)->reservoir_sz) {
+                       reservoir_remember((*preservoir), read_buf);
                        read_buf = NULL;
                }
 
-               *num_lines += 1;
+               *psamples += 1;
        }
 
        free(read_buf);
@@ -567,65 +281,45 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
 }
 
 
-/*  reservoir_write_
-    Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
-*/
-static
-int reservoir_write_(int fd, reservoir_t reservoir, char delim) {
-       ssize_t len;
-       size_t i;
-       struct iovec iov[2];
-
-       iov[1].iov_base = &delim;
-       iov[1].iov_len = sizeof delim;
-
-       assert(reservoir != NULL);
-       D_RESERVOIR(reservoir);
-
-       for (i = 0; i < reservoir->reservoir_sz; i++) {
-               if (reservoir->reservoir[i]) {
-                       iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
-                       iov[0].iov_len = reservoir->reservoir[i]->buf_used;
-               } else {
-                       iov[0].iov_base = NULL;
-                       iov[0].iov_len = 0;
-               }
-
-               len = writev(fd, iov, sizeof iov / sizeof *iov);
-               if (len < 0) {
-                       NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-#ifndef TEST
-
 int main(int argc, char *argv[]) {
+       struct sigaction sa;
+       char *status_filename = NULL;
        reservoir_t reservoir;
-       size_t num_lines = 0;
-       int fd;
+       unsigned long num_samples = 0;
        int c;
 
-       while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) {
+       while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) {
                switch (c) {
                        case 'v':
                                options_.verbosity++;
                                break;
 
                        case 'b':
-                               options_.read_buf_sz = atoi(optarg);
+                               options_.read_buf_sz = strtoul(optarg, NULL, 0);
+                               /* XXX: validate */
                                break;
 
                        case 'd':
                                options_.delim = *optarg;
+                               /* @fallthrough@ */
+                       case 'D':
                                options_.delim_out = *optarg;
                                break;
 
+                       case 'i':
+                               options_.reservoir_grow_per = strtoul(optarg, NULL, 0);
+                               break;
+
+                       case 'j':
+                               options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0);
+                               break;
+
+                       case 's':
+                               status_filename = optarg;
+                               break;
+
                        case 'n':
-                               options_.reservoir_sz = atoi(optarg);
+                               options_.reservoir_sz = strtoul(optarg, NULL, 0);
                                break;
 
                        case 'r':
@@ -642,44 +336,59 @@ int main(int argc, char *argv[]) {
                }
        }
 
+#if 0
+       /* zero-or-more arguments required */
+       /* if that ever changes... */
        if (argc - optind < 0) {
                usage_(argv[0], 0);
                exit(EX_USAGE);
        }
+#endif
 
-       if (rand_init_(options_.rand_file)) {
+       if (randomness_init(options_.rand_file)) {
                NOTIFY_ERROR("failed to initialize randomness source\n");
                exit(EX_NOINPUT);
        }
 
-       reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
+       reservoir = reservoir_new(options_.reservoir_sz);
        if (reservoir == NULL) {
-               NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+               NOTIFY_ERROR("could not create new reservoir");
+               exit(EX_OSERR);
+       }
+
+       if (status_filename) {
+               options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT);
+               if (options_.status_fd < 0) {
+                       NOTIFY_ERROR("could not open status file '%s'", status_filename);
+                       exit(EX_OSERR);
+               }
+       }
+
+       sa.sa_handler = request_snapshot_;
+       (void)sigemptyset(&sa.sa_mask);
+       sa.sa_flags = 0;
+       if (sigaction(SIGUSR1, &sa, NULL)) {
+               NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno));
                exit(EX_OSERR);
        }
-       reservoir->reservoir_sz = options_.reservoir_sz;
-       reservoir->reservoir_insertions = 0;
-       memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
 
        if (argc - optind == 0) {
-               fd = STDIN_FILENO;
-               if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+               if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
                        exit(EX_SOFTWARE);
                }
        } else {
                while (optind < argc) {
                        if (strcmp("-", argv[optind]) == 0) {
-                               fd = STDIN_FILENO;
-                               if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+                               if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
                                        exit(EX_SOFTWARE);
                                }
                        } else {
-                               fd = open(argv[optind], O_RDONLY);
+                               int fd = open(argv[optind], O_RDONLY);
                                if (fd < 0) {
                                        NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
                                        exit(EX_NOINPUT);
                                }
-                               if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+                               if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
                                        exit(EX_SOFTWARE);
                                }
                                if (close(fd)) {
@@ -692,41 +401,16 @@ int main(int argc, char *argv[]) {
        }
 
        if (options_.verbosity) {
-               fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
+               fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples);
        }
 
        if (options_.verbosity > 1) {
-               fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions);
+               fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally);
        }
 
-       if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
+       if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) {
                exit(EX_SOFTWARE);
        }
 
        exit(EX_OK);
 }
-
-#else /* TEST */
-
-void *test_suite_data;
-
-int test_suite_pre(void *suite_data) {
-       (void)suite_data;
-       if (rand_init_(NULL)) {
-               return -1;
-       }
-       return 0;
-}
-int test_suite_post(void *suite_data) {
-       (void)suite_data;
-       return 0;
-}
-
-test_t test_suite[] = {
-       { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
-       { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
-       { NULL, NULL, NULL, NULL, NULL },
-};
-
-
-#endif /* TEST */