From c0224807bccbd7e71e312fb4151378bff4f5a5db Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Wed, 20 Feb 2013 17:33:57 -0800 Subject: [PATCH] Split code into modules, handle USR1, minor fixes. Dump current reservoir buffers eventually after receiving USR1 signal. buf_t, reservoir_t, and randomness now each have their own respective modules. Reads and writes are now EINTR safe. Randomness source now defaults to system PRNG. Reservoirs might now be growable -- implmented but untested. --- Makefile | 10 +- README.md | 13 + buf.c | 243 +++++++++++++++++ buf.h | 73 +++++ randomness.c | 67 +++++ randomness.h | 19 ++ reservoir.c | 174 ++++++++++++ reservoir.h | 61 +++++ reservoir_sample.c | 652 ++++++++++++--------------------------------- 9 files changed, 824 insertions(+), 488 deletions(-) create mode 100644 README.md create mode 100644 buf.c create mode 100644 buf.h create mode 100644 randomness.c create mode 100644 randomness.h create mode 100644 reservoir.c create mode 100644 reservoir.h diff --git a/Makefile b/Makefile index 8f8f4b7..405e033 100644 --- a/Makefile +++ b/Makefile @@ -13,8 +13,8 @@ MAKEDEPEND = $(CC) -MM TARGETS = reservoir_sample TEST_DIR = test -TESTS = $(addprefix $(TEST_DIR)/, reservoir_sample_test) -SOURCES = reservoir_sample.c notify.c +TESTS = $(addprefix $(TEST_DIR)/, buf_test reservoir_test) +SOURCES = reservoir_sample.c notify.c buf.c randomness.c reservoir.c OBJECTS = $(SOURCES:.c=.o) TEST_OBJECTS = $(TESTS:=.o) test_suite.o @@ -43,8 +43,10 @@ check: test $(TEST_DIR)/%_test.o: %.c $(CC) $(CFLAGS) $(CPPFLAGS) -DTEST -c -o $@ $< -$(TEST_DIR)/reservoir_sample_test: %: %.o test_suite.o notify.o +$(TEST_DIR)/buf_test: %: %.o test_suite.o notify.o -reservoir_sample: %: %.o notify.o +$(TEST_DIR)/reservoir_test: %: %.o test_suite.o notify.o buf.o randomness.o + +reservoir_sample: %: %.o notify.o buf.o randomness.o reservoir.o -include .depend diff --git a/README.md b/README.md new file mode 100644 index 0000000..0033712 --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# reservoir_sample # +This is a small unixy command-line utility which will harvest an evenly-weighted random sampling from its input stream. + +## what ## +While this sort of collation function can easily be implemented in a few lines of shell, here is a standalone utility specifically for the task, with a few frills. + +## who ## +Justin Wind + +## license ## +cite or blame, as apropos +corrections appreciated +no warranties implied diff --git a/buf.c b/buf.c new file mode 100644 index 0000000..7a94079 --- /dev/null +++ b/buf.c @@ -0,0 +1,243 @@ +#include +#include +#include +#include + +#include "buf.h" +#include "notify.h" +#include "test_suite.h" + +inline +buf_t buf_new(size_t sz) { + buf_t buf = malloc(sz + sizeof *buf); + if (buf) { + buf->buf_sz = sz; + buf->buf_start = buf->buf_used = 0; + memset(buf->buf, 0, sz); + } + return buf; +} + +inline +void buf_rebase(buf_t buf) { + if (buf->buf_start == 0) + return; + memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used); + buf->buf_start = 0; +} + +inline +int buf_makeroom(buf_t *pbuf, size_t roomfor) { + size_t new_sz; + void *tmp_ptr; + + assert(pbuf != NULL); + + if (*pbuf == NULL) { + *pbuf = buf_new(roomfor); + if (*pbuf == NULL) { + return -1; + } + } + + buf_rebase(*pbuf); + + if (BUF_ROOM(*pbuf) >= roomfor) + return 0; + + new_sz = (*pbuf)->buf_used + roomfor; + tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf); + if (tmp_ptr == NULL) { + NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); + return -1; + } + *pbuf = tmp_ptr; + (*pbuf)->buf_sz = new_sz; + + return 0; +} + +inline +int buf_range_dup_or_append(buf_t src, size_t src_offset, size_t n, buf_t *pdst) { + assert(src != NULL); + assert(pdst != NULL); + assert(src_offset + n <= src->buf_used); + + if (buf_makeroom(pdst, n)) { + return -1; + } + + memcpy((*pdst)->buf + (*pdst)->buf_used, src->buf + src->buf_start + src_offset, n); + (*pdst)->buf_used += n; + + return 0; +} + +/* Room for improvement: + Depending on ratio of flensed portion to buffer use, try to do less + copying-around of buffer ranges, by swapping buf pointers instead. +*/ +ssize_t buf_flense(buf_t *psrc, size_t src_offset, int delimiter, buf_t *pdst) { + const size_t delimiter_len = 1; + size_t i; + + assert(psrc != NULL); + assert(src_offset <= (*psrc)->buf_used); + + NOTIFY_DEBUG("src_offset:%zu", src_offset); + D_BUF("src ", *psrc); + D_BUF("dst ", pdst ? *pdst : NULL); + + for (i = src_offset; i < (*psrc)->buf_used; i++) { + if ((*psrc)->buf[(*psrc)->buf_start + i] == delimiter) { + + if (pdst != NULL) { + if (buf_range_dup_or_append((*psrc), 0, i, pdst)) { + return -1; + } + } + + (*psrc)->buf_start += i + delimiter_len; + (*psrc)->buf_used -= i + delimiter_len; + + D_BUF("src ", *psrc); + D_BUF("dst ", pdst ? *pdst : NULL); + return i + delimiter_len; + } + } + + return 0; +} + +#ifdef TEST + +static const char buf_flense_testdata1_[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers"; +static const char buf_flense_testdata2_[] = "\n\nfoo\nbar\n\n"; + +struct buf_flense_expected_result_ { + ssize_t r; + const char *buf; +} buf_flense_expected1[] = { + { 1 + 1, "a" }, + { 2 + 1, "bc" }, + { 3 + 1, "def" }, + { 4 + 1, "ghij" }, + { 5 + 1, "klmno" }, + { 3 + 1, "pqr" }, + { 1 + 1, "s" }, + { 3 + 1, "tuv" }, + { 5 + 1, "wxyz0" }, + { 4 + 1, "1234" }, + { 3 + 1, "567" }, + { 2 + 1, "89" }, + { 0, "0leftovers" }, +}, buf_flense_expected2[] = { + { 0 + 1, "" }, + { 0 + 1, "" }, + { 3 + 1, "foo" }, + { 3 + 1, "bar" }, + { 0 + 1, "" }, + { 0, "" }, +}; + +struct test_buf_flense_data_ { + const char *src; + const struct buf_flense_expected_result_ *expected; +} test_buf_flense_data1 = { + .src = buf_flense_testdata1_, + .expected = buf_flense_expected1, +}, test_buf_flense_data2 = { + .src = buf_flense_testdata2_, + .expected = buf_flense_expected2, +}; + +static +int test_buf_flense_(void *test_arg, void *suite_arg) { + (void)suite_arg; + struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg; + const char testdata_len = strlen(test_data->src); + const struct buf_flense_expected_result_ *next_result = test_data->expected; + int retval = 0; + buf_t src; + + TEST_INFO("initializing src buffer with %zu characters", testdata_len); + src = buf_new(testdata_len); + if (src == NULL) { + TEST_ERROR("%s:%s", "new_buf_", "failed"); + return -1; + } + memcpy(src->buf, test_data->src, testdata_len); + src->buf_used += testdata_len; + + D_BUF("src ", src); + + for (;;) { + ssize_t r; + buf_t dst = NULL; + + r = buf_flense(&src, 0, '\n', &dst); + if (r != next_result->r) { + TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r); + retval = -1; + } + if (r == 0) { + TEST_INFO("finished flensing"); + break; + } + + if (strlen(next_result->buf) > dst->buf_used) { + TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf); + D_BUF("dst ", dst); + retval = -1; + } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) { + TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf); + D_BUF("dst ", dst); + retval = -1; + } + + TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start); + + memset(dst, 0, dst->buf_sz + sizeof *dst); + free(dst); + dst = NULL; + + next_result++; + } + if (strlen(next_result->buf) > src->buf_used) { + TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf); + D_BUF("src ", src); + retval = -1; + } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) { + TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf); + D_BUF("src ", src); + retval = -1; + } + + TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start); + + memset(src, 0, src->buf_sz + sizeof *src); + free(src); + src = NULL; + + return retval; +} + +void *test_suite_data; + +int test_suite_pre(void *suite_data) { + (void)suite_data; + return 0; +} + +int test_suite_post(void *suite_data) { + (void)suite_data; + return 0; +} + +test_t test_suite[] = { + { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 }, + { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 }, + { NULL, NULL, NULL, NULL, NULL }, +}; + +#endif /* TEST */ diff --git a/buf.h b/buf.h new file mode 100644 index 0000000..a47f3ba --- /dev/null +++ b/buf.h @@ -0,0 +1,73 @@ +#ifndef BUF_H_B87OZOFU +#define BUF_H_B87OZOFU + +#include + +#ifndef NDEBUG +#include "notify.h" +#endif /* NDEBUG */ + +/* A simple sliding-window byte buffer. */ + +typedef struct buf_ { + size_t buf_sz; + size_t buf_start; + size_t buf_used; + unsigned char buf[]; +} *buf_t; +#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) + +#ifndef NDEBUG +#define D_BUF(__pre__,__b__,...) do {\ + if ( (__b__) == NULL )\ + NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\ + else {\ + NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\ + ## __VA_ARGS__,\ + (__b__),\ + (__b__)->buf_sz,\ + (__b__)->buf_start,\ + (__b__)->buf_used,\ + BUF_ROOM((__b__)),\ + (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\ + assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\ + }\ +} while (0) +#else /* NDEBUG */ +#define D_BUF(...) +#endif /* NDEBUG */ + +/* buf_new + Allocate and return a new buf_t capable of holding #sz bytes. +*/ +buf_t buf_new(size_t sz); + +/* buf_rebase + Reclaim any free space from the start of #buf, preserving active content. +*/ +void buf_rebase(buf_t buf); + +/* buf_makeroom + Assures that the buf pointed to by #pbuf has at space to hold #roomfor more + bytes. +*/ +int buf_makeroom(buf_t *pbuf, size_t roomfor); + +/* buf_range_dup_or_append + Starting at the #src_offset byte of #src, appends the following #n bytes to + the buffer pointed to by #dstp, which will be re/allocated if needed. +*/ +int buf_range_dup_or_append(buf_t src, size_t src_offset, size_t n, buf_t *pdst); + +/* buf_flense + Starting after #src_offset characters, scan through the buffer pointed + to by #psrc, stopping at the first byte matching #delimiter, whereupon, if + #pdst is not NULL, all the bytes previous to #delimiter are appended onto + the buffer pointed to by *#pdst. The buffer pointed to by #psrc is then + trimmed to only contain the bytes following #delimiter. The delimiter byte + is discarded. + Returns the number of characters flensed from #src. +*/ +ssize_t buf_flense(buf_t *psrc, size_t src_offset, int delimiter, buf_t *pdst); + +#endif /* BUF_H_B87OZOFU */ diff --git a/randomness.c b/randomness.c new file mode 100644 index 0000000..90b375a --- /dev/null +++ b/randomness.c @@ -0,0 +1,67 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "randomness.h" +#include "notify.h" + +static int randomness_fd_ = -1; + +int randomness_init(const char *filename) { + if (filename != NULL) { + randomness_fd_ = open(filename, O_RDONLY); + if (randomness_fd_ == -1) { + NOTIFY_ERROR("%s('%s'):%s", "open", filename, strerror(errno)); + return -1; + } + NOTIFY_DEBUG("reading randomness from '%s', fd:%d", filename, randomness_fd_); + } else { + if (randomness_fd_ != -1) { + close(randomness_fd_); + randomness_fd_ = -1; + } + srand48(time(NULL) ^ getpid()); + NOTIFY_DEBUG("reading randomness from system PRNG"); + } + + return 0; +} + +/* + Room for improvement: constrain bits of randomness consumed, based on #limit + Also maybe read chunks of randomness at a time + */ +unsigned long randomness_upto_inclusive(unsigned long limit) { + unsigned long randomness; + + if (limit == 0) + return 0; + + if (randomness_fd_ != -1) { + ssize_t len; + + do { + len = read(randomness_fd_, &randomness, sizeof randomness); + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); + if (len == sizeof randomness) { + randomness %= limit + 1; + NOTIFY_DEBUG("randomness:%lu", randomness); + return randomness; + } + NOTIFY_ERROR("%s(%d, %zu):%zd:%s", + "read", randomness_fd_, sizeof randomness, len, + (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read" ) + ); + } + + /* fall back to pseudo-randomness if read failed */ + randomness = mrand48(); + randomness %= limit + 1; + + NOTIFY_DEBUG("randomness:%lu", randomness); + return randomness; +} \ No newline at end of file diff --git a/randomness.h b/randomness.h new file mode 100644 index 0000000..24d7653 --- /dev/null +++ b/randomness.h @@ -0,0 +1,19 @@ +#ifndef RANDOMNESS_H_4LC721CM +#define RANDOMNESS_H_4LC721CM + +/* + Wrapper for wrangling random values. +*/ + +/* rand_init + Prepare to read randomness from #filename. + If filename is NULL, use system pseudorandom generator. +*/ +int randomness_init(const char *filename); + +/* randomness_upto_inclusive + Return a random number from zero up through #limit. +*/ +unsigned long randomness_upto_inclusive(unsigned long limit); + +#endif /* RANDOMNESS_H_4LC721CM */ diff --git a/reservoir.c b/reservoir.c new file mode 100644 index 0000000..1878203 --- /dev/null +++ b/reservoir.c @@ -0,0 +1,174 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "reservoir.h" +#include "notify.h" +#include "buf.h" +#include "randomness.h" +#include "test_suite.h" + +reservoir_t reservoir_new(size_t sz) { + reservoir_t reservoir; + + reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir); + if (reservoir == NULL) { + NOTIFY_ERROR("%s:%s", "malloc", strerror(errno)); + return NULL; + } + reservoir->reservoir_sz = sz; + reservoir->reservoir_used = 0; + reservoir->reservoir_tally = 0; + memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); + + return reservoir; +} + +int reservoir_grow(reservoir_t *preservoir, size_t growby) { + assert(preservoir != NULL); + + if (growby) { + void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir); + if (tmp_ptr == NULL) { + NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); + return -1; + } + *preservoir = tmp_ptr; + (*preservoir)->reservoir_sz += growby; + memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used); + } + + D_RESERVOIR(*preservoir); + + return 0; +} + +void reservoir_remember(reservoir_t reservoir, buf_t buf) { + buf_t old_buf; + + assert(reservoir != NULL); + + D_BUF("reserving ", buf); + + if (reservoir->reservoir_sz > 0) { + unsigned long randomness; + + if (reservoir->reservoir_used < reservoir->reservoir_sz) { + /* there are still unused slots, fill them up without discarding anything */ + /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ + randomness = randomness_upto_inclusive(reservoir->reservoir_used); + + assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */ + + NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used); + reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness]; + old_buf = NULL; /* no old entry to discard */ + reservoir->reservoir_used += 1; + } else { + randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1); + old_buf = reservoir->reservoir[randomness]; + } + NOTIFY_DEBUG("replacing reservoir index %zu", randomness); + reservoir->reservoir[randomness] = buf; + } else { + /* can't add anything to a zero-size reservoir, so just dispose of new item */ + old_buf = buf; + } + + reservoir->reservoir_tally += 1; + + if (old_buf != NULL) { + D_BUF("FREEING ", old_buf); + memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); + free(old_buf); + } + + D_RESERVOIR(reservoir); +} + +int reservoir_write(int fd, reservoir_t reservoir, char delim) { + ssize_t len; + size_t i; + struct iovec iov[2]; + + iov[1].iov_base = &delim; + iov[1].iov_len = sizeof delim; + + assert(reservoir != NULL); + D_RESERVOIR(reservoir); + + for (i = 0; i < reservoir->reservoir_sz; i++) { + if (reservoir->reservoir[i]) { + iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; + iov[0].iov_len = reservoir->reservoir[i]->buf_used; + } else { + iov[0].iov_base = NULL; + iov[0].iov_len = 0; + } + + do { + len = writev(fd, iov, sizeof iov / sizeof *iov); + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); + if (len < 0) { + NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); + return -1; + } + } + + return 0; +} + +#define META_BUF_SZ 128 +int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) { + char buf[META_BUF_SZ]; + int metalen; + ssize_t len; + + metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c", + reservoir->reservoir_sz, delim, + reservoir->reservoir_used, delim, + reservoir->reservoir_tally, delim, + samples, delim); + if ((size_t)metalen >= sizeof buf) { + NOTIFY_ERROR("out of buffer"); + return -1; + } + do { + len = write(fd, buf, metalen); + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); + if (len < metalen) { + if (len < 0) { + NOTIFY_ERROR("%s:%s", "write", strerror(errno)); + } + return -1; + } + + return 0; +} + +#ifdef TEST + +void *test_suite_data; + +int test_suite_pre(void *suite_data) { + (void)suite_data; + if (randomness_init(NULL)) { + return -1; + } + return 0; +} + +int test_suite_post(void *suite_data) { + (void)suite_data; + return 0; +} + +test_t test_suite[] = { + { NULL, NULL, NULL, NULL, NULL }, +}; + +#endif /* TEST */ diff --git a/reservoir.h b/reservoir.h new file mode 100644 index 0000000..c056ac5 --- /dev/null +++ b/reservoir.h @@ -0,0 +1,61 @@ +#ifndef RESERVOIR_H_MZSA8WJ3 +#define RESERVOIR_H_MZSA8WJ3 + +#include + +#include "buf.h" + +#ifndef NDEBUG +#include "notify.h" +#endif /* NDEBUG */ + +/* A pool of buf_t */ + +typedef struct reservoir_ { + size_t reservoir_sz; /* allocated slots */ + size_t reservoir_used; /* slots filled */ + unsigned long reservoir_tally; /* total number of items remembered */ + buf_t reservoir[]; +} *reservoir_t; + +#ifndef NDEBUG +#define D_RESERVOIR(__r__) do {\ + size_t i;\ + NOTIFY_DEBUG("reservoir:%p sz: %zu used:%zu tally:%zu", (__r__), (__r__)->reservoir_sz, (__r__)->reservoir_used, (__r__)->reservoir_tally);\ + for (i = 0; i < (__r__)->reservoir_sz; i++) {\ + D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\ + }\ +} while (0) +#else /* NDEBUG */ +#define D_RESERVOIR(...) +#endif /* NDEBUG */ + +/* reservoir_new + Allocate and return a new reservoir capable of holding #sz bufs. +*/ +reservoir_t reservoir_new(size_t sz); + +/* reservoir_grow + Increase the storage capacity of the reservoir pointed to by #preservoir + by #growby bufs. +*/ +int reservoir_grow(reservoir_t *preservoir, size_t growby); + +/* reservoir_remember + Remember #buf, forgetting another buf at random if the reservoir + pointed to by #preservoir is already full to capacity. +*/ +void reservoir_remember(reservoir_t reservoir, buf_t buf); + +/* reservoir_write + Write the contents of the bufs within #reservoir to #fd, each with a + trailing #delim. +*/ +int reservoir_write(int fd, reservoir_t reservoir, char delim); + +/* reservoir_write_meta + Write metadata of #reservoir and #samples to fd. +*/ +int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim); + +#endif /* RESERVOIR_H_MZSA8WJ3 */ diff --git a/reservoir_sample.c b/reservoir_sample.c index 167b5c9..6745062 100644 --- a/reservoir_sample.c +++ b/reservoir_sample.c @@ -1,22 +1,32 @@ /* reservoir_sample.c - This generates a randomized subset of its input, by means of reservoir- + This collates a randomized subset of its input, by means of reservoir- sampling, and a Fisher-Yates shuffle. */ +/* To do: + Allow user-supplied function (in relation to entry count) to define + grow-rate option, rather than static count. JIT would be nice there. + + The entire grow-rate thing is not very well thought-through, will bias + the results, and likely not very useful as currently implemented. +*/ + #include #include #include #include #include #include -#include -#include +#include +#include #include #include #include "version.h" #include "notify.h" -#include "test_suite.h" +#include "buf.h" +#include "randomness.h" +#include "reservoir.h" static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; @@ -24,63 +34,27 @@ static struct options_ { unsigned int verbosity; size_t read_buf_sz; size_t reservoir_sz; + unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */ + unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */ char delim; char delim_out; char *rand_file; + int status_fd; } options_ = { .verbosity = 0, .read_buf_sz = 8192, .reservoir_sz = 1, + .reservoir_grow_per = 0, .delim = '\n', .delim_out = '\n', - .rand_file = "/dev/random", + .rand_file = NULL, + .status_fd = STDERR_FILENO, }; -static int rand_fd_ = -1; - -/* a simple little sliding-window buffer */ -typedef struct buf_ { - size_t buf_sz; - size_t buf_start; - size_t buf_used; - unsigned char buf[]; -} *buf_t; -#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) - -typedef struct reservoir_ { - size_t reservoir_insertions; /* how many items have ever been added to reservoir */ - size_t reservoir_sz; - buf_t reservoir[]; -} *reservoir_t; - -#ifndef NDEBUG -#define D_BUF(__pre__,__b__,...) do {\ - if ( (__b__) == NULL )\ - NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\ - else {\ - NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\ - ## __VA_ARGS__,\ - (__b__),\ - (__b__)->buf_sz,\ - (__b__)->buf_start,\ - (__b__)->buf_used,\ - BUF_ROOM((__b__)),\ - (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\ - assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\ - }\ -} while (0) -#define D_RESERVOIR(__r__) do {\ - size_t i;\ - for (i = 0; i < (__r__)->reservoir_sz; i++) {\ - D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\ - }\ - NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\ -} while (0) -#else -#define D_RESERVOIR(...) -#define D_BUF(...) -#endif - +/* #status_requested_ will be set whenever a view of the current sample + reservoir is desired. +*/ +static int status_requested_ = 0; static void usage_(const char *prog, unsigned int full) { @@ -99,380 +73,94 @@ void usage_(const char *prog, unsigned int full) { if (full) { fprintf(f, "\nOptions:\n" - "\t-n -- returns samples [default: %zu]\n" - "\t-d -- use as input delimiter [default: '\\%03hho']\n" - "\t-r -- read randomness from [default: '%s']\n" - "\t-b -- read buffer size [default: %zu]\n" - "\t-v -- increase verbosity\n" - "\t-h -- this screen\n", + "\t-n -- returns samples [default: %zu]\n" + "\t-d -- use as input and output delimiter [default: '\\%03hho']\n" + "\t-r -- read randomness from [default: '%s']\n" + "\t-b -- set input buffer size [default: %zu]\n" + "\t-i -- grow reservoir by 1 for every input samples (0 inhibits behavior) [default: %lu]\n" + "\t-h -- double the reservoir growth interval every input samples (0 inhibits behavior) [default: %lu]\n " + "\t-s -- USR1 signals will write reservoir contents to rather than stderr (has no effect on normal output to stdout upon input EOF)\n" + "\t-v -- increase verbosity\n" + "\t-h -- this screen\n", options_.reservoir_sz, options_.delim, - options_.rand_file, - options_.read_buf_sz); + options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)", + options_.read_buf_sz, + options_.reservoir_grow_double_per, + options_.reservoir_grow_per); fprintf(f, "\n%78s\n", src_id_); } fflush(f); } - -/* rand_upto_inclusive_ - Room for improvement: constrain bits of randomness consumed, based on #limit - also maybe read chunks of randomness at a time - */ -static -unsigned long rand_upto_inclusive_(unsigned long limit) { - unsigned long randomness; - - if (limit == 0) - return 0; - - if (rand_fd_ != -1) { - ssize_t len; - - len = read(rand_fd_, &randomness, sizeof randomness); - if (len == sizeof randomness) { - randomness %= limit + 1; - - return randomness; - } - NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") ); - } - - /* fall back to dumb randomness */ - randomness = mrand48(); - randomness %= limit + 1; - - return randomness; -} - - -static -int rand_init_(char *rand_file) { - srand48(time(NULL) ^ getpid()); - if (rand_file) { - rand_fd_ = open(rand_file, O_RDONLY); - if (rand_fd_ == -1) { - NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno)); - return -1; - } - } - return 0; -} - - -static -buf_t buf_new_(size_t sz) { - buf_t buf = malloc(sz + sizeof *buf); - if (buf) { - buf->buf_sz = sz; - buf->buf_start = buf->buf_used = 0; - memset(buf->buf, 0, sz); - } - return buf; -} - - -static -void buf_rebase_(buf_t buf) { - if (buf->buf_start == 0) - return; - memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used); - buf->buf_start = 0; -} - - -static -int buf_makeroom_(buf_t *pbuf, size_t roomfor) { - size_t new_sz; - void *tmp_ptr; - - assert(pbuf != NULL); - - if (*pbuf == NULL) { - *pbuf = buf_new_(roomfor); - if (*pbuf == NULL) { - return -1; - } - } - - buf_rebase_(*pbuf); - - if (BUF_ROOM(*pbuf) >= roomfor) - return 0; - - new_sz = (*pbuf)->buf_used + roomfor; - tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf); - if (tmp_ptr == NULL) { - NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); - return -1; - } - *pbuf = tmp_ptr; - (*pbuf)->buf_sz = new_sz; - - return 0; -} - - -static -int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) { - assert(src != NULL); - assert(dst != NULL); - assert(src_skip + n <= src->buf_used); - - if (buf_makeroom_(dst, n)) { - return -1; - } - - memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n); - (*dst)->buf_used += n; - - return 0; -} - - -/* buf_flense_ - Starting after #src_offset characters, scan through #src, stopping at - the first character matching #delimiter, whereupon all the characters - leading up to #delimiter are copied into *#dst if #dst is not NULL. #src - becomes the characters following #delimiter. - Returns the number of characters flensed from #src. - - Room for improvement: - If flensed segment is more than half the buffer, copy remainder of src - into dst, then return src, leaving dst in its place. -*/ -static -ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) { - const size_t delimiter_len = 1; - size_t i; - - assert(src != NULL); - assert(src_offset <= (*src)->buf_used); - - NOTIFY_DEBUG("src_offset:%zu", src_offset); - D_BUF("src ", *src); - D_BUF("dst ", dst ? *dst : NULL); - - for (i = src_offset; i < (*src)->buf_used; i++) { - if ((*src)->buf[(*src)->buf_start + i] == delimiter) { - - if (dst != NULL) { - if (buf_range_dup_or_append_((*src), 0, i, dst)) { - return -1; - } - } - - (*src)->buf_start += i + delimiter_len; - (*src)->buf_used -= i + delimiter_len; - - D_BUF("src ", *src); - D_BUF("dst ", dst ? *dst : NULL); - return i + delimiter_len; - } - } - - return 0; -} - -#ifdef TEST - -static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers"; -static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n"; - -struct buf_flense_expected_result_ { - ssize_t r; - const char *buf; -} buf_flense_expected1[] = { - { 1 + 1, "a" }, - { 2 + 1, "bc" }, - { 3 + 1, "def" }, - { 4 + 1, "ghij" }, - { 5 + 1, "klmno" }, - { 3 + 1, "pqr" }, - { 1 + 1, "s" }, - { 3 + 1, "tuv" }, - { 5 + 1, "wxyz0" }, - { 4 + 1, "1234" }, - { 3 + 1, "567" }, - { 2 + 1, "89" }, - { 0, "0leftovers" }, -}, buf_flense_expected2[] = { - { 0 + 1, "" }, - { 0 + 1, "" }, - { 3 + 1, "foo" }, - { 3 + 1, "bar" }, - { 0 + 1, "" }, - { 0, "" }, -}; - -struct test_buf_flense_data_ { - const char *src; - const struct buf_flense_expected_result_ *expected; -} test_buf_flense_data1 = { - .src = buf_flense_testdata1, - .expected = buf_flense_expected1, -}, test_buf_flense_data2 = { - .src = buf_flense_testdata2, - .expected = buf_flense_expected2, -}; - -static int test_buf_flense_(void *test_arg, void *suite_arg) { - (void)suite_arg; - struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg; - const char testdata_len = strlen(test_data->src); - const struct buf_flense_expected_result_ *next_result = test_data->expected; - int retval = 0; - buf_t src; - - TEST_INFO("initializing src buffer with %zu characters", testdata_len); - src = buf_new_(testdata_len); - if (src == NULL) { - TEST_ERROR("%s:%s", "new_buf_", "failed"); - return -1; - } - memcpy(src->buf, test_data->src, testdata_len); - src->buf_used += testdata_len; - - D_BUF("src ", src); - - for (;;) { - ssize_t r; - buf_t dst = NULL; - - r = buf_flense_(&src, 0, '\n', &dst); - if (r != next_result->r) { - TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r); - retval = -1; - } - if (r == 0) { - TEST_INFO("finished flensing"); - break; - } - - if (strlen(next_result->buf) > dst->buf_used) { - TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf); - D_BUF("dst ", dst); - retval = -1; - } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) { - TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf); - D_BUF("dst ", dst); - retval = -1; - } - - TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start); - - memset(dst, 0, dst->buf_sz + sizeof *dst); - free(dst); - dst = NULL; - - next_result++; - } - if (strlen(next_result->buf) > src->buf_used) { - TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf); - D_BUF("src ", src); - retval = -1; - } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) { - TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf); - D_BUF("src ", src); - retval = -1; - } - - TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start); - - memset(src, 0, src->buf_sz + sizeof *src); - free(src); - src = NULL; - - return retval; -} -#endif /* TEST */ - - -/* reservoir_remember_ - Remember #line, forgetting a line if more than #num_lines have already - been remembered. Remembers them in random order. +/* request_snapshot_ + Signal handler to be bound to USR1. + Upon receiving a signal, take note that a snapshot of the current state + has been requested. */ static -void reservoir_remember_(reservoir_t reservoir, buf_t buf) { - buf_t old_buf; - - assert(reservoir != NULL); - - D_BUF("reserving ", buf); - - if (reservoir->reservoir_sz > 0) { - unsigned long randomness; - - if (reservoir->reservoir_insertions < reservoir->reservoir_sz) { - /* there are still unused slots, fill them up without discarding anything */ - /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ - randomness = rand_upto_inclusive_(reservoir->reservoir_insertions); - - assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */ - - NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions); - reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness]; - old_buf = NULL; /* no old entry to discard */ - } else { - randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1); - old_buf = reservoir->reservoir[randomness]; - } - NOTIFY_DEBUG("replacing reservoir index %zu", randomness); - reservoir->reservoir[randomness] = buf; - } else { - /* can't add anything to a zero-size reservoir, so just dispose of new item */ - old_buf = buf; - } - - reservoir->reservoir_insertions += 1; - - if (old_buf != NULL) { - D_BUF("FREEING ", old_buf); - memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); - free(old_buf); - } - - D_RESERVOIR(reservoir); +void request_snapshot_(int signum) { + (void)signum; + status_requested_ = 1; } - -/* reservoir_read_ +/* accumulate_input_ Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an accumulator buffer. For each #delimiter character found in what was just read, occasionally remember the preceeding characters. */ static -int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) { +int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) { buf_t read_buf, new_buf = NULL; - size_t line_scanned; /* how much of the buffer has already been searched for delimiter */ + size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; + unsigned long grow_count = 0; + unsigned long grow_grow_count = 0; assert(read_block_sz > 0); - assert(num_lines != NULL); - assert(reservoir != NULL); + assert(psamples != NULL); + assert(preservoir != NULL); + assert(*preservoir != NULL); if (fd < 0) { return -1; } - read_buf = buf_new_(read_block_sz); + read_buf = buf_new(read_block_sz); if (read_buf == NULL) { return -1; } - line_scanned = 0; + bytes_scanned = 0; + /* begin accumulating */ for (;;) { NOTIFY_DEBUG("read loop\n\n"); - if (buf_makeroom_(&read_buf, read_block_sz)) { + /* make sure there's enough room in our input buffer for a full read() */ + if (buf_makeroom(&read_buf, read_block_sz)) { free(read_buf); free(new_buf); return -1; } - NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf)); - len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf)); + NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf)); + do { + len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf)); + + /* a signal may have interrupted read(), deal with that before + doing anything else */ + if (status_requested_) { + status_requested_ = 0; + NOTIFY_DEBUG("dumping reservoir due to signal"); + if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) { + NOTIFY_ERROR("failed to output current reservoir contents"); + } + if (options_.verbosity) { + reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out); + } + } + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < 0) { NOTIFY_ERROR("%s:%s", "read", strerror(errno)); free(read_buf); @@ -492,22 +180,46 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin NOTIFY_DEBUG("len:%zd", len); + if (options_.reservoir_grow_per + && grow_count >= options_.reservoir_grow_per) { + NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1); + grow_count = 0; + if (reservoir_grow(preservoir, 1)) { + NOTIFY_ERROR("failed to increase reservoir size"); + free(read_buf); + return -1; + } + } + + if (options_.reservoir_grow_double_per + && grow_grow_count >= options_.reservoir_grow_double_per) { + if (grow_count > ULONG_MAX / 2) { + /* would overflow, never grow again */ + grow_count = 0; + grow_grow_count = 0; + NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue"); + } else { + NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2); + grow_count *= 2; + } + } + /* determine if we want to save the next buffer */ if (new_buf == NULL) { /* if new_buf is not null, we already want to save the next one.. */ /* otherwise, save if we've read in fewer lines than the reservoir holds */ - /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */ + /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */ - if (*num_lines < reservoir->reservoir_sz) { - NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz); + if (*psamples < (*preservoir)->reservoir_sz) { + NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz); } else { - NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz); + NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz); } - if (*num_lines < reservoir->reservoir_sz - || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) { - NOTIFY_DEBUG("next buffer will be reserved.."); - new_buf = buf_new_(0); + if (*psamples < (*preservoir)->reservoir_sz + || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) { + NOTIFY_DEBUG("next buffer will be remembered.."); + new_buf = buf_new(0); if (new_buf == NULL) { free(read_buf); return -1; @@ -517,7 +229,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin } } - len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL); + len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL); if (len_flensed < 0) { free(read_buf); free(new_buf); @@ -526,23 +238,25 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin if (len_flensed == 0) { /* no delimiter found yet, stop parsing and read more */ NOTIFY_DEBUG("no delim found after %zd", len); - line_scanned = len; - buf_rebase_(read_buf); + bytes_scanned = len; + buf_rebase(read_buf); break; } len -= len_flensed; - line_scanned = 0; + bytes_scanned = 0; D_BUF("read_buf: ", read_buf); if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); - reservoir_remember_(reservoir, new_buf); + reservoir_remember((*preservoir), new_buf); new_buf = NULL; } - *num_lines += 1; + *psamples += 1; + grow_count += 1; + grow_grow_count += 1; } } @@ -552,12 +266,12 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin D_BUF("leftovers: ", read_buf); if (new_buf != NULL - || *num_lines < reservoir->reservoir_sz) { - reservoir_remember_(reservoir, read_buf); + || *psamples < (*preservoir)->reservoir_sz) { + reservoir_remember((*preservoir), read_buf); read_buf = NULL; } - *num_lines += 1; + *psamples += 1; } free(read_buf); @@ -567,65 +281,45 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin } -/* reservoir_write_ - Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots. -*/ -static -int reservoir_write_(int fd, reservoir_t reservoir, char delim) { - ssize_t len; - size_t i; - struct iovec iov[2]; - - iov[1].iov_base = &delim; - iov[1].iov_len = sizeof delim; - - assert(reservoir != NULL); - D_RESERVOIR(reservoir); - - for (i = 0; i < reservoir->reservoir_sz; i++) { - if (reservoir->reservoir[i]) { - iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; - iov[0].iov_len = reservoir->reservoir[i]->buf_used; - } else { - iov[0].iov_base = NULL; - iov[0].iov_len = 0; - } - - len = writev(fd, iov, sizeof iov / sizeof *iov); - if (len < 0) { - NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); - return -1; - } - } - - return 0; -} - -#ifndef TEST - int main(int argc, char *argv[]) { + struct sigaction sa; + char *status_filename = NULL; reservoir_t reservoir; - size_t num_lines = 0; - int fd; + unsigned long num_samples = 0; int c; - while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) { + while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; break; case 'b': - options_.read_buf_sz = atoi(optarg); + options_.read_buf_sz = strtoul(optarg, NULL, 0); + /* XXX: validate */ break; case 'd': options_.delim = *optarg; + /* @fallthrough@ */ + case 'D': options_.delim_out = *optarg; break; + case 'i': + options_.reservoir_grow_per = strtoul(optarg, NULL, 0); + break; + + case 'j': + options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0); + break; + + case 's': + status_filename = optarg; + break; + case 'n': - options_.reservoir_sz = atoi(optarg); + options_.reservoir_sz = strtoul(optarg, NULL, 0); break; case 'r': @@ -642,44 +336,59 @@ int main(int argc, char *argv[]) { } } +#if 0 + /* zero-or-more arguments required */ + /* if that ever changes... */ if (argc - optind < 0) { usage_(argv[0], 0); exit(EX_USAGE); } +#endif - if (rand_init_(options_.rand_file)) { + if (randomness_init(options_.rand_file)) { NOTIFY_ERROR("failed to initialize randomness source\n"); exit(EX_NOINPUT); } - reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir); + reservoir = reservoir_new(options_.reservoir_sz); if (reservoir == NULL) { - NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + NOTIFY_ERROR("could not create new reservoir"); + exit(EX_OSERR); + } + + if (status_filename) { + options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT); + if (options_.status_fd < 0) { + NOTIFY_ERROR("could not open status file '%s'", status_filename); + exit(EX_OSERR); + } + } + + sa.sa_handler = request_snapshot_; + (void)sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + if (sigaction(SIGUSR1, &sa, NULL)) { + NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno)); exit(EX_OSERR); } - reservoir->reservoir_sz = options_.reservoir_sz; - reservoir->reservoir_insertions = 0; - memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); if (argc - optind == 0) { - fd = STDIN_FILENO; - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { while (optind < argc) { if (strcmp("-", argv[optind]) == 0) { - fd = STDIN_FILENO; - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { - fd = open(argv[optind], O_RDONLY); + int fd = open(argv[optind], O_RDONLY); if (fd < 0) { NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); exit(EX_NOINPUT); } - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } if (close(fd)) { @@ -692,41 +401,16 @@ int main(int argc, char *argv[]) { } if (options_.verbosity) { - fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines); + fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples); } if (options_.verbosity > 1) { - fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions); + fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally); } - if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) { + if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } exit(EX_OK); } - -#else /* TEST */ - -void *test_suite_data; - -int test_suite_pre(void *suite_data) { - (void)suite_data; - if (rand_init_(NULL)) { - return -1; - } - return 0; -} -int test_suite_post(void *suite_data) { - (void)suite_data; - return 0; -} - -test_t test_suite[] = { - { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 }, - { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 }, - { NULL, NULL, NULL, NULL, NULL }, -}; - - -#endif /* TEST */ -- 2.45.2