/* reservoir_sample.c This generates a randomized subset of its input, by means of reservoir- sampling, and a Fisher-Yates shuffle. */ #include #include #include #include #include #include #include #include #include #include #include "version.h" #include "notify.h" #include "test_suite.h" static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; static struct options_ { unsigned int verbosity; size_t read_buf_sz; size_t reservoir_sz; char delim; char delim_out; char *rand_file; } options_ = { .verbosity = 0, .read_buf_sz = 8192, .reservoir_sz = 1, .delim = '\n', .delim_out = '\n', .rand_file = "/dev/random", }; static int rand_fd_ = -1; typedef struct buf_ { size_t buf_sz; size_t buf_start; size_t buf_used; unsigned char buf[]; } *buf_t; #define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) typedef struct reservoir_ { size_t reservoir_sz; buf_t reservoir[]; } *reservoir_t; #ifndef NDEBUG #define D_BUF(__pre__,__b__,...) do {\ if ( (__b__) == NULL )\ NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\ else {\ NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\ ## __VA_ARGS__,\ (__b__),\ (__b__)->buf_sz,\ (__b__)->buf_start,\ (__b__)->buf_used,\ BUF_ROOM((__b__)),\ (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\ assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\ }\ } while (0) #define D_RESERVOIR(__r__) do {\ size_t i;\ for (i = 0; i < (__r__)->reservoir_sz; i++) {\ D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0) #else #define D_RESERVOIR(...) #define D_BUF(...) #endif static void usage_(const char *prog, unsigned int full) { FILE *f = full ? 
stdout : stderr; char *x = strrchr(prog, '/'); if (x && *(x + 1)) prog = x + 1; if (full) fprintf(f, "%s -- returns a random sampling of input\n\n", prog); fprintf(f, "Usage: %s options\n", prog); if (full) { fprintf(f, "\nOptions:\n" "\t-n -- returns samples [default: %zu]\n" "\t-d -- use as input delimiter [default: '\\%03hho']\n" "\t-r -- read randomness from [default: '%s']\n" "\t-b -- read buffer size [default: %zu]\n" "\t-v -- increase verbosity\n" "\t-h -- this screen\n", options_.reservoir_sz, options_.delim, options_.rand_file, options_.read_buf_sz); fprintf(f, "\n%78s\n", src_id_); } fflush(f); } /* rand_upto_inclusive_ Room for improvement: constrain bits of randomness consumed, based on #limit also maybe read chunks of randomness at a time */ static unsigned long rand_upto_inclusive_(unsigned long limit) { unsigned long randomness; if (limit == 0) return 0; if (rand_fd_ != -1) { ssize_t len; len = read(rand_fd_, &randomness, sizeof randomness); if (len == sizeof randomness) { randomness %= limit + 1; return randomness; } NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? 
"EOF" : "not enough read consecutively") ); } randomness = mrand48(); randomness %= limit + 1; return randomness; } static int rand_init_(char *rand_file) { srand48(time(NULL) ^ getpid()); if (rand_file) { rand_fd_ = open(rand_file, O_RDONLY); if (rand_fd_ == -1) { NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno)); return -1; } } return 0; } static buf_t buf_new_(size_t sz) { buf_t buf = malloc(sz + sizeof *buf); if (buf) { buf->buf_sz = sz; buf->buf_start = buf->buf_used = 0; memset(buf->buf, 0, sz); } return buf; } static void buf_rebase_(buf_t buf) { if (buf->buf_start == 0) return; memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used); buf->buf_start = 0; } static int buf_makeroom_(buf_t *pbuf, size_t roomfor) { size_t new_sz; void *tmp_ptr; assert(pbuf != NULL); if (*pbuf == NULL) { *pbuf = buf_new_(roomfor); if (*pbuf == NULL) { return -1; } } buf_rebase_(*pbuf); if (BUF_ROOM(*pbuf) >= roomfor) return 0; new_sz = (*pbuf)->buf_used + roomfor; tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf); if (tmp_ptr == NULL) { NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); return -1; } *pbuf = tmp_ptr; (*pbuf)->buf_sz = new_sz; return 0; } static int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) { assert(src != NULL); assert(dst != NULL); assert(src_skip + n <= src->buf_used); if (buf_makeroom_(dst, n)) { return -1; } memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n); (*dst)->buf_used += n; return 0; } /* buf_flense_ Starting after #src_offset characters, scan through #src, stopping at the first character matching #delimiter, whereupon all the characters leading up to #delimiter are copied into *#dst if #dst is not NULL. #src becomes the characters following #delimiter. Returns the number of characters flensed from #src. Room for improvement: If flensed segment is more than half the buffer, copy remainder of src into dst, then return src, leaving dst in its place. 
*/ static ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) { const size_t delimiter_len = 1; size_t i; assert(src != NULL); assert(src_offset <= (*src)->buf_used); NOTIFY_DEBUG("src_offset:%zu", src_offset); D_BUF("src ", *src); D_BUF("dst ", dst ? *dst : NULL); for (i = src_offset; i < (*src)->buf_used; i++) { if ((*src)->buf[(*src)->buf_start + i] == delimiter) { if (dst != NULL) { if (buf_range_dup_or_append_((*src), 0, i, dst)) { return -1; } } (*src)->buf_start += i + delimiter_len; (*src)->buf_used -= i + delimiter_len; D_BUF("src ", *src); D_BUF("dst ", dst ? *dst : NULL); return i + delimiter_len; } } return 0; } #ifdef TEST static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers"; static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n"; struct buf_flense_expected_result_ { ssize_t r; const char *buf; } buf_flense_expected1[] = { { 1 + 1, "a" }, { 2 + 1, "bc" }, { 3 + 1, "def" }, { 4 + 1, "ghij" }, { 5 + 1, "klmno" }, { 3 + 1, "pqr" }, { 1 + 1, "s" }, { 3 + 1, "tuv" }, { 5 + 1, "wxyz0" }, { 4 + 1, "1234" }, { 3 + 1, "567" }, { 2 + 1, "89" }, { 0, "0leftovers" }, }, buf_flense_expected2[] = { { 0 + 1, "" }, { 0 + 1, "" }, { 3 + 1, "foo" }, { 3 + 1, "bar" }, { 0 + 1, "" }, { 0, "" }, }; struct test_buf_flense_data_ { const char *src; const struct buf_flense_expected_result_ *expected; } test_buf_flense_data1 = { .src = buf_flense_testdata1, .expected = buf_flense_expected1, }, test_buf_flense_data2 = { .src = buf_flense_testdata2, .expected = buf_flense_expected2, }; static int test_buf_flense_(void *test_arg, void *suite_arg) { (void)suite_arg; struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg; const char testdata_len = strlen(test_data->src); const struct buf_flense_expected_result_ *next_result = test_data->expected; int retval = 0; buf_t src; TEST_INFO("initializing src buffer with %zu characters", testdata_len); src = 
buf_new_(testdata_len); if (src == NULL) { TEST_ERROR("%s:%s", "new_buf_", "failed"); return -1; } memcpy(src->buf, test_data->src, testdata_len); src->buf_used += testdata_len; D_BUF("src ", src); for (;;) { ssize_t r; buf_t dst = NULL; r = buf_flense_(&src, 0, '\n', &dst); if (r != next_result->r) { TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r); retval = -1; } if (r == 0) { TEST_INFO("finished flensing"); break; } if (strlen(next_result->buf) > dst->buf_used) { TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf); D_BUF("dst ", dst); retval = -1; } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) { TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf); D_BUF("dst ", dst); retval = -1; } TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start); memset(dst, 0, dst->buf_sz + sizeof *dst); free(dst); dst = NULL; next_result++; } if (strlen(next_result->buf) > src->buf_used) { TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf); D_BUF("src ", src); retval = -1; } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) { TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf); D_BUF("src ", src); retval = -1; } TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start); memset(src, 0, src->buf_sz + sizeof *src); free(src); src = NULL; return retval; } #endif /* TEST */ /* reservoir_remember_ Remember #line, forgetting a line if more than #num_lines have already been remembered. Remembers them in random order. 
*/
static void
reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf)
{
	buf_t evicted = NULL;

	assert(reservoir != NULL);

	D_BUF("reserving ", buf);

	if (reservoir->reservoir_sz == 0) {
		/* nowhere to keep it */
		evicted = buf;
	} else {
		size_t slot;

		if (num_lines < reservoir->reservoir_sz) {
			/* still filling: the chosen slot's occupant moves to the next
			   empty slot, and buf takes its place */
			slot = rand_upto_inclusive_(num_lines);
			NOTIFY_DEBUG("moving index %zu to end (%zu)", slot, num_lines);
			reservoir->reservoir[num_lines] = reservoir->reservoir[slot];
		} else {
			/* full: buf displaces a random occupant */
			slot = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
			evicted = reservoir->reservoir[slot];
		}
		NOTIFY_DEBUG("replacing reservoir index %zu", slot);
		reservoir->reservoir[slot] = buf;
	}

	if (evicted != NULL) {
		D_BUF("FREEING ", evicted);
		memset(evicted, 0, evicted->buf_sz + sizeof *evicted);
		free(evicted);
	}

	D_RESERVOIR(reservoir);
}

/*
	reservoir_read_
	Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
	accumulator buffer.  For each #delimiter character found in what was
	just read, occasionally remember the preceding characters.
	Returns 0 once EOF is reached, or -1 on error.
*/ static int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) { buf_t read_buf, new_buf = NULL; size_t line_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; assert(read_block_sz > 0); assert(num_lines != NULL); assert(reservoir != NULL); if (fd < 0) { return -1; } read_buf = buf_new_(read_block_sz); if (read_buf == NULL) { return -1; } line_scanned = 0; for (;;) { NOTIFY_DEBUG("read loop\n\n"); if (buf_makeroom_(&read_buf, read_block_sz)) { free(read_buf); free(new_buf); return -1; } NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf)); len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf)); if (len < 0) { NOTIFY_ERROR("%s:%s", "read", strerror(errno)); free(read_buf); free(new_buf); return -1; } if (len == 0) { break; } read_buf->buf_used += len; NOTIFY_DEBUG("len:%zd", len); D_BUF("read_buf: ", read_buf); while (len > 0) { ssize_t len_flensed; NOTIFY_DEBUG("len:%zd", len); /* determine if we want to save the next buffer */ if (new_buf == NULL) { /* if new_buf is not null, we already want to save the next one.. */ /* otherwise, save if we've read in fewer lines than the reservoir holds */ /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */ if (*num_lines < reservoir->reservoir_sz) { NOTIFY_DEBUG("still filling reservoir.. 
%zu filled out of %zu..", *num_lines, reservoir->reservoir_sz); } else { NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz); } if (*num_lines < reservoir->reservoir_sz || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) { NOTIFY_DEBUG("next buffer will be reserved.."); new_buf = buf_new_(0); if (new_buf == NULL) { free(read_buf); return -1; } } else { NOTIFY_DEBUG("not saving next buffer.."); } } len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL); if (len_flensed < 0) { free(read_buf); free(new_buf); return -1; } if (len_flensed == 0) { /* no delimiter found yet, stop parsing and read more */ NOTIFY_DEBUG("no delim found after %zd", len); line_scanned = len; buf_rebase_(read_buf); break; } len -= len_flensed; line_scanned = 0; D_BUF("read_buf: ", read_buf); if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); reservoir_remember_(reservoir, *num_lines, new_buf); new_buf = NULL; *num_lines += 1; } } } /* leftovers */ NOTIFY_DEBUG("loop done\n\n"); if (read_buf->buf_used) { D_BUF("leftovers: ", read_buf); if (new_buf != NULL || *num_lines < reservoir->reservoir_sz) { reservoir_remember_(reservoir, *num_lines, read_buf); read_buf = NULL; } *num_lines += 1; } free(read_buf); free(new_buf); return 0; } /* reservoir_write_ Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots. 
*/ static int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) { ssize_t len; size_t i; struct iovec iov[2]; iov[1].iov_base = &delim; iov[1].iov_len = sizeof delim; assert(reservoir != NULL); D_RESERVOIR(reservoir); for (i = 0; i < reservoir->reservoir_sz; i++) { if (reservoir->reservoir[i]) { iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; iov[0].iov_len = reservoir->reservoir[i]->buf_used; } else { iov[0].iov_base = NULL; iov[0].iov_len = 0; } len = writev(fd, iov, sizeof iov / sizeof *iov); if (len < 0) { NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); return -1; } } return 0; } #ifndef TEST int main(int argc, char *argv[]) { struct reservoir_ *reservoir; size_t num_lines = 0; int fd; int c; while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; break; case 'b': options_.read_buf_sz = atoi(optarg); break; case 'd': options_.delim = *optarg; options_.delim_out = *optarg; break; case 'n': options_.reservoir_sz = atoi(optarg); break; case 'h': usage_(argv[0], 1); exit(EX_OK); default: usage_(argv[0], 0); exit(EX_USAGE); } } if (argc - optind < 0) { usage_(argv[0], 0); exit(EX_USAGE); } if (rand_init_(options_.rand_file)) { NOTIFY_ERROR("failed to initialize randomness source\n"); exit(EX_NOINPUT); } reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir); if (reservoir == NULL) { NOTIFY_ERROR("%s:%s", "calloc", strerror(errno)); exit(EX_OSERR); } reservoir->reservoir_sz = options_.reservoir_sz; memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); if (argc - optind == 0) { fd = STDIN_FILENO; if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { exit(EX_SOFTWARE); } } else { while (optind < argc) { if (strcmp("-", argv[optind]) == 0) { fd = STDIN_FILENO; if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { 
exit(EX_SOFTWARE); } } else { fd = open(argv[optind], O_RDONLY); if (fd < 0) { NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); exit(EX_NOINPUT); } if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) { exit(EX_SOFTWARE); } if (close(fd)) { NOTIFY_ERROR("%s:%s", "close", strerror(errno)); exit(EX_OSERR); } } optind++; } } if (options_.verbosity) { fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines); } if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } exit(EX_OK); } #else /* TEST */ void *test_suite_data; int test_suite_pre(void *suite_data) { (void)suite_data; if (rand_init_(NULL)) { return -1; } return 0; } int test_suite_post(void *suite_data) { (void)suite_data; return 0; } test_t test_suite[] = { { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 }, { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 }, { NULL, NULL, NULL, NULL, NULL }, }; #endif /* TEST */