2 This collates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
7 Allow user-supplied function (in relation to entry count) to define
8 grow-rate option, rather than static count. JIT would be nice there.
10 The entire grow-rate thing is not very well thought through, will bias
11 the results, and is likely not very useful as currently implemented.
28 #include "randomness.h"
29 #include "reservoir.h"
31 static const char * const src_id_
= "v" VERSION_STR
" " VERSION_DATE
;
33 static struct options_
{
34 unsigned int verbosity
;
37 unsigned long reservoir_grow_per
; /* how many samples before adding a new slot to reservoir */
38 unsigned long reservoir_grow_double_per
; /* how many samples before doubling reservoir_grow_per */
47 .reservoir_grow_per
= 0,
51 .status_fd
= STDERR_FILENO
,
54 /* #status_requested_ will be set whenever a view of the current sample
/* Snapshot-request flag.
 *
 * Set to 1 by request_snapshot_() when SIGUSR1 arrives; the read loop in
 * accumulate_input_() polls it after each read(), clears it, and dumps the
 * current reservoir.
 *
 * It must be `volatile sig_atomic_t`: that is the only object type the C
 * standard guarantees may be written from an asynchronous signal handler
 * and then read by the interrupted code.  With a plain `int` the compiler
 * is free to cache the value across the read loop and never observe the
 * handler's store. */
static volatile sig_atomic_t status_requested_ = 0;
60 void usage_(const char *prog
, unsigned int full
) {
61 FILE *f
= full
? stdout
: stderr
;
62 char *x
= strrchr(prog
, '/');
68 fprintf(f
, "%s -- returns a random sampling of input\n\n",
71 fprintf(f
, "Usage: %s options\n",
75 fprintf(f
, "\nOptions:\n"
76 "\t-n <num> -- returns <num> samples [default: %zu]\n"
77 "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
78 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
79 "\t-b <bytes> -- set input buffer size [default: %zu]\n"
80 "\t-i <num> -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
81 "\t-h <num> -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n "
82 "\t-s <file> -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
83 "\t-v -- increase verbosity\n"
84 "\t-h -- this screen\n",
85 options_
.reservoir_sz
,
87 options_
.rand_file
? options_
.rand_file
: "(use system pseudorandom generator)",
89 options_
.reservoir_grow_double_per
,
90 options_
.reservoir_grow_per
);
92 fprintf(f
, "\n%78s\n", src_id_
);
98 Signal handler to be bound to USR1.
99 Upon receiving a signal, take note that a snapshot of the current state
103 void request_snapshot_(int signum
) {
105 status_requested_
= 1;
109 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
110 accumulator buffer. For each #delimiter character found in what was just
111 read, occasionally remember the preceding characters.
114 int accumulate_input_(int fd
, size_t read_block_sz
, int delimiter
, unsigned long *psamples
, reservoir_t
*preservoir
) {
115 buf_t read_buf
, new_buf
= NULL
;
116 size_t bytes_scanned
; /* how much of the buffer has already been searched for delimiter */
118 unsigned long grow_count
= 0;
119 unsigned long grow_grow_count
= 0;
121 assert(read_block_sz
> 0);
122 assert(psamples
!= NULL
);
123 assert(preservoir
!= NULL
);
124 assert(*preservoir
!= NULL
);
130 read_buf
= buf_new(read_block_sz
);
131 if (read_buf
== NULL
) {
136 /* begin accumulating */
138 NOTIFY_DEBUG("read loop\n\n");
140 /* make sure there's enough room in our input buffer for a full read() */
141 if (buf_makeroom(&read_buf
, read_block_sz
)) {
147 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd
, read_buf
->buf
, read_buf
->buf_start
, bytes_scanned
, BUF_ROOM(read_buf
));
149 len
= read(fd
, read_buf
->buf
+ read_buf
->buf_start
+ read_buf
->buf_used
, BUF_ROOM(read_buf
));
151 /* a signal may have interrupted read(), deal with that before
152 doing anything else */
153 if (status_requested_
) {
154 status_requested_
= 0;
155 NOTIFY_DEBUG("dumping reservoir due to signal");
156 if (reservoir_write(STDOUT_FILENO
, *preservoir
, options_
.delim_out
)) {
157 NOTIFY_ERROR("failed to output current reservoir contents");
159 if (options_
.verbosity
) {
160 reservoir_write_meta(options_
.status_fd
, *preservoir
, *psamples
, options_
.delim_out
);
163 } while (len
== -1 && (errno
== EINTR
|| errno
== EAGAIN
));
165 NOTIFY_ERROR("%s:%s", "read", strerror(errno
));
173 read_buf
->buf_used
+= len
;
175 NOTIFY_DEBUG("len:%zd", len
);
176 D_BUF("read_buf: ", read_buf
);
181 NOTIFY_DEBUG("len:%zd", len
);
183 if (options_
.reservoir_grow_per
184 && grow_count
>= options_
.reservoir_grow_per
) {
185 NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count
, (*preservoir
)->reservoir_sz
+ 1);
187 if (reservoir_grow(preservoir
, 1)) {
188 NOTIFY_ERROR("failed to increase reservoir size");
194 if (options_
.reservoir_grow_double_per
195 && grow_grow_count
>= options_
.reservoir_grow_double_per
) {
196 if (grow_count
> ULONG_MAX
/ 2) {
197 /* would overflow, never grow again */
200 NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
202 NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count
* 2);
207 /* determine if we want to save the next buffer */
208 if (new_buf
== NULL
) {
209 /* if new_buf is not null, we already want to save the next one.. */
210 /* otherwise, save if we've read in fewer lines than the reservoir holds */
211 /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
213 if (*psamples
< (*preservoir
)->reservoir_sz
) {
214 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples
, (*preservoir
)->reservoir_sz
);
216 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples
, (*preservoir
)->reservoir_sz
);
219 if (*psamples
< (*preservoir
)->reservoir_sz
220 || randomness_upto_inclusive(*psamples
) < (*preservoir
)->reservoir_sz
) {
221 NOTIFY_DEBUG("next buffer will be remembered..");
222 new_buf
= buf_new(0);
223 if (new_buf
== NULL
) {
228 NOTIFY_DEBUG("not saving next buffer..");
232 len_flensed
= buf_flense(&read_buf
, bytes_scanned
, delimiter
, new_buf
? &new_buf
: NULL
);
233 if (len_flensed
< 0) {
238 if (len_flensed
== 0) {
239 /* no delimiter found yet, stop parsing and read more */
240 NOTIFY_DEBUG("no delim found after %zd", len
);
242 buf_rebase(read_buf
);
249 D_BUF("read_buf: ", read_buf
);
251 if (new_buf
!= NULL
) {
252 D_BUF("parsed complete line: ", new_buf
);
253 reservoir_remember((*preservoir
), new_buf
);
259 grow_grow_count
+= 1;
264 NOTIFY_DEBUG("loop done\n\n");
265 if (read_buf
->buf_used
) {
266 D_BUF("leftovers: ", read_buf
);
269 || *psamples
< (*preservoir
)->reservoir_sz
) {
270 reservoir_remember((*preservoir
), read_buf
);
284 int main(int argc
, char *argv
[]) {
286 char *status_filename
= NULL
;
287 reservoir_t reservoir
;
288 unsigned long num_samples
= 0;
291 while ( (c
= getopt(argc
, argv
, "hvb:d:D:i:j:s:n:r:")) != EOF
) {
294 options_
.verbosity
++;
298 options_
.read_buf_sz
= strtoul(optarg
, NULL
, 0);
303 options_
.delim
= *optarg
;
306 options_
.delim_out
= *optarg
;
310 options_
.reservoir_grow_per
= strtoul(optarg
, NULL
, 0);
314 options_
.reservoir_grow_double_per
= strtoul(optarg
, NULL
, 0);
318 status_filename
= optarg
;
322 options_
.reservoir_sz
= strtoul(optarg
, NULL
, 0);
326 options_
.rand_file
= optarg
;
340 /* zero-or-more arguments required */
341 /* if that ever changes... */
342 if (argc
- optind
< 0) {
348 if (randomness_init(options_
.rand_file
)) {
349 NOTIFY_ERROR("failed to initialize randomness source\n");
353 reservoir
= reservoir_new(options_
.reservoir_sz
);
354 if (reservoir
== NULL
) {
355 NOTIFY_ERROR("could not create new reservoir");
359 if (status_filename
) {
360 options_
.status_fd
= open(status_filename
, O_RDONLY
|O_APPEND
|O_CREAT
);
361 if (options_
.status_fd
< 0) {
362 NOTIFY_ERROR("could not open status file '%s'", status_filename
);
367 sa
.sa_handler
= request_snapshot_
;
368 (void)sigemptyset(&sa
.sa_mask
);
370 if (sigaction(SIGUSR1
, &sa
, NULL
)) {
371 NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno
));
375 if (argc
- optind
== 0) {
376 if (accumulate_input_(STDIN_FILENO
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
380 while (optind
< argc
) {
381 if (strcmp("-", argv
[optind
]) == 0) {
382 if (accumulate_input_(STDIN_FILENO
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
386 int fd
= open(argv
[optind
], O_RDONLY
);
388 NOTIFY_ERROR("%s('%s'):%s", "open", argv
[optind
], strerror(errno
));
391 if (accumulate_input_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_samples
, &reservoir
)) {
395 NOTIFY_ERROR("%s:%s", "close", strerror(errno
));
403 if (options_
.verbosity
) {
404 fprintf(stderr
, "%zu sample%s, out of %lu total choices\n", reservoir
->reservoir_sz
, options_
.reservoir_sz
> 1 ? "s" : "", num_samples
);
407 if (options_
.verbosity
> 1) {
408 fprintf(stderr
, "%zu selection events\n", reservoir
->reservoir_tally
);
411 if (reservoir_write(STDOUT_FILENO
, reservoir
, options_
.delim_out
)) {