Merge branch 'release-1.1'
[reservoir_sample] / reservoir_sample.c
1 /* reservoir_sample.c
2 This generates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
4 */
5
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fcntl.h>
#include <sys/uio.h>
#include <sysexits.h>
#include <unistd.h>

#include "version.h"
#include "notify.h"
#include "test_suite.h"
20
21 static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
22
23 static struct options_ {
24 unsigned int verbosity;
25 size_t read_buf_sz;
26 size_t reservoir_sz;
27 char delim;
28 char delim_out;
29 char *rand_file;
30 } options_ = {
31 .verbosity = 0,
32 .read_buf_sz = 8192,
33 .reservoir_sz = 1,
34 .delim = '\n',
35 .delim_out = '\n',
36 .rand_file = "/dev/random",
37 };
38
39 static int rand_fd_ = -1;
40
/* buf_t: a simple little sliding-window byte buffer.
	The valid bytes are buf[buf_start .. buf_start + buf_used); bytes before
	buf_start have been consumed, and buf_rebase_() can reclaim them.
	Invariant: buf_start + buf_used <= buf_sz. */
typedef struct buf_ {
	size_t buf_sz;       /* capacity of buf[] in bytes */
	size_t buf_start;    /* offset of the first valid byte */
	size_t buf_used;     /* count of valid bytes from buf_start on */
	unsigned char buf[];
} *buf_t;

/* Free bytes after the valid data; excludes the consumed prefix. */
#define BUF_ROOM(b) ( (b)->buf_sz - ( (b)->buf_start + (b)->buf_used ) )

/* reservoir_t: a fixed number of slots holding the sampled records. */
typedef struct reservoir_ {
	size_t reservoir_insertions; /* how many items have ever been added to reservoir */
	size_t reservoir_sz;         /* number of slots in reservoir[] */
	buf_t reservoir[];           /* the slots; NULL until filled */
} *reservoir_t;
55
#ifndef NDEBUG
/* D_BUF(pre, b, ...)
	Debug-log the buffer #b (which may be NULL): its address, bookkeeping
	fields and valid contents, prefixed by the string literal #pre. Any extra
	arguments fill conversions inside #pre. Also asserts the valid window lies
	within the allocation. #b is evaluated exactly once. */
#define D_BUF(pre, b, ...) do {\
	const struct buf_ *d_buf_ptr_ = (b);\
	if (d_buf_ptr_ == NULL) {\
		NOTIFY_DEBUG(pre "buf:%p", ## __VA_ARGS__, (const void *)d_buf_ptr_);\
	} else {\
		NOTIFY_DEBUG(pre "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
			## __VA_ARGS__,\
			(const void *)d_buf_ptr_,\
			d_buf_ptr_->buf_sz,\
			d_buf_ptr_->buf_start,\
			d_buf_ptr_->buf_used,\
			BUF_ROOM(d_buf_ptr_),\
			(int)d_buf_ptr_->buf_used,\
			(const char *)(d_buf_ptr_->buf + d_buf_ptr_->buf_start));\
		assert( d_buf_ptr_->buf_sz >= d_buf_ptr_->buf_start + d_buf_ptr_->buf_used );\
	}\
} while (0)
/* D_RESERVOIR(r)
	Debug-log every slot of reservoir #r, then its counters. The loop index
	has an unusual name so it cannot shadow a caller's variable. */
#define D_RESERVOIR(r) do {\
	size_t d_res_i_;\
	for (d_res_i_ = 0; d_res_i_ < (r)->reservoir_sz; d_res_i_++) {\
		D_BUF("reservoir[%zu] ", (r)->reservoir[d_res_i_], d_res_i_);\
	}\
	NOTIFY_DEBUG(" insertions:%zu sz:%zu", (r)->reservoir_insertions, (r)->reservoir_sz);\
} while (0)
#else
#define D_RESERVOIR(...)
#define D_BUF(...)
#endif
83
84
85 static
86 void usage_(const char *prog, unsigned int full) {
87 FILE *f = full ? stdout : stderr;
88 char *x = strrchr(prog, '/');
89
90 if (x && *(x + 1))
91 prog = x + 1;
92
93 if (full)
94 fprintf(f, "%s -- returns a random sampling of input\n\n",
95 prog);
96
97 fprintf(f, "Usage: %s options\n",
98 prog);
99
100 if (full) {
101 fprintf(f, "\nOptions:\n"
102 "\t-n <num> -- returns <num> samples [default: %zu]\n"
103 "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
104 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
105 "\t-b <bytes> -- read buffer size [default: %zu]\n"
106 "\t-v -- increase verbosity\n"
107 "\t-h -- this screen\n",
108 options_.reservoir_sz,
109 options_.delim,
110 options_.rand_file,
111 options_.read_buf_sz);
112
113 fprintf(f, "\n%78s\n", src_id_);
114 }
115 fflush(f);
116 }
117
118
119 /* rand_upto_inclusive_
120 Room for improvement: constrain bits of randomness consumed, based on #limit
121 also maybe read chunks of randomness at a time
122 */
123 static
124 unsigned long rand_upto_inclusive_(unsigned long limit) {
125 unsigned long randomness;
126
127 if (limit == 0)
128 return 0;
129
130 if (rand_fd_ != -1) {
131 ssize_t len;
132
133 len = read(rand_fd_, &randomness, sizeof randomness);
134 if (len == sizeof randomness) {
135 randomness %= limit + 1;
136
137 return randomness;
138 }
139 NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
140 }
141
142 /* fall back to dumb randomness */
143 randomness = mrand48();
144 randomness %= limit + 1;
145
146 return randomness;
147 }
148
149
150 static
151 int rand_init_(char *rand_file) {
152 srand48(time(NULL) ^ getpid());
153 if (rand_file) {
154 rand_fd_ = open(rand_file, O_RDONLY);
155 if (rand_fd_ == -1) {
156 NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno));
157 return -1;
158 }
159 }
160 return 0;
161 }
162
163
164 static
165 buf_t buf_new_(size_t sz) {
166 buf_t buf = malloc(sz + sizeof *buf);
167 if (buf) {
168 buf->buf_sz = sz;
169 buf->buf_start = buf->buf_used = 0;
170 memset(buf->buf, 0, sz);
171 }
172 return buf;
173 }
174
175
176 static
177 void buf_rebase_(buf_t buf) {
178 if (buf->buf_start == 0)
179 return;
180 memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
181 buf->buf_start = 0;
182 }
183
184
185 static
186 int buf_makeroom_(buf_t *pbuf, size_t roomfor) {
187 size_t new_sz;
188 void *tmp_ptr;
189
190 assert(pbuf != NULL);
191
192 if (*pbuf == NULL) {
193 *pbuf = buf_new_(roomfor);
194 if (*pbuf == NULL) {
195 return -1;
196 }
197 }
198
199 buf_rebase_(*pbuf);
200
201 if (BUF_ROOM(*pbuf) >= roomfor)
202 return 0;
203
204 new_sz = (*pbuf)->buf_used + roomfor;
205 tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf);
206 if (tmp_ptr == NULL) {
207 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
208 return -1;
209 }
210 *pbuf = tmp_ptr;
211 (*pbuf)->buf_sz = new_sz;
212
213 return 0;
214 }
215
216
217 static
218 int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) {
219 assert(src != NULL);
220 assert(dst != NULL);
221 assert(src_skip + n <= src->buf_used);
222
223 if (buf_makeroom_(dst, n)) {
224 return -1;
225 }
226
227 memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n);
228 (*dst)->buf_used += n;
229
230 return 0;
231 }
232
233
234 /* buf_flense_
235 Starting after #src_offset characters, scan through #src, stopping at
236 the first character matching #delimiter, whereupon all the characters
237 leading up to #delimiter are copied into *#dst if #dst is not NULL. #src
238 becomes the characters following #delimiter.
239 Returns the number of characters flensed from #src.
240
241 Room for improvement:
242 If flensed segment is more than half the buffer, copy remainder of src
243 into dst, then return src, leaving dst in its place.
244 */
245 static
246 ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) {
247 const size_t delimiter_len = 1;
248 size_t i;
249
250 assert(src != NULL);
251 assert(src_offset <= (*src)->buf_used);
252
253 NOTIFY_DEBUG("src_offset:%zu", src_offset);
254 D_BUF("src ", *src);
255 D_BUF("dst ", dst ? *dst : NULL);
256
257 for (i = src_offset; i < (*src)->buf_used; i++) {
258 if ((*src)->buf[(*src)->buf_start + i] == delimiter) {
259
260 if (dst != NULL) {
261 if (buf_range_dup_or_append_((*src), 0, i, dst)) {
262 return -1;
263 }
264 }
265
266 (*src)->buf_start += i + delimiter_len;
267 (*src)->buf_used -= i + delimiter_len;
268
269 D_BUF("src ", *src);
270 D_BUF("dst ", dst ? *dst : NULL);
271 return i + delimiter_len;
272 }
273 }
274
275 return 0;
276 }
277
#ifdef TEST

static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n";

/* One step of an expected flensing sequence: the value buf_flense_() should
   return and the record it should extract. The final entry has r == 0 and
   holds the text expected to remain in the source buffer. */
struct buf_flense_expected_result_ {
	ssize_t r;
	const char *buf;
};

static const struct buf_flense_expected_result_ buf_flense_expected1[] = {
	{ 1 + 1, "a" },
	{ 2 + 1, "bc" },
	{ 3 + 1, "def" },
	{ 4 + 1, "ghij" },
	{ 5 + 1, "klmno" },
	{ 3 + 1, "pqr" },
	{ 1 + 1, "s" },
	{ 3 + 1, "tuv" },
	{ 5 + 1, "wxyz0" },
	{ 4 + 1, "1234" },
	{ 3 + 1, "567" },
	{ 2 + 1, "89" },
	{ 0, "0leftovers" },
}, buf_flense_expected2[] = {
	{ 0 + 1, "" },
	{ 0 + 1, "" },
	{ 3 + 1, "foo" },
	{ 3 + 1, "bar" },
	{ 0 + 1, "" },
	{ 0, "" },
};

/* A test case: input text plus its expected flensing sequence. */
struct test_buf_flense_data_ {
	const char *src;
	const struct buf_flense_expected_result_ *expected;
};

static struct test_buf_flense_data_ test_buf_flense_data1 = {
	.src = buf_flense_testdata1,
	.expected = buf_flense_expected1,
}, test_buf_flense_data2 = {
	.src = buf_flense_testdata2,
	.expected = buf_flense_expected2,
};

/* test_buf_flense_
	Load the test case's text into a buffer, then repeatedly flense
	'\n'-delimited records. Each return value and extracted record is
	checked against the expected table, followed by the text left over.
	Returns 0 if every check passes, -1 otherwise.
*/
static int test_buf_flense_(void *test_arg, void *suite_arg) {
	const struct test_buf_flense_data_ *test_data = test_arg;
	const size_t testdata_len = strlen(test_data->src); /* size_t: matches %zu, no truncation */
	const struct buf_flense_expected_result_ *next_result = test_data->expected;
	int retval = 0;
	buf_t src;

	(void)suite_arg;

	TEST_INFO("initializing src buffer with %zu characters", testdata_len);
	src = buf_new_(testdata_len);
	if (src == NULL) {
		TEST_ERROR("%s:%s", "buf_new_", "failed");
		return -1;
	}
	memcpy(src->buf, test_data->src, testdata_len);
	src->buf_used += testdata_len;

	D_BUF("src ", src);

	for (;;) {
		ssize_t r;
		buf_t dst = NULL;

		r = buf_flense_(&src, 0, '\n', &dst);
		if (r != next_result->r) {
			TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
			retval = -1;
		}
		if (r < 0) {
			/* flensing failed (already reported above); there is no record to check */
			free(dst);
			break;
		}
		if (r == 0) {
			TEST_INFO("finished flensing");
			break;
		}

		if (strlen(next_result->buf) > dst->buf_used) {
			TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
			D_BUF("dst ", dst);
			retval = -1;
		} else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) {
			TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
			D_BUF("dst ", dst);
			retval = -1;
		}

		TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, (const char *)(dst->buf + dst->buf_start));

		memset(dst, 0, dst->buf_sz + sizeof *dst);
		free(dst);
		dst = NULL;

		if (next_result->r == 0) {
			/* don't step past the table's terminating entry */
			TEST_ERROR("%s", "more records flensed than expected");
			retval = -1;
			break;
		}
		next_result++;
	}
	if (strlen(next_result->buf) > src->buf_used) {
		TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
		D_BUF("src ", src);
		retval = -1;
	} else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) {
		TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
		D_BUF("src ", src);
		retval = -1;
	}

	TEST_INFO("remains: '%.*s'", (int)src->buf_used, (const char *)(src->buf + src->buf_start));

	memset(src, 0, src->buf_sz + sizeof *src);
	free(src);
	src = NULL;

	return retval;
}
#endif /* TEST */
390
391
392 /* reservoir_remember_
393 Remember #line, forgetting a line if more than #num_lines have already
394 been remembered. Remembers them in random order.
395 */
396 static
397 void reservoir_remember_(reservoir_t reservoir, buf_t buf) {
398 buf_t old_buf;
399
400 assert(reservoir != NULL);
401
402 D_BUF("reserving ", buf);
403
404 if (reservoir->reservoir_sz > 0) {
405 unsigned long randomness;
406
407 if (reservoir->reservoir_insertions < reservoir->reservoir_sz) {
408 /* there are still unused slots, fill them up without discarding anything */
409 /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
410 randomness = rand_upto_inclusive_(reservoir->reservoir_insertions);
411
412 assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */
413
414 NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions);
415 reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness];
416 old_buf = NULL; /* no old entry to discard */
417 } else {
418 randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
419 old_buf = reservoir->reservoir[randomness];
420 }
421 NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
422 reservoir->reservoir[randomness] = buf;
423 } else {
424 /* can't add anything to a zero-size reservoir, so just dispose of new item */
425 old_buf = buf;
426 }
427
428 reservoir->reservoir_insertions += 1;
429
430 if (old_buf != NULL) {
431 D_BUF("FREEING ", old_buf);
432 memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
433 free(old_buf);
434 }
435
436 D_RESERVOIR(reservoir);
437 }
438
439
440 /* reservoir_read_
441 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
442 accumulator buffer. For each #delimiter character found in what was just
443 read, occasionally remember the preceeding characters.
444 */
445 static
446 int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) {
447 buf_t read_buf, new_buf = NULL;
448 size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
449 ssize_t len;
450
451 assert(read_block_sz > 0);
452 assert(num_lines != NULL);
453 assert(reservoir != NULL);
454
455 if (fd < 0) {
456 return -1;
457 }
458
459 read_buf = buf_new_(read_block_sz);
460 if (read_buf == NULL) {
461 return -1;
462 }
463 line_scanned = 0;
464
465 for (;;) {
466 NOTIFY_DEBUG("read loop\n\n");
467
468 if (buf_makeroom_(&read_buf, read_block_sz)) {
469 free(read_buf);
470 free(new_buf);
471 return -1;
472 }
473
474 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf));
475 len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf));
476 if (len < 0) {
477 NOTIFY_ERROR("%s:%s", "read", strerror(errno));
478 free(read_buf);
479 free(new_buf);
480 return -1;
481 }
482 if (len == 0) {
483 break;
484 }
485 read_buf->buf_used += len;
486
487 NOTIFY_DEBUG("len:%zd", len);
488 D_BUF("read_buf: ", read_buf);
489
490 while (len > 0) {
491 ssize_t len_flensed;
492
493 NOTIFY_DEBUG("len:%zd", len);
494
495 /* determine if we want to save the next buffer */
496 if (new_buf == NULL) {
497 /* if new_buf is not null, we already want to save the next one.. */
498 /* otherwise, save if we've read in fewer lines than the reservoir holds */
499 /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
500
501 if (*num_lines < reservoir->reservoir_sz) {
502 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz);
503 } else {
504 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz);
505 }
506
507 if (*num_lines < reservoir->reservoir_sz
508 || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) {
509 NOTIFY_DEBUG("next buffer will be reserved..");
510 new_buf = buf_new_(0);
511 if (new_buf == NULL) {
512 free(read_buf);
513 return -1;
514 }
515 } else {
516 NOTIFY_DEBUG("not saving next buffer..");
517 }
518 }
519
520 len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL);
521 if (len_flensed < 0) {
522 free(read_buf);
523 free(new_buf);
524 return -1;
525 }
526 if (len_flensed == 0) {
527 /* no delimiter found yet, stop parsing and read more */
528 NOTIFY_DEBUG("no delim found after %zd", len);
529 line_scanned = len;
530 buf_rebase_(read_buf);
531 break;
532 }
533
534 len -= len_flensed;
535 line_scanned = 0;
536
537 D_BUF("read_buf: ", read_buf);
538
539 if (new_buf != NULL) {
540 D_BUF("parsed complete line: ", new_buf);
541 reservoir_remember_(reservoir, new_buf);
542 new_buf = NULL;
543 }
544
545 *num_lines += 1;
546
547 }
548 }
549 /* leftovers */
550 NOTIFY_DEBUG("loop done\n\n");
551 if (read_buf->buf_used) {
552 D_BUF("leftovers: ", read_buf);
553
554 if (new_buf != NULL
555 || *num_lines < reservoir->reservoir_sz) {
556 reservoir_remember_(reservoir, read_buf);
557 read_buf = NULL;
558 }
559
560 *num_lines += 1;
561 }
562
563 free(read_buf);
564 free(new_buf);
565
566 return 0;
567 }
568
569
570 /* reservoir_write_
571 Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
572 */
573 static
574 int reservoir_write_(int fd, reservoir_t reservoir, char delim) {
575 ssize_t len;
576 size_t i;
577 struct iovec iov[2];
578
579 iov[1].iov_base = &delim;
580 iov[1].iov_len = sizeof delim;
581
582 assert(reservoir != NULL);
583 D_RESERVOIR(reservoir);
584
585 for (i = 0; i < reservoir->reservoir_sz; i++) {
586 if (reservoir->reservoir[i]) {
587 iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
588 iov[0].iov_len = reservoir->reservoir[i]->buf_used;
589 } else {
590 iov[0].iov_base = NULL;
591 iov[0].iov_len = 0;
592 }
593
594 len = writev(fd, iov, sizeof iov / sizeof *iov);
595 if (len < 0) {
596 NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
597 return -1;
598 }
599 }
600
601 return 0;
602 }
603
604 #ifndef TEST
605
606 int main(int argc, char *argv[]) {
607 reservoir_t reservoir;
608 size_t num_lines = 0;
609 int fd;
610 int c;
611
612 while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) {
613 switch (c) {
614 case 'v':
615 options_.verbosity++;
616 break;
617
618 case 'b':
619 options_.read_buf_sz = atoi(optarg);
620 break;
621
622 case 'd':
623 options_.delim = *optarg;
624 options_.delim_out = *optarg;
625 break;
626
627 case 'n':
628 options_.reservoir_sz = atoi(optarg);
629 break;
630
631 case 'r':
632 options_.rand_file = optarg;
633 break;
634
635 case 'h':
636 usage_(argv[0], 1);
637 exit(EX_OK);
638
639 default:
640 usage_(argv[0], 0);
641 exit(EX_USAGE);
642 }
643 }
644
645 if (argc - optind < 0) {
646 usage_(argv[0], 0);
647 exit(EX_USAGE);
648 }
649
650 if (rand_init_(options_.rand_file)) {
651 NOTIFY_ERROR("failed to initialize randomness source\n");
652 exit(EX_NOINPUT);
653 }
654
655 reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
656 if (reservoir == NULL) {
657 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
658 exit(EX_OSERR);
659 }
660 reservoir->reservoir_sz = options_.reservoir_sz;
661 reservoir->reservoir_insertions = 0;
662 memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
663
664 if (argc - optind == 0) {
665 fd = STDIN_FILENO;
666 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
667 exit(EX_SOFTWARE);
668 }
669 } else {
670 while (optind < argc) {
671 if (strcmp("-", argv[optind]) == 0) {
672 fd = STDIN_FILENO;
673 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
674 exit(EX_SOFTWARE);
675 }
676 } else {
677 fd = open(argv[optind], O_RDONLY);
678 if (fd < 0) {
679 NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
680 exit(EX_NOINPUT);
681 }
682 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
683 exit(EX_SOFTWARE);
684 }
685 if (close(fd)) {
686 NOTIFY_ERROR("%s:%s", "close", strerror(errno));
687 exit(EX_OSERR);
688 }
689 }
690 optind++;
691 }
692 }
693
694 if (options_.verbosity) {
695 fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
696 }
697
698 if (options_.verbosity > 1) {
699 fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions);
700 }
701
702 if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
703 exit(EX_SOFTWARE);
704 }
705
706 exit(EX_OK);
707 }
708
709 #else /* TEST */
710
711 void *test_suite_data;
712
713 int test_suite_pre(void *suite_data) {
714 (void)suite_data;
715 if (rand_init_(NULL)) {
716 return -1;
717 }
718 return 0;
719 }
720 int test_suite_post(void *suite_data) {
721 (void)suite_data;
722 return 0;
723 }
724
725 test_t test_suite[] = {
726 { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
727 { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
728 { NULL, NULL, NULL, NULL, NULL },
729 };
730
731
732 #endif /* TEST */