/* reservoir_sample.c This collates a randomized subset of its input, by means of reservoir- sampling, and a Fisher-Yates shuffle. */ /* To do: Allow user-supplied function (in relation to entry count) to define grow-rate option, rather than static count. JIT would be nice there. The entire grow-rate thing is not very well thought-through, will bias the results, and likely not very useful as currently implemented. */ #include #include #include #include #include #include #include #include #include #include #include "version.h" #include "notify.h" #include "buf.h" #include "randomness.h" #include "reservoir.h" static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; static struct options_ { unsigned int verbosity; size_t read_buf_sz; size_t reservoir_sz; unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */ unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */ char delim; char delim_out; char *rand_file; int status_fd; } options_ = { .verbosity = 0, .read_buf_sz = 8192, .reservoir_sz = 1, .reservoir_grow_per = 0, .delim = '\n', .delim_out = '\n', .rand_file = NULL, .status_fd = STDERR_FILENO, }; /* #status_requested_ will be set whenever a view of the current sample reservoir is desired. */ static int status_requested_ = 0; static void usage_(const char *prog, unsigned int full) { FILE *f = full ? stdout : stderr; char *x = strrchr(prog, '/'); if (x && *(x + 1)) prog = x + 1; if (full) fprintf(f, "%s -- returns a random sampling of input\n\n", prog); fprintf(f, "Usage: %s options\n", prog); if (full) { fprintf(f, "\nOptions:\n" "\t-n -- returns samples [default: %zu]\n" "\t-d -- use as input and output delimiter [default: '\\%03hho']\n" "\t-r -- read randomness from [default: '%s']\n" "\t-b -- set input buffer size [default: %zu]\n" "\t-i -- grow reservoir by 1 for every input samples (0 inhibits behavior) [default: %lu]\n" "\t-h -- double the reservoir growth interval every input samples (0 inhibits behavior) [default: %lu]\n " "\t-s -- USR1 signals will write reservoir contents to rather than stderr (has no effect on normal output to stdout upon input EOF)\n" "\t-v -- increase verbosity\n" "\t-h -- this screen\n", options_.reservoir_sz, options_.delim, options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)", options_.read_buf_sz, options_.reservoir_grow_double_per, options_.reservoir_grow_per); fprintf(f, "\n%78s\n", src_id_); } fflush(f); } /* request_snapshot_ Signal handler to be bound to USR1. Upon receiving a signal, take note that a snapshot of the current state has been requested. */ static void request_snapshot_(int signum) { (void)signum; status_requested_ = 1; } /* accumulate_input_ Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an accumulator buffer. For each #delimiter character found in what was just read, occasionally remember the preceeding characters. */ static int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) { buf_t read_buf, new_buf = NULL; size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; unsigned long grow_count = 0; unsigned long grow_grow_count = 0; assert(read_block_sz > 0); assert(psamples != NULL); assert(preservoir != NULL); assert(*preservoir != NULL); if (fd < 0) { return -1; } read_buf = buf_new(read_block_sz); if (read_buf == NULL) { return -1; } bytes_scanned = 0; /* begin accumulating */ for (;;) { NOTIFY_DEBUG("read loop\n\n"); /* make sure there's enough room in our input buffer for a full read() */ if (buf_makeroom(&read_buf, read_block_sz)) { free(read_buf); free(new_buf); return -1; } NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf)); do { len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf)); /* a signal may have interrupted read(), deal with that before doing anything else */ if (status_requested_) { status_requested_ = 0; NOTIFY_DEBUG("dumping reservoir due to signal"); if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) { NOTIFY_ERROR("failed to output current reservoir contents"); } if (options_.verbosity) { reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out); } } } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < 0) { NOTIFY_ERROR("%s:%s", "read", strerror(errno)); free(read_buf); free(new_buf); return -1; } if (len == 0) { break; } read_buf->buf_used += len; NOTIFY_DEBUG("len:%zd", len); D_BUF("read_buf: ", read_buf); while (len > 0) { ssize_t len_flensed; NOTIFY_DEBUG("len:%zd", len); if (options_.reservoir_grow_per && grow_count >= options_.reservoir_grow_per) { NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1); grow_count = 0; if (reservoir_grow(preservoir, 1)) { NOTIFY_ERROR("failed to increase reservoir size"); free(read_buf); return -1; } } if (options_.reservoir_grow_double_per && grow_grow_count >= options_.reservoir_grow_double_per) { if (grow_count > ULONG_MAX / 2) { /* would overflow, never grow again */ grow_count = 0; grow_grow_count = 0; NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue"); } else { NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2); grow_count *= 2; } } /* determine if we want to save the next buffer */ if (new_buf == NULL) { /* if new_buf is not null, we already want to save the next one.. */ /* otherwise, save if we've read in fewer lines than the reservoir holds */ /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */ if (*psamples < (*preservoir)->reservoir_sz) { NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz); } else { NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz); } if (*psamples < (*preservoir)->reservoir_sz || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) { NOTIFY_DEBUG("next buffer will be remembered.."); new_buf = buf_new(0); if (new_buf == NULL) { free(read_buf); return -1; } } else { NOTIFY_DEBUG("not saving next buffer.."); } } len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL); if (len_flensed < 0) { free(read_buf); free(new_buf); return -1; } if (len_flensed == 0) { /* no delimiter found yet, stop parsing and read more */ NOTIFY_DEBUG("no delim found after %zd", len); bytes_scanned = len; buf_rebase(read_buf); break; } len -= len_flensed; bytes_scanned = 0; D_BUF("read_buf: ", read_buf); if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); reservoir_remember((*preservoir), new_buf); new_buf = NULL; } *psamples += 1; grow_count += 1; grow_grow_count += 1; } } /* leftovers */ NOTIFY_DEBUG("loop done\n\n"); if (read_buf->buf_used) { D_BUF("leftovers: ", read_buf); if (new_buf != NULL || *psamples < (*preservoir)->reservoir_sz) { reservoir_remember((*preservoir), read_buf); read_buf = NULL; } *psamples += 1; } free(read_buf); free(new_buf); return 0; } int main(int argc, char *argv[]) { struct sigaction sa; char *status_filename = NULL; reservoir_t reservoir; unsigned long num_samples = 0; int c; while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; break; case 'b': options_.read_buf_sz = strtoul(optarg, NULL, 0); /* XXX: validate */ break; case 'd': options_.delim = *optarg; /* @fallthrough@ */ case 'D': options_.delim_out = *optarg; break; case 'i': options_.reservoir_grow_per = strtoul(optarg, NULL, 0); break; case 'j': options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0); break; case 's': status_filename = optarg; break; case 'n': options_.reservoir_sz = strtoul(optarg, NULL, 0); break; case 'r': options_.rand_file = optarg; break; case 'h': usage_(argv[0], 1); exit(EX_OK); default: usage_(argv[0], 0); exit(EX_USAGE); } } #if 0 /* zero-or-more arguments required */ /* if that ever changes... */ if (argc - optind < 0) { usage_(argv[0], 0); exit(EX_USAGE); } #endif if (randomness_init(options_.rand_file)) { NOTIFY_ERROR("failed to initialize randomness source\n"); exit(EX_NOINPUT); } reservoir = reservoir_new(options_.reservoir_sz); if (reservoir == NULL) { NOTIFY_ERROR("could not create new reservoir"); exit(EX_OSERR); } if (status_filename) { options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT, 0666); if (options_.status_fd < 0) { NOTIFY_ERROR("could not open status file '%s'", status_filename); exit(EX_OSERR); } } sa.sa_handler = request_snapshot_; (void)sigemptyset(&sa.sa_mask); sa.sa_flags = 0; if (sigaction(SIGUSR1, &sa, NULL)) { NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno)); exit(EX_OSERR); } if (argc - optind == 0) { if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { while (optind < argc) { if (strcmp("-", argv[optind]) == 0) { if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { int fd = open(argv[optind], O_RDONLY); if (fd < 0) { NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); exit(EX_NOINPUT); } if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } if (close(fd)) { NOTIFY_ERROR("%s:%s", "close", strerror(errno)); exit(EX_OSERR); } } optind++; } } if (options_.verbosity) { fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples); } if (options_.verbosity > 1) { fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally); } if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } exit(EX_OK); }