/* reservoir_sample.c This collates a randomized subset of its input, by means of reservoir- sampling, and a Fisher-Yates shuffle. */ /* To do: Allow user-supplied function (in relation to entry count) to define grow-rate option, rather than static count. JIT would be nice there. The entire grow-rate thing is not very well thought-through, will bias the results, and likely not very useful as currently implemented. */ #include #include #include #include #include #include #include #include #include #include #include "version.h" #include "notify.h" #include "buf.h" #include "randomness.h" #include "reservoir.h" static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE; static struct options_ { unsigned int verbosity; size_t read_buf_sz; size_t reservoir_sz; unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */ unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */ char delim; char delim_out; char *rand_file; int status_fd; } options_ = { .verbosity = 0, .read_buf_sz = 8192, .reservoir_sz = 1, .reservoir_grow_per = 0, .delim = '\n', .delim_out = '\n', .rand_file = NULL, .status_fd = STDERR_FILENO, }; /* #status_requested_ This will be set whenever a snapshot of the current state of the sample reservoir is desired. Typically this will occur on a signal. */ static int status_requested_ = 0; static void usage_(const char *prog, unsigned int full) { FILE *f = full ? stdout : stderr; char *x = strrchr(prog, '/'); if (x && *(x + 1)) prog = x + 1; if (full) fprintf(f, "%s -- returns a random sampling of input\n\n", prog); fprintf(f, "Usage: %s options\n", prog); if (full) { fprintf(f, "\nOptions:\n" "\t-n -- returns samples [default: %zu]\n" "\t-d -- use as input and output delimiter [default: '\\%03hho']\n" "\t-r -- read randomness from [default: '%s']\n" "\t-b -- input buffer size [default: %zu]\n" "\t-i -- grow reservoir by 1 for every input samples (0 inhibits behavior) [default: %lu]\n" "\t-j -- double the reservoir growth interval every input samples (0 inhibits behavior) [default: %lu]\n " "\t-s -- USR1 signals will write reservoir contents to rather than stderr (has no effect on normal output to stdout upon input EOF)\n" "\t-v -- increase verbosity\n" "\t-h -- this screen\n", options_.reservoir_sz, options_.delim, options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)", options_.read_buf_sz, options_.reservoir_grow_double_per, options_.reservoir_grow_per); fprintf(f, "\n%78s\n", src_id_); } fflush(f); } /* request_snapshot_ Signal handler to be bound to USR1. Upon receiving a signal, take note that a snapshot of the current state has been requested. */ static void request_snapshot_(int signum) { (void)signum; status_requested_ = 1; } /* accumulate_input_ Read (up to #read_block_sz bytes at a time, or until EOF) from #fd into an accumulator buffer. For each #delimiter character found in what was just read, occasionally remember the group of preceeding characters. This behavioral logic lives here, rather than being contained in the reservoir module, simply because it was easier to reduce the number of allocations by handling it here. */ static int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) { buf_t read_buf, new_buf = NULL; size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; unsigned long grow_count = 0; unsigned long grow_grow_count = 0; assert(read_block_sz > 0); assert(psamples != NULL); assert(preservoir != NULL); assert(*preservoir != NULL); if (fd < 0) { return -1; } read_buf = buf_new(read_block_sz); if (read_buf == NULL) { return -1; } bytes_scanned = 0; /* begin accumulating */ for (;;) { NOTIFY_DEBUG("read loop\n\n"); /* Make sure there's enough room in our input buffer to append a full read() of #read_block_sz onto the end. */ if (buf_makeroom(&read_buf, read_block_sz)) { int e = errno; buf_del(&read_buf); buf_del(&new_buf); errno = e; return -1; } NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf)); do { len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf)); /* A signal may have interrupted read(), deal with that before doing anything else. Status output is best handled here anyhow, due to the naturally quiescent state of things, and also due to not being about to finish and output the final state. */ if (status_requested_) { status_requested_ = 0; NOTIFY_DEBUG("dumping reservoir due to signal"); if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) { NOTIFY_ERROR("failed to output current reservoir contents"); } if (options_.verbosity) { reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out); } } } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < 0) { NOTIFY_ERROR("%s:%s", "read", strerror(errno)); buf_del(&read_buf); buf_del(&new_buf); return -1; } if (len == 0) { break; } read_buf->buf_used += len; NOTIFY_DEBUG("len:%zd", len); D_BUF("read_buf: ", read_buf); /* Accumulator has been filled, pare and process. */ while (len > 0) { ssize_t len_flensed; NOTIFY_DEBUG("len:%zd", len); if (options_.reservoir_grow_per && grow_count >= options_.reservoir_grow_per) { NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1); grow_count = 0; if (reservoir_grow(preservoir, 1)) { NOTIFY_ERROR("failed to increase reservoir size"); buf_del(&read_buf); return -1; } } if (options_.reservoir_grow_double_per && grow_grow_count >= options_.reservoir_grow_double_per) { if (grow_count > ULONG_MAX / 2) { /* would overflow, never grow again */ grow_count = 0; grow_grow_count = 0; NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue"); } else { NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2); grow_count *= 2; } } /* determine if we want to save the next buffer */ if (new_buf == NULL) { /* if new_buf is not null, we already want to save the next one.. */ /* otherwise, save if we've read in fewer lines than the reservoir holds */ /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */ if (*psamples < (*preservoir)->reservoir_sz) { NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz); } else { NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz); } if (*psamples < (*preservoir)->reservoir_sz || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) { NOTIFY_DEBUG("next buffer will be remembered.."); new_buf = buf_new(0); if (new_buf == NULL) { buf_del(&read_buf); return -1; } } else { NOTIFY_DEBUG("not saving next buffer.."); } } len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL); if (len_flensed < 0) { buf_del(&read_buf); buf_del(&new_buf); return -1; } if (len_flensed == 0) { /* no delimiter found yet, stop parsing and read more */ NOTIFY_DEBUG("no delim found after %zd", len); bytes_scanned = len; buf_rebase(read_buf); break; } len -= len_flensed; bytes_scanned = 0; D_BUF("read_buf: ", read_buf); if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); reservoir_remember((*preservoir), new_buf); new_buf = NULL; } *psamples += 1; grow_count += 1; grow_grow_count += 1; } } /* leftovers */ NOTIFY_DEBUG("loop done\n\n"); if (read_buf->buf_used) { D_BUF("leftovers: ", read_buf); if (new_buf != NULL || *psamples < (*preservoir)->reservoir_sz) { reservoir_remember((*preservoir), read_buf); read_buf = NULL; } *psamples += 1; } buf_del(&read_buf); buf_del(&new_buf); return 0; } /* validate_strtoul_ Wrap strtoul and check for success. */ static unsigned long validate_strtoul_(const char *str, unsigned int *invalid) { char *ep; unsigned long num; errno = 0; num = strtoul(str, &ep, 0); if (errno || ! (optarg && *optarg != '\0' && *ep == '\0') ) { *invalid += 1; NOTIFY_ERROR("could not parse '%s' as a number: %s", str, strerror(errno)); } return num; } int main(int argc, char *argv[]) { struct sigaction sa; char *status_filename = NULL; reservoir_t reservoir; unsigned long num_samples = 0; unsigned int options_error = 0; int c; while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; break; case 'b': options_.read_buf_sz = validate_strtoul_(optarg, &options_error); break; case 'd': options_.delim = *optarg; /* @fallthrough@ */ case 'D': options_.delim_out = *optarg; break; case 'i': options_.reservoir_grow_per = validate_strtoul_(optarg, &options_error); break; case 'j': options_.reservoir_grow_double_per = validate_strtoul_(optarg, &options_error); break; case 's': status_filename = optarg; break; case 'n': options_.reservoir_sz = validate_strtoul_(optarg, &options_error); break; case 'r': options_.rand_file = optarg; break; case 'h': usage_(argv[0], 1); exit(EX_OK); default: usage_(argv[0], 0); exit(EX_USAGE); } } if (options_error) { exit(EX_USAGE); } #if 0 /* zero-or-more arguments required */ /* if that ever changes... */ if (argc - optind < 0) { usage_(argv[0], 0); exit(EX_USAGE); } #endif if (randomness_init(options_.rand_file)) { NOTIFY_ERROR("failed to initialize randomness source\n"); exit(EX_NOINPUT); } reservoir = reservoir_new(options_.reservoir_sz); if (reservoir == NULL) { NOTIFY_ERROR("could not create new reservoir"); exit(EX_OSERR); } if (status_filename) { options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT, 0666); if (options_.status_fd < 0) { NOTIFY_ERROR("could not open status file '%s'", status_filename); exit(EX_OSERR); } } sa.sa_handler = request_snapshot_; (void)sigemptyset(&sa.sa_mask); sa.sa_flags = 0; if (sigaction(SIGUSR1, &sa, NULL)) { NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno)); exit(EX_OSERR); } if (argc - optind == 0) { if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { while (optind < argc) { if (strcmp("-", argv[optind]) == 0) { if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } } else { int fd = open(argv[optind], O_RDONLY); if (fd < 0) { NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno)); exit(EX_NOINPUT); } if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) { exit(EX_SOFTWARE); } if (close(fd)) { NOTIFY_ERROR("%s:%s", "close", strerror(errno)); exit(EX_OSERR); } } optind++; } } if (options_.verbosity) { fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples); } if (options_.verbosity > 1) { fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally); } if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } exit(EX_OK); }