Merge branch 'release/1.3'
[reservoir_sample] / reservoir_sample.c
/* reservoir_sample.c
	This collates a randomized subset of its input, by means of reservoir
	sampling and a Fisher-Yates shuffle.
*/
5
/* To do:
	Allow a user-supplied function (of the entry count) to define the
	grow-rate option, rather than a static count.  JIT would be nice there.

	The entire grow-rate thing is not very well thought-through, will bias
	the results, and is likely not very useful as currently implemented.
*/
13
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <stdio.h>
19 #include <fcntl.h>
20 #include <limits.h>
21 #include <signal.h>
22 #include <sysexits.h>
23 #include <assert.h>
24
25 #include "version.h"
26 #include "notify.h"
27 #include "buf.h"
28 #include "randomness.h"
29 #include "reservoir.h"
30
31 static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
32
/* Run-time settings; defaults below, overridden from the command line in main(). */
static struct options_ {
	unsigned int verbosity;                  /* -v, once per occurrence */
	size_t read_buf_sz;                      /* -b: input buffer / read() block size */
	size_t reservoir_sz;                     /* -n: initial number of samples kept */
	unsigned long reservoir_grow_per;        /* -i: how many samples before adding a new slot to reservoir (0 disables) */
	unsigned long reservoir_grow_double_per; /* -j: how many samples before doubling reservoir_grow_per (0 disables) */
	char delim;                              /* -d: input delimiter */
	char delim_out;                          /* -d / -D: output delimiter */
	char *rand_file;                         /* -r: randomness source file, NULL for the default source */
	int status_fd;                           /* -s: descriptor receiving SIGUSR1 status output */
} options_ = {
	.verbosity                 = 0,
	.read_buf_sz               = 8192,
	.reservoir_sz              = 1,
	.reservoir_grow_per        = 0,
	.reservoir_grow_double_per = 0,
	.delim                     = '\n',
	.delim_out                 = '\n',
	.rand_file                 = NULL,
	.status_fd                 = STDERR_FILENO,
};
53
/* #status_requested_
	Set (by the SIGUSR1 handler) whenever a snapshot of the current state of
	the sample reservoir is desired; the reader clears it when it services
	the request.
	Because it is written from a signal handler it must be a
	volatile sig_atomic_t: that is the only object type ISO C guarantees a
	handler may safely assign to, and volatile stops the compiler from
	caching the value across the interrupted read().
*/
static volatile sig_atomic_t status_requested_ = 0;
59
60 static
61 void usage_(const char *prog, unsigned int full) {
62 FILE *f = full ? stdout : stderr;
63 char *x = strrchr(prog, '/');
64
65 if (x && *(x + 1))
66 prog = x + 1;
67
68 if (full)
69 fprintf(f, "%s -- returns a random sampling of input\n\n",
70 prog);
71
72 fprintf(f, "Usage: %s options\n",
73 prog);
74
75 if (full) {
76 fprintf(f, "\nOptions:\n"
77 "\t-n <num> -- returns <num> samples [default: %zu]\n"
78 "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
79 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
80 "\t-b <bytes> -- input buffer size [default: %zu]\n"
81 "\t-i <num> -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
82 "\t-j <num> -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n "
83 "\t-s <file> -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
84 "\t-v -- increase verbosity\n"
85 "\t-h -- this screen\n",
86 options_.reservoir_sz,
87 options_.delim,
88 options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)",
89 options_.read_buf_sz,
90 options_.reservoir_grow_double_per,
91 options_.reservoir_grow_per);
92
93 fprintf(f, "\n%78s\n", src_id_);
94 }
95 fflush(f);
96 }
97
98 /* request_snapshot_
99 Signal handler to be bound to USR1.
100 Upon receiving a signal, take note that a snapshot of the current state
101 has been requested.
102 */
103 static
104 void request_snapshot_(int signum) {
105 (void)signum;
106 status_requested_ = 1;
107 }
108
109 /* accumulate_input_
110 Read (up to #read_block_sz bytes at a time, or until EOF) from #fd into
111 an accumulator buffer. For each #delimiter character found in what was
112 just read, occasionally remember the group of preceeding characters.
113 This behavioral logic lives here, rather than being contained in the
114 reservoir module, simply because it was easier to reduce the number of
115 allocations by handling it here.
116 */
117 static
118 int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) {
119 buf_t read_buf, new_buf = NULL;
120 size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */
121 ssize_t len;
122 unsigned long grow_count = 0;
123 unsigned long grow_grow_count = 0;
124
125 assert(read_block_sz > 0);
126 assert(psamples != NULL);
127 assert(preservoir != NULL);
128 assert(*preservoir != NULL);
129
130 if (fd < 0) {
131 return -1;
132 }
133
134 read_buf = buf_new(read_block_sz);
135 if (read_buf == NULL) {
136 return -1;
137 }
138 bytes_scanned = 0;
139
140 /* begin accumulating */
141 for (;;) {
142 NOTIFY_DEBUG("read loop\n\n");
143
144 /*
145 Make sure there's enough room in our input buffer to append a
146 full read() of #read_block_sz onto the end.
147 */
148 if (buf_makeroom(&read_buf, read_block_sz)) {
149 int e = errno;
150 buf_del(&read_buf);
151 buf_del(&new_buf);
152 errno = e;
153 return -1;
154 }
155
156 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf));
157 do {
158 len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf));
159
160 /*
161 A signal may have interrupted read(), deal with that before
162 doing anything else. Status output is best handled here anyhow,
163 due to the naturally quiescent state of things, and also due to
164 not being about to finish and output the final state.
165 */
166 if (status_requested_) {
167 status_requested_ = 0;
168 NOTIFY_DEBUG("dumping reservoir due to signal");
169 if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) {
170 NOTIFY_ERROR("failed to output current reservoir contents");
171 }
172 if (options_.verbosity) {
173 reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out);
174 }
175 }
176 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
177 if (len < 0) {
178 NOTIFY_ERROR("%s:%s", "read", strerror(errno));
179 buf_del(&read_buf);
180 buf_del(&new_buf);
181 return -1;
182 }
183 if (len == 0) {
184 break;
185 }
186 read_buf->buf_used += len;
187
188 NOTIFY_DEBUG("len:%zd", len);
189 D_BUF("read_buf: ", read_buf);
190
191 /*
192 Accumulator has been filled, pare and process.
193 */
194
195 while (len > 0) {
196 ssize_t len_flensed;
197
198 NOTIFY_DEBUG("len:%zd", len);
199
200 if (options_.reservoir_grow_per
201 && grow_count >= options_.reservoir_grow_per) {
202 NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1);
203 grow_count = 0;
204 if (reservoir_grow(preservoir, 1)) {
205 NOTIFY_ERROR("failed to increase reservoir size");
206 buf_del(&read_buf);
207 return -1;
208 }
209 }
210
211 if (options_.reservoir_grow_double_per
212 && grow_grow_count >= options_.reservoir_grow_double_per) {
213 if (grow_count > ULONG_MAX / 2) {
214 /* would overflow, never grow again */
215 grow_count = 0;
216 grow_grow_count = 0;
217 NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
218 } else {
219 NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2);
220 grow_count *= 2;
221 }
222 }
223
224 /* determine if we want to save the next buffer */
225 if (new_buf == NULL) {
226 /* if new_buf is not null, we already want to save the next one.. */
227 /* otherwise, save if we've read in fewer lines than the reservoir holds */
228 /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
229
230 if (*psamples < (*preservoir)->reservoir_sz) {
231 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz);
232 } else {
233 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz);
234 }
235
236 if (*psamples < (*preservoir)->reservoir_sz
237 || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) {
238 NOTIFY_DEBUG("next buffer will be remembered..");
239 new_buf = buf_new(0);
240 if (new_buf == NULL) {
241 buf_del(&read_buf);
242 return -1;
243 }
244 } else {
245 NOTIFY_DEBUG("not saving next buffer..");
246 }
247 }
248
249 len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL);
250 if (len_flensed < 0) {
251 buf_del(&read_buf);
252 buf_del(&new_buf);
253 return -1;
254 }
255 if (len_flensed == 0) {
256 /* no delimiter found yet, stop parsing and read more */
257 NOTIFY_DEBUG("no delim found after %zd", len);
258 bytes_scanned = len;
259 buf_rebase(read_buf);
260 break;
261 }
262
263 len -= len_flensed;
264 bytes_scanned = 0;
265
266 D_BUF("read_buf: ", read_buf);
267
268 if (new_buf != NULL) {
269 D_BUF("parsed complete line: ", new_buf);
270 reservoir_remember((*preservoir), new_buf);
271 new_buf = NULL;
272 }
273
274 *psamples += 1;
275 grow_count += 1;
276 grow_grow_count += 1;
277
278 }
279 }
280 /* leftovers */
281 NOTIFY_DEBUG("loop done\n\n");
282 if (read_buf->buf_used) {
283 D_BUF("leftovers: ", read_buf);
284
285 if (new_buf != NULL
286 || *psamples < (*preservoir)->reservoir_sz) {
287 reservoir_remember((*preservoir), read_buf);
288 read_buf = NULL;
289 }
290
291 *psamples += 1;
292 }
293
294 buf_del(&read_buf);
295 buf_del(&new_buf);
296
297 return 0;
298 }
299
/* validate_strtoul_
	Parse #str as an unsigned number, using strtoul() with base 0. That
	gives decimal, or octal/hex with a 0 / 0x prefix.
	If #str is missing, empty, has trailing characters, is negative, or is
	out of range, report the problem and increment *#invalid. The parsed
	value, which is then meaningless, is still returned, so callers must
	check *#invalid.
*/
static
unsigned long validate_strtoul_(const char *str, unsigned int *invalid) {
	char *ep = NULL;
	unsigned long num;

	if (str == NULL || *str == '\0') {
		*invalid += 1;
		NOTIFY_ERROR("could not parse '%s' as a number", str ? str : "");
		return 0;
	}

	errno = 0;
	num = strtoul(str, &ep, 0);
	if (errno) {
		*invalid += 1;
		NOTIFY_ERROR("could not parse '%s' as a number: %s", str, strerror(errno));
	} else if (*ep != '\0') {
		*invalid += 1;
		NOTIFY_ERROR("could not parse '%s' as a number", str);
	} else if (strchr(str, '-') != NULL) {
		/*
			In a fully-parsed string a '-' can only be the sign, and
			strtoul() silently negates: "-1" would become ULONG_MAX.
		*/
		*invalid += 1;
		NOTIFY_ERROR("could not parse '%s' as a number: %s", str, "negative values are not allowed");
	}
	return num;
}
317
318 int main(int argc, char *argv[]) {
319 struct sigaction sa;
320 char *status_filename = NULL;
321 reservoir_t reservoir;
322 unsigned long num_samples = 0;
323 unsigned int options_error = 0;
324 int c;
325
326 while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) {
327 switch (c) {
328 case 'v':
329 options_.verbosity++;
330 break;
331
332 case 'b':
333 options_.read_buf_sz = validate_strtoul_(optarg, &options_error);
334 break;
335
336 case 'd':
337 options_.delim = *optarg;
338 /* @fallthrough@ */
339 case 'D':
340 options_.delim_out = *optarg;
341 break;
342
343 case 'i':
344 options_.reservoir_grow_per = validate_strtoul_(optarg, &options_error);
345 break;
346
347 case 'j':
348 options_.reservoir_grow_double_per = validate_strtoul_(optarg, &options_error);
349 break;
350
351 case 's':
352 status_filename = optarg;
353 break;
354
355 case 'n':
356 options_.reservoir_sz = validate_strtoul_(optarg, &options_error);
357 break;
358
359 case 'r':
360 options_.rand_file = optarg;
361 break;
362
363 case 'h':
364 usage_(argv[0], 1);
365 exit(EX_OK);
366
367 default:
368 usage_(argv[0], 0);
369 exit(EX_USAGE);
370 }
371 }
372
373 if (options_error) {
374 exit(EX_USAGE);
375 }
376
377 #if 0
378 /* zero-or-more arguments required */
379 /* if that ever changes... */
380 if (argc - optind < 0) {
381 usage_(argv[0], 0);
382 exit(EX_USAGE);
383 }
384 #endif
385
386 if (randomness_init(options_.rand_file)) {
387 NOTIFY_ERROR("failed to initialize randomness source\n");
388 exit(EX_NOINPUT);
389 }
390
391 reservoir = reservoir_new(options_.reservoir_sz);
392 if (reservoir == NULL) {
393 NOTIFY_ERROR("could not create new reservoir");
394 exit(EX_OSERR);
395 }
396
397 if (status_filename) {
398 options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT, 0666);
399 if (options_.status_fd < 0) {
400 NOTIFY_ERROR("could not open status file '%s'", status_filename);
401 exit(EX_OSERR);
402 }
403 }
404
405 sa.sa_handler = request_snapshot_;
406 (void)sigemptyset(&sa.sa_mask);
407 sa.sa_flags = 0;
408 if (sigaction(SIGUSR1, &sa, NULL)) {
409 NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno));
410 exit(EX_OSERR);
411 }
412
413 if (argc - optind == 0) {
414 if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
415 exit(EX_SOFTWARE);
416 }
417 } else {
418 while (optind < argc) {
419 if (strcmp("-", argv[optind]) == 0) {
420 if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
421 exit(EX_SOFTWARE);
422 }
423 } else {
424 int fd = open(argv[optind], O_RDONLY);
425 if (fd < 0) {
426 NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
427 exit(EX_NOINPUT);
428 }
429 if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
430 exit(EX_SOFTWARE);
431 }
432 if (close(fd)) {
433 NOTIFY_ERROR("%s:%s", "close", strerror(errno));
434 exit(EX_OSERR);
435 }
436 }
437 optind++;
438 }
439 }
440
441 if (options_.verbosity) {
442 fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples);
443 }
444
445 if (options_.verbosity > 1) {
446 fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally);
447 }
448
449 if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) {
450 exit(EX_SOFTWARE);
451 }
452
453 exit(EX_OK);
454 }