X-Git-Url: http://git.squeep.com/?p=reservoir_sample;a=blobdiff_plain;f=reservoir_sample.c;fp=reservoir_sample.c;h=674506250024495fdd8d50bd40899f80985b667e;hp=167b5c9531aab1960d0d7fdc35a7daecc3bc52aa;hb=9b5d13ce510e4668d165c0b5ede7fd7f74adcbfc;hpb=c294f0883b05016744fcbfc83241bbb5133a2cb9 diff --git a/reservoir_sample.c b/reservoir_sample.c index 167b5c9..6745062 100644 --- a/reservoir_sample.c +++ b/reservoir_sample.c @@ -1,22 +1,32 @@ /* reservoir_sample.c - This generates a randomized subset of its input, by means of reservoir- + This collates a randomized subset of its input, by means of reservoir- sampling, and a Fisher-Yates shuffle. */ +/* To do: + Allow user-supplied function (in relation to entry count) to define + grow-rate option, rather than static count. JIT would be nice there. + + The entire grow-rate thing is not very well thought-through, will bias + the results, and likely not very useful as currently implemented. +*/ + #include #include #include #include #include #include -#include -#include +#include +#include #include #include #include "version.h" #include "notify.h" -#include "test_suite.h" +#include "buf.h" +#include "randomness.h" +#include "reservoir.h" static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; @@ -24,63 +34,27 @@ static struct options_ { unsigned int verbosity; size_t read_buf_sz; size_t reservoir_sz; + unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */ + unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */ char delim; char delim_out; char *rand_file; + int status_fd; } options_ = { .verbosity = 0, .read_buf_sz = 8192, .reservoir_sz = 1, + .reservoir_grow_per = 0, .delim = '\n', .delim_out = '\n', - .rand_file = "/dev/random", + .rand_file = NULL, + .status_fd = STDERR_FILENO, }; -static int rand_fd_ = -1; - -/* a simple little sliding-window buffer */ -typedef struct buf_ { - size_t buf_sz; - size_t buf_start; - size_t buf_used; - unsigned char buf[]; -} *buf_t; -#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) - -typedef struct reservoir_ { - size_t reservoir_insertions; /* how many items have ever been added to reservoir */ - size_t reservoir_sz; - buf_t reservoir[]; -} *reservoir_t; - -#ifndef NDEBUG -#define D_BUF(__pre__,__b__,...) do {\ - if ( (__b__) == NULL )\ - NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\ - else {\ - NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\ - ## __VA_ARGS__,\ - (__b__),\ - (__b__)->buf_sz,\ - (__b__)->buf_start,\ - (__b__)->buf_used,\ - BUF_ROOM((__b__)),\ - (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\ - assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\ - }\ -} while (0) -#define D_RESERVOIR(__r__) do {\ - size_t i;\ - for (i = 0; i < (__r__)->reservoir_sz; i++) {\ - D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\ - }\ - NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\ -} while (0) -#else -#define D_RESERVOIR(...) -#define D_BUF(...) -#endif - +/* #status_requested_ will be set whenever a view of the current sample + reservoir is desired. +*/ +static int status_requested_ = 0; static void usage_(const char *prog, unsigned int full) { @@ -99,380 +73,94 @@ void usage_(const char *prog, unsigned int full) { if (full) { fprintf(f, "\nOptions:\n" - "\t-n -- returns samples [default: %zu]\n" - "\t-d -- use as input delimiter [default: '\\%03hho']\n" - "\t-r -- read randomness from [default: '%s']\n" - "\t-b -- read buffer size [default: %zu]\n" - "\t-v -- increase verbosity\n" - "\t-h -- this screen\n", + "\t-n -- returns samples [default: %zu]\n" + "\t-d -- use as input and output delimiter [default: '\\%03hho']\n" + "\t-r -- read randomness from [default: '%s']\n" + "\t-b -- set input buffer size [default: %zu]\n" + "\t-i -- grow reservoir by 1 for every input samples (0 inhibits behavior) [default: %lu]\n" + "\t-h -- double the reservoir growth interval every input samples (0 inhibits behavior) [default: %lu]\n " + "\t-s -- USR1 signals will write reservoir contents to rather than stderr (has no effect on normal output to stdout upon input EOF)\n" + "\t-v -- increase verbosity\n" + "\t-h -- this screen\n", options_.reservoir_sz, options_.delim, - options_.rand_file, - options_.read_buf_sz); + options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)", + options_.read_buf_sz, + options_.reservoir_grow_double_per, + options_.reservoir_grow_per); fprintf(f, "\n%78s\n", src_id_); } fflush(f); } - -/* rand_upto_inclusive_ - Room for improvement: constrain bits of randomness consumed, based on #limit - also maybe read chunks of randomness at a time - */ -static -unsigned long rand_upto_inclusive_(unsigned long limit) { - unsigned long randomness; - - if (limit == 0) - return 0; - - if (rand_fd_ != -1) { - ssize_t len; - - len = read(rand_fd_, &randomness, sizeof randomness); - if (len == sizeof randomness) { - randomness %= limit + 1; - - return randomness; - } - NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") ); - } - - /* fall back to dumb randomness */ - randomness = mrand48(); - randomness %= limit + 1; - - return randomness; -} - - -static -int rand_init_(char *rand_file) { - srand48(time(NULL) ^ getpid()); - if (rand_file) { - rand_fd_ = open(rand_file, O_RDONLY); - if (rand_fd_ == -1) { - NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno)); - return -1; - } - } - return 0; -} - - -static -buf_t buf_new_(size_t sz) { - buf_t buf = malloc(sz + sizeof *buf); - if (buf) { - buf->buf_sz = sz; - buf->buf_start = buf->buf_used = 0; - memset(buf->buf, 0, sz); - } - return buf; -} - - -static -void buf_rebase_(buf_t buf) { - if (buf->buf_start == 0) - return; - memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used); - buf->buf_start = 0; -} - - -static -int buf_makeroom_(buf_t *pbuf, size_t roomfor) { - size_t new_sz; - void *tmp_ptr; - - assert(pbuf != NULL); - - if (*pbuf == NULL) { - *pbuf = buf_new_(roomfor); - if (*pbuf == NULL) { - return -1; - } - } - - buf_rebase_(*pbuf); - - if (BUF_ROOM(*pbuf) >= roomfor) - return 0; - - new_sz = (*pbuf)->buf_used + roomfor; - tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf); - if (tmp_ptr == NULL) { - NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); - return -1; - } - *pbuf = tmp_ptr; - (*pbuf)->buf_sz = new_sz; - - return 0; -} - - -static -int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) { - assert(src != NULL); - assert(dst != NULL); - assert(src_skip + n <= src->buf_used); - - if (buf_makeroom_(dst, n)) { - return -1; - } - - memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n); - (*dst)->buf_used += n; - - return 0; -} - - -/* buf_flense_ - Starting after #src_offset characters, scan through #src, stopping at - the first character matching #delimiter, whereupon all the characters - leading up to #delimiter are copied into *#dst if #dst is not NULL. #src - becomes the characters following #delimiter. - Returns the number of characters flensed from #src. - - Room for improvement: - If flensed segment is more than half the buffer, copy remainder of src - into dst, then return src, leaving dst in its place. -*/ -static -ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) { - const size_t delimiter_len = 1; - size_t i; - - assert(src != NULL); - assert(src_offset <= (*src)->buf_used); - - NOTIFY_DEBUG("src_offset:%zu", src_offset); - D_BUF("src ", *src); - D_BUF("dst ", dst ? *dst : NULL); - - for (i = src_offset; i < (*src)->buf_used; i++) { - if ((*src)->buf[(*src)->buf_start + i] == delimiter) { - - if (dst != NULL) { - if (buf_range_dup_or_append_((*src), 0, i, dst)) { - return -1; - } - } - - (*src)->buf_start += i + delimiter_len; - (*src)->buf_used -= i + delimiter_len; - - D_BUF("src ", *src); - D_BUF("dst ", dst ? *dst : NULL); - return i + delimiter_len; - } - } - - return 0; -} - -#ifdef TEST - -static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers"; -static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n"; - -struct buf_flense_expected_result_ { - ssize_t r; - const char *buf; -} buf_flense_expected1[] = { - { 1 + 1, "a" }, - { 2 + 1, "bc" }, - { 3 + 1, "def" }, - { 4 + 1, "ghij" }, - { 5 + 1, "klmno" }, - { 3 + 1, "pqr" }, - { 1 + 1, "s" }, - { 3 + 1, "tuv" }, - { 5 + 1, "wxyz0" }, - { 4 + 1, "1234" }, - { 3 + 1, "567" }, - { 2 + 1, "89" }, - { 0, "0leftovers" }, -}, buf_flense_expected2[] = { - { 0 + 1, "" }, - { 0 + 1, "" }, - { 3 + 1, "foo" }, - { 3 + 1, "bar" }, - { 0 + 1, "" }, - { 0, "" }, -}; - -struct test_buf_flense_data_ { - const char *src; - const struct buf_flense_expected_result_ *expected; -} test_buf_flense_data1 = { - .src = buf_flense_testdata1, - .expected = buf_flense_expected1, -}, test_buf_flense_data2 = { - .src = buf_flense_testdata2, - .expected = buf_flense_expected2, -}; - -static int test_buf_flense_(void *test_arg, void *suite_arg) { - (void)suite_arg; - struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg; - const char testdata_len = strlen(test_data->src); - const struct buf_flense_expected_result_ *next_result = test_data->expected; - int retval = 0; - buf_t src; - - TEST_INFO("initializing src buffer with %zu characters", testdata_len); - src = buf_new_(testdata_len); - if (src == NULL) { - TEST_ERROR("%s:%s", "new_buf_", "failed"); - return -1; - } - memcpy(src->buf, test_data->src, testdata_len); - src->buf_used += testdata_len; - - D_BUF("src ", src); - - for (;;) { - ssize_t r; - buf_t dst = NULL; - - r = buf_flense_(&src, 0, '\n', &dst); - if (r != next_result->r) { - TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r); - retval = -1; - } - if (r == 0) { - TEST_INFO("finished flensing"); - break; - } - - if (strlen(next_result->buf) > dst->buf_used) { - TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf); - D_BUF("dst ", dst); - retval = -1; - } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) { - TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf); - D_BUF("dst ", dst); - retval = -1; - } - - TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start); - - memset(dst, 0, dst->buf_sz + sizeof *dst); - free(dst); - dst = NULL; - - next_result++; - } - if (strlen(next_result->buf) > src->buf_used) { - TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf); - D_BUF("src ", src); - retval = -1; - } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) { - TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf); - D_BUF("src ", src); - retval = -1; - } - - TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start); - - memset(src, 0, src->buf_sz + sizeof *src); - free(src); - src = NULL; - - return retval; -} -#endif /* TEST */ - - -/* reservoir_remember_ - Remember #line, forgetting a line if more than #num_lines have already - been remembered. Remembers them in random order. +/* request_snapshot_ + Signal handler to be bound to USR1. + Upon receiving a signal, take note that a snapshot of the current state + has been requested. */ static -void reservoir_remember_(reservoir_t reservoir, buf_t buf) { - buf_t old_buf; - - assert(reservoir != NULL); - - D_BUF("reserving ", buf); - - if (reservoir->reservoir_sz > 0) { - unsigned long randomness; - - if (reservoir->reservoir_insertions < reservoir->reservoir_sz) { - /* there are still unused slots, fill them up without discarding anything */ - /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ - randomness = rand_upto_inclusive_(reservoir->reservoir_insertions); - - assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */ - - NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions); - reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness]; - old_buf = NULL; /* no old entry to discard */ - } else { - randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1); - old_buf = reservoir->reservoir[randomness]; - } - NOTIFY_DEBUG("replacing reservoir index %zu", randomness); - reservoir->reservoir[randomness] = buf; - } else { - /* can't add anything to a zero-size reservoir, so just dispose of new item */ - old_buf = buf; - } - - reservoir->reservoir_insertions += 1; - - if (old_buf != NULL) { - D_BUF("FREEING ", old_buf); - memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); - free(old_buf); - } - - D_RESERVOIR(reservoir); +void request_snapshot_(int signum) { + (void)signum; + status_requested_ = 1; } - -/* reservoir_read_ +/* accumulate_input_ Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an accumulator buffer. For each #delimiter character found in what was just read, occasionally remember the preceeding characters. */ static -int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) { +int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) { buf_t read_buf, new_buf = NULL; - size_t line_scanned; /* how much of the buffer has already been searched for delimiter */ + size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; + unsigned long grow_count = 0; + unsigned long grow_grow_count = 0; assert(read_block_sz > 0); - assert(num_lines != NULL); - assert(reservoir != NULL); + assert(psamples != NULL); + assert(preservoir != NULL); + assert(*preservoir != NULL); if (fd < 0) { return -1; } - read_buf = buf_new_(read_block_sz); + read_buf = buf_new(read_block_sz); if (read_buf == NULL) { return -1; } - line_scanned = 0; + bytes_scanned = 0; + /* begin accumulating */ for (;;) { NOTIFY_DEBUG("read loop\n\n"); - if (buf_makeroom_(&read_buf, read_block_sz)) { + /* make sure there's enough room in our input buffer for a full read() */ + if (buf_makeroom(&read_buf, read_block_sz)) { free(read_buf); free(new_buf); return -1; } - NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf)); - len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf)); + NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf)); + do { + len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf)); + + /* a signal may have interrupted read(), deal with that before + doing anything else */ + if (status_requested_) { + status_requested_ = 0; + NOTIFY_DEBUG("dumping reservoir due to signal"); + if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) { + NOTIFY_ERROR("failed to output current reservoir contents"); + } + if (options_.verbosity) { + reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out); + } + } + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < 0) { NOTIFY_ERROR("%s:%s", "read", strerror(errno)); free(read_buf); @@ -492,22 +180,46 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin NOTIFY_DEBUG("len:%zd", len); + if (options_.reservoir_grow_per + && grow_count >= options_.reservoir_grow_per) { + NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1); + grow_count = 0; + if (reservoir_grow(preservoir, 1)) { + NOTIFY_ERROR("failed to increase reservoir size"); + free(read_buf); + return -1; + } + } + + if (options_.reservoir_grow_double_per + && grow_grow_count >= options_.reservoir_grow_double_per) { + if (grow_count > ULONG_MAX / 2) { + /* would overflow, never grow again */ + grow_count = 0; + grow_grow_count = 0; + NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue"); + } else { + NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2); + grow_count *= 2; + } + } + /* determine if we want to save the next buffer */ if (new_buf == NULL) { /* if new_buf is not null, we already want to save the next one.. */ /* otherwise, save if we've read in fewer lines than the reservoir holds */ - /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */ + /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */ - if (*num_lines < reservoir->reservoir_sz) { - NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz); + if (*psamples < (*preservoir)->reservoir_sz) { + NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz); } else { - NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz); + NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz); } - if (*num_lines < reservoir->reservoir_sz - || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) { - NOTIFY_DEBUG("next buffer will be reserved.."); - new_buf = buf_new_(0); + if (*psamples < (*preservoir)->reservoir_sz + || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) { + NOTIFY_DEBUG("next buffer will be remembered.."); + new_buf = buf_new(0); if (new_buf == NULL) { free(read_buf); return -1; @@ -517,7 +229,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin } } - len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL); + len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL); if (len_flensed < 0) { free(read_buf); free(new_buf); @@ -526,23 +238,25 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin if (len_flensed == 0) { /* no delimiter found yet, stop parsing and read more */ NOTIFY_DEBUG("no delim found after %zd", len); - line_scanned = len; - buf_rebase_(read_buf); + bytes_scanned = len; + buf_rebase(read_buf); break; } len -= len_flensed; - line_scanned = 0; + bytes_scanned = 0; D_BUF("read_buf: ", read_buf); if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); - reservoir_remember_(reservoir, new_buf); + reservoir_remember((*preservoir), new_buf); new_buf = NULL; } - *num_lines += 1; + *psamples += 1; + grow_count += 1; + grow_grow_count += 1; } } @@ -552,12 +266,12 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin D_BUF("leftovers: ", read_buf); if (new_buf != NULL - || *num_lines < reservoir->reservoir_sz) { - reservoir_remember_(reservoir, read_buf); + || *psamples < (*preservoir)->reservoir_sz) { + reservoir_remember((*preservoir), read_buf); read_buf = NULL; } - *num_lines += 1; + *psamples += 1; } free(read_buf); @@ -567,65 +281,45 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin } -/* reservoir_write_ - Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots. -*/ -static -int reservoir_write_(int fd, reservoir_t reservoir, char delim) { - ssize_t len; - size_t i; - struct iovec iov[2]; - - iov[1].iov_base = &delim; - iov[1].iov_len = sizeof delim; - - assert(reservoir != NULL); - D_RESERVOIR(reservoir); - - for (i = 0; i < reservoir->reservoir_sz; i++) { - if (reservoir->reservoir[i]) { - iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; - iov[0].iov_len = reservoir->reservoir[i]->buf_used; - } else { - iov[0].iov_base = NULL; - iov[0].iov_len = 0; - } - - len = writev(fd, iov, sizeof iov / sizeof *iov); - if (len < 0) { - NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); - return -1; - } - } - - return 0; -} - -#ifndef TEST - int main(int argc, char *argv[]) { + struct sigaction sa; + char *status_filename = NULL; reservoir_t reservoir; - size_t num_lines = 0; - int fd; + unsigned long num_samples = 0; int c; - while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) { + while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; break; case 'b': - options_.read_buf_sz = atoi(optarg); + options_.read_buf_sz = strtoul(optarg, NULL, 0); + /* XXX: validate */ break; case 'd': options_.delim = *optarg; + /* @fallthrough@ */ + case 'D': options_.delim_out = *optarg; break; + case 'i': + options_.reservoir_grow_per = strtoul(optarg, NULL, 0); + break; + + case 'j': + options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0); + break; + + case 's': + status_filename = optarg; + break; + case 'n': - options_.reservoir_sz = atoi(optarg); + options_.reservoir_sz = strtoul(optarg, NULL, 0); break; case 'r': @@ -642,44 +336,59 @@ int main(int argc, char *argv[]) { } } +#if 0 + /* zero-or-more arguments required */ + /* if that ever changes... */ if (argc - optind < 0) { usage_(argv[0], 0); exit(EX_USAGE); } +#endif - if (rand_init_(options_.rand_file)) { + if (randomness_init(options_.rand_file)) { NOTIFY_ERROR("failed to initialize randomness source\n"); exit(EX_NOINPUT); } - reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir); + reservoir = reservoir_new(options_.reservoir_sz); if (reservoir == NULL) { - NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + NOTIFY_ERROR("could not create new reservoir"); + exit(EX_OSERR); + } + + if (status_filename) { + options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT); + if (options_.status_fd < 0) { + NOTIFY_ERROR("could not open status file '%s'", status_filename); + exit(EX_OSERR); + } + } + + sa.sa_handler = request_snapshot_; + (void)sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + if (sigaction(SIGUSR1, &sa, NULL)) { + NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno)); exit(EX_OSERR); } - reservoir->reservoir_sz = options_.reservoir_sz; - reservoir->reservoir_insertions = 0; - memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); if (argc - optind == 0) { - fd = STDIN_FILENO; - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { while (optind < argc) { if (strcmp("-", argv[optind]) == 0) { - fd = STDIN_FILENO; - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { - fd = open(argv[optind], O_RDONLY); + int fd = open(argv[optind], O_RDONLY); if (fd < 0) { NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); exit(EX_NOINPUT); } - if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } if (close(fd)) { @@ -692,41 +401,16 @@ int main(int argc, char *argv[]) { } if (options_.verbosity) { - fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines); + fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples); } if (options_.verbosity > 1) { - fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions); + fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally); } - if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) { + if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } exit(EX_OK); } - -#else /* TEST */ - -void *test_suite_data; - -int test_suite_pre(void *suite_data) { - (void)suite_data; - if (rand_init_(NULL)) { - return -1; - } - return 0; -} -int test_suite_post(void *suite_data) { - (void)suite_data; - return 0; -} - -test_t test_suite[] = { - { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 }, - { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 }, - { NULL, NULL, NULL, NULL, NULL }, -}; - - -#endif /* TEST */