2 This collates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
7 Allow user-supplied function (in relation to entry count) to define
8 grow-rate option, rather than static count. JIT would be nice there.
10 The entire grow-rate thing is not very well thought-through, will bias
11 the results, and likely not very useful as currently implemented.
28 #include "randomness.h"
29 #include "reservoir.h"
/* Version banner built at compile time from VERSION_STR and VERSION_DATE; usage_() prints it right-justified below the option list. */
31 static const char * const src_id_
= "v" VERSION_STR
" " VERSION_DATE
;
/*
	Program-wide settings, filled in by main() while parsing the command
	line.  Only some members and initializers are visible in this listing.
*/
33 static struct options_
{
34 unsigned int verbosity
; /* bumped by main() during option parsing; non-zero enables the extra reports on stderr/status_fd */
37 unsigned long reservoir_grow_per
; /* how many samples before adding a new slot to reservoir */
38 unsigned long reservoir_grow_double_per
; /* how many samples before doubling reservoir_grow_per */
/* 0 disables reservoir growth (accumulate_input_ tests the value before growing) */
47 .reservoir_grow_per
= 0,
/* status output defaults to stderr; main() replaces this descriptor when a status file name was given */
51 .status_fd
= STDERR_FILENO
,
55 This will be set whenever a snapshot of the current state of the sample
56 reservoir is desired. Typically this will occur on a signal.
/*
	Set to 1 by request_snapshot_() (the signal handler) and polled/cleared
	by accumulate_input_() after each read().  An object shared with a
	signal handler must be volatile sig_atomic_t for the store in the
	handler to be well-defined and visible to the polling loop.
*/
58 static volatile sig_atomic_t status_requested_
= 0;
/*
	Print the help text.
	#prog is the program name as invoked; the position of its last '/' is
	looked up with strrchr().
	#full selects the output stream: non-zero writes to stdout, zero writes
	to stderr.
	The option list shows the current value of each setting in options_ as
	its default, and ends with the src_id_ version banner.
*/
61 void usage_(const char *prog
, unsigned int full
) {
62 FILE *f
= full
? stdout
: stderr
;
63 char *x
= strrchr(prog
, '/');
69 fprintf(f
, "%s -- returns a random sampling of input\n\n",
72 fprintf(f
, "Usage: %s options\n",
76 fprintf(f
, "\nOptions:\n"
77 "\t-n <num> -- returns <num> samples [default: %zu]\n"
78 "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
79 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
80 "\t-b <bytes> -- input buffer size [default: %zu]\n"
81 "\t-i <num> -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
82 "\t-j <num> -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n"
83 "\t-s <file> -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
84 "\t-v -- increase verbosity\n"
85 "\t-h -- this screen\n",
86 options_
.reservoir_sz
,
88 options_
.rand_file
? options_
.rand_file
: "(use system pseudorandom generator)",
/* the arguments must follow the format string: -i reports reservoir_grow_per, then -j reports reservoir_grow_double_per */
90 options_
.reservoir_grow_per
,
91 options_
.reservoir_grow_double_per
);
93 fprintf(f
, "\n%78s\n", src_id_
);
99 Signal handler to be bound to USR1.
100 Upon receiving a signal, take note that a snapshot of the current state
/* Handler body only raises a flag; accumulate_input_() checks and clears it after read() returns. */
104 void request_snapshot_(int signum
) {
106 status_requested_
= 1;
110 Read (up to #read_block_sz bytes at a time, or until EOF) from #fd into
111 an accumulator buffer. For each #delimiter character found in what was
112 just read, occasionally remember the group of preceding characters.
113 This behavioral logic lives here, rather than being contained in the
114 reservoir module, simply because it was easier to reduce the number of
115 allocations by handling it here.
/*
	Parameters:
	  fd            - descriptor to read() from
	  read_block_sz - room guaranteed in the input buffer before each read();
	                  must be greater than zero
	  delimiter     - character separating entries (handed to buf_flense())
	  psamples      - running count of entries seen; compared against the
	                  reservoir size to decide whether an entry is kept
	  preservoir    - address of the reservoir handle; passed by address
	                  because reservoir_grow() receives it that way
	Callers treat a non-zero return as failure.
	When status_requested_ is set, a snapshot of the reservoir is written to
	options_.status_fd right after read() returns.
*/
118 int accumulate_input_(int fd
, size_t read_block_sz
, int delimiter
, unsigned long *psamples
, reservoir_t
*preservoir
) {
119 buf_t read_buf
, new_buf
= NULL
;
120 size_t bytes_scanned
; /* how much of the buffer has already been searched for delimiter */
122 unsigned long grow_count
= 0;
123 unsigned long grow_grow_count
= 0;
125 assert(read_block_sz
> 0);
126 assert(psamples
!= NULL
);
127 assert(preservoir
!= NULL
);
128 assert(*preservoir
!= NULL
);
134 read_buf
= buf_new(read_block_sz
);
135 if (read_buf
== NULL
) {
140 /* begin accumulating */
142 NOTIFY_DEBUG("read loop\n\n");
145 Make sure there's enough room in our input buffer to append a
146 full read() of #read_block_sz onto the end.
148 if (buf_makeroom(&read_buf
, read_block_sz
)) {
156 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd
, read_buf
->buf
, read_buf
->buf_start
, bytes_scanned
, BUF_ROOM(read_buf
));
158 len
= read(fd
, read_buf
->buf
+ read_buf
->buf_start
+ read_buf
->buf_used
, BUF_ROOM(read_buf
));
161 A signal may have interrupted read(), deal with that before
162 doing anything else. Status output is best handled here anyhow,
163 due to the naturally quiescent state of things, and also due to
164 not being about to finish and output the final state.
166 if (status_requested_
) {
167 status_requested_
= 0;
168 NOTIFY_DEBUG("dumping reservoir due to signal");
/* the snapshot goes to the status descriptor (stderr unless -s was given), never to stdout, so that the final output is not polluted */
169 if (reservoir_write(options_
.status_fd
, *preservoir
, options_
.delim_out
)) {
170 NOTIFY_ERROR("failed to output current reservoir contents");
172 if (options_
.verbosity
) {
173 reservoir_write_meta(options_
.status_fd
, *preservoir
, *psamples
, options_
.delim_out
);
/* retry the read when it was merely interrupted or would have blocked */
176 } while (len
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
178 NOTIFY_ERROR("%s:%s", "read", strerror(errno
));
186 read_buf
->buf_used
+= len
;
188 NOTIFY_DEBUG("len:%zd", len
);
189 D_BUF("read_buf: ", read_buf
);
192 Accumulator has been filled, pare and process.
198 NOTIFY_DEBUG("len:%zd", len
);
200 if (options_
.reservoir_grow_per
201 && grow_count
>= options_
.reservoir_grow_per
) {
202 NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count
, (*preservoir
)->reservoir_sz
+ 1);
204 if (reservoir_grow(preservoir
, 1)) {
205 NOTIFY_ERROR("failed to increase reservoir size");
211 if (options_
.reservoir_grow_double_per
212 && grow_grow_count
>= options_
.reservoir_grow_double_per
) {
/* NOTE(review): the guard tests grow_count, yet the messages below speak of doubling the growth interval -- confirm the guarded value is the one actually doubled */
213 if (grow_count
> ULONG_MAX
/ 2) {
214 /* would overflow, never grow again */
217 NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
/* NOTE(review): the format below has two %lu conversions but only one argument is visible here -- verify */
219 NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count
* 2);
224 /* determine if we want to save the next buffer */
225 if (new_buf
== NULL
) {
226 /* if new_buf is not null, we already want to save the next one.. */
227 /* otherwise, save if we've read in fewer lines than the reservoir holds */
228 /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
230 if (*psamples
< (*preservoir
)->reservoir_sz
) {
231 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples
, (*preservoir
)->reservoir_sz
);
233 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples
, (*preservoir
)->reservoir_sz
);
236 if (*psamples
< (*preservoir
)->reservoir_sz
237 || randomness_upto_inclusive(*psamples
) < (*preservoir
)->reservoir_sz
) {
238 NOTIFY_DEBUG("next buffer will be remembered..");
239 new_buf
= buf_new(0);
240 if (new_buf
== NULL
) {
245 NOTIFY_DEBUG("not saving next buffer..");
/* a NULL destination asks buf_flense() to split off the entry without keeping it */
249 len_flensed
= buf_flense(&read_buf
, bytes_scanned
, delimiter
, new_buf
? &new_buf
: NULL
);
250 if (len_flensed
< 0) {
255 if (len_flensed
== 0) {
256 /* no delimiter found yet, stop parsing and read more */
257 NOTIFY_DEBUG("no delim found after %zd", len
);
259 buf_rebase(read_buf
);
266 D_BUF("read_buf: ", read_buf
);
268 if (new_buf
!= NULL
) {
269 D_BUF("parsed complete line: ", new_buf
);
270 reservoir_remember((*preservoir
), new_buf
);
276 grow_grow_count
+= 1;
281 NOTIFY_DEBUG("loop done\n\n");
/* input ended without a trailing delimiter: the remainder is still a candidate entry */
282 if (read_buf
->buf_used
) {
283 D_BUF("leftovers: ", read_buf
);
286 || *psamples
< (*preservoir
)->reservoir_sz
) {
287 reservoir_remember((*preservoir
), read_buf
);
301 Wrap strtoul and check for success.
/*
	Convert #str with strtoul() using base 0, so decimal, octal (leading 0)
	and hexadecimal (leading 0x) spellings are accepted.
	The conversion is rejected when #str is NULL or empty, or when anything
	is left over after the number; an error naming #str is then reported.
	#invalid is the caller's error indicator -- how it is updated is not
	visible in this listing (TODO confirm).
	The checks apply to #str itself rather than the global optarg, so the
	function validates whatever string it was actually handed.
*/
304 unsigned long validate_strtoul_(const char *str
, unsigned int *invalid
) {
309 num
= strtoul(str
, &ep
, 0);
311 || ! (str
&& *str
!= '\0' && *ep
== '\0') ) {
313 NOTIFY_ERROR("could not parse '%s' as a number: %s", str
, strerror(errno
));
/*
	Program entry point.
	Parses options with getopt(), initializes the randomness source and the
	reservoir, optionally opens the status file, installs request_snapshot_
	as the SIGUSR1 handler, then feeds stdin and/or each named file ("-"
	means stdin) through accumulate_input_() and writes the reservoir to
	stdout.
*/
318 int main(int argc
, char *argv
[]) {
320 char *status_filename
= NULL
;
321 reservoir_t reservoir
;
322 unsigned long num_samples
= 0;
323 unsigned int options_error
= 0;
326 while ( (c
= getopt(argc
, argv
, "hvb:d:D:i:j:s:n:r:")) != EOF
) {
329 options_
.verbosity
++;
333 options_
.read_buf_sz
= validate_strtoul_(optarg
, &options_error
);
337 options_
.delim
= *optarg
;
340 options_
.delim_out
= *optarg
;
344 options_
.reservoir_grow_per
= validate_strtoul_(optarg
, &options_error
);
348 options_
.reservoir_grow_double_per
= validate_strtoul_(optarg
, &options_error
);
352 status_filename
= optarg
;
356 options_
.reservoir_sz
= validate_strtoul_(optarg
, &options_error
);
360 options_
.rand_file
= optarg
;
378 /* zero-or-more arguments required */
379 /* if that ever changes... */
380 if (argc
- optind
< 0) {
386 if (randomness_init(options_
.rand_file
)) {
387 NOTIFY_ERROR("failed to initialize randomness source\n");
391 reservoir
= reservoir_new(options_
.reservoir_sz
);
392 if (reservoir
== NULL
) {
393 NOTIFY_ERROR("could not create new reservoir");
397 if (status_filename
) {
/* the status descriptor is only ever written to, so it must be opened for writing; a read-only descriptor would make every status write fail */
398 options_
.status_fd
= open(status_filename
, O_WRONLY
|O_APPEND
|O_CREAT
, 0666);
399 if (options_
.status_fd
< 0) {
400 NOTIFY_ERROR("could not open status file '%s'", status_filename
);
405 sa
.sa_handler
= request_snapshot_
;
406 (void)sigemptyset(&sa
.sa_mask
);
408 if (sigaction(SIGUSR1
, &sa
, NULL
)) {
409 NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno
));
/* no file operands: sample from stdin only */
413 if (argc
- optind
== 0) {
414 if (accumulate_input_(STDIN_FILENO
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
418 while (optind
< argc
) {
419 if (strcmp("-", argv
[optind
]) == 0) {
420 if (accumulate_input_(STDIN_FILENO
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
424 int fd
= open(argv
[optind
], O_RDONLY
);
426 NOTIFY_ERROR("%s('%s'):%s", "open", argv
[optind
], strerror(errno
));
429 if (accumulate_input_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
433 NOTIFY_ERROR("%s:%s", "close", strerror(errno
));
441 if (options_
.verbosity
) {
/* choose the plural suffix from the same number that is printed */
442 fprintf(stderr
, "%zu sample%s, out of %lu total choices\n", reservoir
->reservoir_sz
, reservoir
->reservoir_sz
> 1 ? "s" : "", num_samples
);
445 if (options_
.verbosity
> 1) {
446 fprintf(stderr
, "%zu selection events\n", reservoir
->reservoir_tally
);
449 if (reservoir_write(STDOUT_FILENO
, reservoir
, options_
.delim_out
)) {