X-Git-Url: http://git.squeep.com/?p=reservoir_sample;a=blobdiff_plain;f=reservoir_sample.c;fp=reservoir_sample.c;h=492dd64975a24307c83ffd0530310594e8059490;hp=0000000000000000000000000000000000000000;hb=694d004f3c2a521672d468a8bc77dc2b635ffd9f;hpb=6bd502bb5a3c61bb3b0cd76d613973346e8a5d90 diff --git a/reservoir_sample.c b/reservoir_sample.c new file mode 100644 index 0000000..492dd64 --- /dev/null +++ b/reservoir_sample.c @@ -0,0 +1,707 @@ +/* reservoir_sample.c + This generates a randomized subset of its input, by means of reservoir- + sampling, and a Fisher-Yates shuffle. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "version.h" +#include "notify.h" +#include "test_suite.h" + +static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; + +static struct options_ { + unsigned int verbosity; + size_t read_buf_sz; + size_t reservoir_sz; + char delim; + char delim_out; + char *rand_file; +} options_ = { + .verbosity = 0, + .read_buf_sz = 8192, + .reservoir_sz = 1, + .delim = '\n', + .delim_out = '\n', + .rand_file = "/dev/random", +}; + +static int rand_fd_ = -1; + +typedef struct buf_ { + size_t buf_sz; + size_t buf_start; + size_t buf_used; + unsigned char buf[]; +} *buf_t; +#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) + +typedef struct reservoir_ { + size_t reservoir_sz; + buf_t reservoir[]; +} *reservoir_t; + +#ifndef NDEBUG +#define D_BUF(__pre__,__b__,...) do {\ + if ( (__b__) == NULL )\ + NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\ + else {\ + NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\ + ## __VA_ARGS__,\ + (__b__),\ + (__b__)->buf_sz,\ + (__b__)->buf_start,\ + (__b__)->buf_used,\ + BUF_ROOM((__b__)),\ + (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\ + assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\ + }\ +} while (0) +#define D_RESERVOIR(__r__) do {\ + size_t i;\ + for (i = 0; i < (__r__)->reservoir_sz; i++) {\ + D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0) +#else +#define D_RESERVOIR(...) +#define D_BUF(...) +#endif + + +static +void usage_(const char *prog, unsigned int full) { + FILE *f = full ? stdout : stderr; + char *x = strrchr(prog, '/'); + + if (x && *(x + 1)) + prog = x + 1; + + if (full) + fprintf(f, "%s -- returns a random sampling of input\n\n", + prog); + + fprintf(f, "Usage: %s options\n", + prog); + + if (full) { + fprintf(f, "\nOptions:\n" + "\t-n -- returns samples [default: %zu]\n" + "\t-d -- use as input delimiter [default: '\\%03hho']\n" + "\t-r -- read randomness from [default: '%s']\n" + "\t-b -- read buffer size [default: %zu]\n" + "\t-v -- increase verbosity\n" + "\t-h -- this screen\n", + options_.reservoir_sz, + options_.delim, + options_.rand_file, + options_.read_buf_sz); + + fprintf(f, "\n%78s\n", src_id_); + } + fflush(f); +} + + +/* rand_upto_inclusive_ + Room for improvement: constrain bits of randomness consumed, based on #limit + also maybe read chunks of randomness at a time + */ +static +unsigned long rand_upto_inclusive_(unsigned long limit) { + unsigned long randomness; + + if (limit == 0) + return 0; + + if (rand_fd_ != -1) { + ssize_t len; + + len = read(rand_fd_, &randomness, sizeof randomness); + if (len == sizeof randomness) { + randomness %= limit + 1; + + return randomness; + } + NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") ); + } + + randomness = mrand48(); + randomness %= limit + 1; + + return randomness; +} + + +static +int rand_init_(char *rand_file) { + srand48(time(NULL) ^ getpid()); + if (rand_file) { + rand_fd_ = open(rand_file, O_RDONLY); + if (rand_fd_ == -1) { + NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno)); + return -1; + } + } + return 0; +} + + +static +buf_t buf_new_(size_t sz) { + buf_t buf = malloc(sz + sizeof *buf); + if (buf) { + buf->buf_sz = sz; + buf->buf_start = buf->buf_used = 0; + memset(buf->buf, 0, sz); + } + return buf; +} + + +static +void buf_rebase_(buf_t buf) { + if (buf->buf_start == 0) + return; + memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used); + buf->buf_start = 0; +} + + +static +int buf_makeroom_(buf_t *pbuf, size_t roomfor) { + size_t new_sz; + void *tmp_ptr; + + assert(pbuf != NULL); + + if (*pbuf == NULL) { + *pbuf = buf_new_(roomfor); + if (*pbuf == NULL) { + return -1; + } + } + + buf_rebase_(*pbuf); + + if (BUF_ROOM(*pbuf) >= roomfor) + return 0; + + new_sz = (*pbuf)->buf_used + roomfor; + tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf); + if (tmp_ptr == NULL) { + NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); + return -1; + } + *pbuf = tmp_ptr; + (*pbuf)->buf_sz = new_sz; + + return 0; +} + + +static +int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) { + assert(src != NULL); + assert(dst != NULL); + assert(src_skip + n <= src->buf_used); + + if (buf_makeroom_(dst, n)) { + return -1; + } + + memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n); + (*dst)->buf_used += n; + + return 0; +} + + +/* buf_flense_ + Starting after #src_offset characters, scan through #src, stopping at + the first character matching #delimiter, whereupon all the characters + leading up to #delimiter are copied into *#dst if #dst is not NULL. #src + becomes the characters following #delimiter. + Returns the number of characters flensed from #src. + + Room for improvement: + If flensed segment is more than half the buffer, copy remainder of src + into dst, then return src, leaving dst in its place. +*/ +static +ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) { + const size_t delimiter_len = 1; + size_t i; + + assert(src != NULL); + assert(src_offset <= (*src)->buf_used); + + NOTIFY_DEBUG("src_offset:%zu", src_offset); + D_BUF("src ", *src); + D_BUF("dst ", dst ? *dst : NULL); + + for (i = src_offset; i < (*src)->buf_used; i++) { + if ((*src)->buf[(*src)->buf_start + i] == delimiter) { + + if (dst != NULL) { + if (buf_range_dup_or_append_((*src), 0, i, dst)) { + return -1; + } + } + + (*src)->buf_start += i + delimiter_len; + (*src)->buf_used -= i + delimiter_len; + + D_BUF("src ", *src); + D_BUF("dst ", dst ? *dst : NULL); + return i + delimiter_len; + } + } + + return 0; +} + +#ifdef TEST + +static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers"; +static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n"; + +struct buf_flense_expected_result_ { + ssize_t r; + const char *buf; +} buf_flense_expected1[] = { + { 1 + 1, "a" }, + { 2 + 1, "bc" }, + { 3 + 1, "def" }, + { 4 + 1, "ghij" }, + { 5 + 1, "klmno" }, + { 3 + 1, "pqr" }, + { 1 + 1, "s" }, + { 3 + 1, "tuv" }, + { 5 + 1, "wxyz0" }, + { 4 + 1, "1234" }, + { 3 + 1, "567" }, + { 2 + 1, "89" }, + { 0, "0leftovers" }, +}, buf_flense_expected2[] = { + { 0 + 1, "" }, + { 0 + 1, "" }, + { 3 + 1, "foo" }, + { 3 + 1, "bar" }, + { 0 + 1, "" }, + { 0, "" }, +}; + +struct test_buf_flense_data_ { + const char *src; + const struct buf_flense_expected_result_ *expected; +} test_buf_flense_data1 = { + .src = buf_flense_testdata1, + .expected = buf_flense_expected1, +}, test_buf_flense_data2 = { + .src = buf_flense_testdata2, + .expected = buf_flense_expected2, +}; + +static int test_buf_flense_(void *test_arg, void *suite_arg) { + (void)suite_arg; + struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg; + const char testdata_len = strlen(test_data->src); + const struct buf_flense_expected_result_ *next_result = test_data->expected; + int retval = 0; + buf_t src; + + TEST_INFO("initializing src buffer with %zu characters", testdata_len); + src = buf_new_(testdata_len); + if (src == NULL) { + TEST_ERROR("%s:%s", "new_buf_", "failed"); + return -1; + } + memcpy(src->buf, test_data->src, testdata_len); + src->buf_used += testdata_len; + + D_BUF("src ", src); + + for (;;) { + ssize_t r; + buf_t dst = NULL; + + r = buf_flense_(&src, 0, '\n', &dst); + if (r != next_result->r) { + TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r); + retval = -1; + } + if (r == 0) { + TEST_INFO("finished flensing"); + break; + } + + if (strlen(next_result->buf) > dst->buf_used) { + TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf); + D_BUF("dst ", dst); + retval = -1; + } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) { + TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf); + D_BUF("dst ", dst); + retval = -1; + } + + TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start); + + memset(dst, 0, dst->buf_sz + sizeof *dst); + free(dst); + dst = NULL; + + next_result++; + } + if (strlen(next_result->buf) > src->buf_used) { + TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf); + D_BUF("src ", src); + retval = -1; + } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) { + TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf); + D_BUF("src ", src); + retval = -1; + } + + TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start); + + memset(src, 0, src->buf_sz + sizeof *src); + free(src); + src = NULL; + + return retval; +} +#endif /* TEST */ + + +/* reservoir_remember_ + Remember #line, forgetting a line if more than #num_lines have already + been remembered. Remembers them in random order. +*/ +static +void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf) { + unsigned long randomness; + buf_t old_buf; + + assert(reservoir != NULL); + + D_BUF("reserving ", buf); + + if (reservoir->reservoir_sz > 0) { + if (num_lines < reservoir->reservoir_sz) { + randomness = rand_upto_inclusive_(num_lines); + NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness, num_lines); + reservoir->reservoir[num_lines] = reservoir->reservoir[randomness]; + old_buf = NULL; + } else { + randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1); + old_buf = reservoir->reservoir[randomness]; + } + NOTIFY_DEBUG("replacing reservoir index %zu", randomness); + reservoir->reservoir[randomness] = buf; + } else { + old_buf = buf; + } + + if (old_buf != NULL) { + D_BUF("FREEING ", old_buf); + memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); + free(old_buf); + } + + D_RESERVOIR(reservoir); +} + + +/* reservoir_read_ + Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an + accumulator buffer. For each #delimiter character found in what was just + read, occasionally remember the preceeding characters. +*/ +static +int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) { + buf_t read_buf, new_buf = NULL; + size_t line_scanned; /* how much of the buffer has already been searched for delimiter */ + ssize_t len; + + assert(read_block_sz > 0); + assert(num_lines != NULL); + assert(reservoir != NULL); + + if (fd < 0) { + return -1; + } + + read_buf = buf_new_(read_block_sz); + if (read_buf == NULL) { + return -1; + } + line_scanned = 0; + + for (;;) { + NOTIFY_DEBUG("read loop\n\n"); + + if (buf_makeroom_(&read_buf, read_block_sz)) { + free(read_buf); + free(new_buf); + return -1; + } + + NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf)); + len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf)); + if (len < 0) { + NOTIFY_ERROR("%s:%s", "read", strerror(errno)); + free(read_buf); + free(new_buf); + return -1; + } + if (len == 0) { + break; + } + read_buf->buf_used += len; + + NOTIFY_DEBUG("len:%zd", len); + D_BUF("read_buf: ", read_buf); + + while (len > 0) { + ssize_t len_flensed; + + NOTIFY_DEBUG("len:%zd", len); + + /* determine if we want to save the next buffer */ + if (new_buf == NULL) { + /* if new_buf is not null, we already want to save the next one.. */ + /* otherwise, save if we've read in fewer lines than the reservoir holds */ + /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */ + + if (*num_lines < reservoir->reservoir_sz) { + NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz); + } else { + NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz); + } + + if (*num_lines < reservoir->reservoir_sz + || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) { + NOTIFY_DEBUG("next buffer will be reserved.."); + new_buf = buf_new_(0); + if (new_buf == NULL) { + free(read_buf); + return -1; + } + } else { + NOTIFY_DEBUG("not saving next buffer.."); + } + } + + len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL); + if (len_flensed < 0) { + free(read_buf); + free(new_buf); + return -1; + } + if (len_flensed == 0) { + /* no delimiter found yet, stop parsing and read more */ + NOTIFY_DEBUG("no delim found after %zd", len); + line_scanned = len; + buf_rebase_(read_buf); + break; + } + + len -= len_flensed; + line_scanned = 0; + + D_BUF("read_buf: ", read_buf); + + if (new_buf != NULL) { + D_BUF("parsed complete line: ", new_buf); + reservoir_remember_(reservoir, *num_lines, new_buf); + new_buf = NULL; + *num_lines += 1; + } + + } + } + /* leftovers */ + NOTIFY_DEBUG("loop done\n\n"); + if (read_buf->buf_used) { + D_BUF("leftovers: ", read_buf); + + if (new_buf != NULL + || *num_lines < reservoir->reservoir_sz) { + reservoir_remember_(reservoir, *num_lines, read_buf); + read_buf = NULL; + } + + *num_lines += 1; + } + + free(read_buf); + free(new_buf); + + return 0; +} + + +/* reservoir_write_ + Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots. +*/ +static +int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) { + ssize_t len; + size_t i; + struct iovec iov[2]; + + iov[1].iov_base = &delim; + iov[1].iov_len = sizeof delim; + + assert(reservoir != NULL); + D_RESERVOIR(reservoir); + + for (i = 0; i < reservoir->reservoir_sz; i++) { + if (reservoir->reservoir[i]) { + iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; + iov[0].iov_len = reservoir->reservoir[i]->buf_used; + } else { + iov[0].iov_base = NULL; + iov[0].iov_len = 0; + } + + len = writev(fd, iov, sizeof iov / sizeof *iov); + if (len < 0) { + NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); + return -1; + } + } + + return 0; +} + +#ifndef TEST + +int main(int argc, char *argv[]) { + struct reservoir_ *reservoir; + size_t num_lines = 0; + int fd; + int c; + + while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) { + switch (c) { + case 'v': + options_.verbosity++; + break; + + case 'b': + options_.read_buf_sz = atoi(optarg); + break; + + case 'd': + options_.delim = *optarg; + options_.delim_out = *optarg; + break; + + case 'n': + options_.reservoir_sz = atoi(optarg); + break; + + case 'h': + usage_(argv[0], 1); + exit(EX_OK); + + default: + usage_(argv[0], 0); + exit(EX_USAGE); + } + } + + if (argc - optind < 0) { + usage_(argv[0], 0); + exit(EX_USAGE); + } + + if (rand_init_(options_.rand_file)) { + NOTIFY_ERROR("failed to initialize randomness source\n"); + exit(EX_NOINPUT); + } + + reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir); + if (reservoir == NULL) { + NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); + exit(EX_OSERR); + } + reservoir->reservoir_sz = options_.reservoir_sz; + memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); + + if (argc - optind == 0) { + fd = STDIN_FILENO; + if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + exit(EX_SOFTWARE); + } + } else { + while (optind < argc) { + if (strcmp("-", argv[optind]) == 0) { + fd = STDIN_FILENO; + if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + exit(EX_SOFTWARE); + } + } else { + fd = open(argv[optind], O_RDONLY); + if (fd < 0) { + NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); + exit(EX_NOINPUT); + } + if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { + exit(EX_SOFTWARE); + } + if (close(fd)) { + NOTIFY_ERROR("%s:%s", "close", strerror(errno)); + exit(EX_OSERR); + } + } + optind++; + } + } + + if (options_.verbosity) { + fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines); + } + + if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) { + exit(EX_SOFTWARE); + } + + exit(EX_OK); +} + +#else /* TEST */ + +void *test_suite_data; + +int test_suite_pre(void *suite_data) { + (void)suite_data; + if (rand_init_(NULL)) { + return -1; + } + return 0; +} +int test_suite_post(void *suite_data) { + (void)suite_data; + return 0; +} + +test_t test_suite[] = { + { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 }, + { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 }, + { NULL, NULL, NULL, NULL, NULL }, +}; + + +#endif /* TEST */