Merge branch 'release-1.0' into develop
[reservoir_sample] / reservoir_sample.c
1 /* reservoir_sample.c
2 This generates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
4 */
5
6 #include <stdlib.h>
7 #include <unistd.h>
8 #include <string.h>
9 #include <errno.h>
10 #include <stdio.h>
11 #include <fcntl.h>
12 #include <sys/uio.h>
13 #include <time.h>
14 #include <sysexits.h>
15 #include <assert.h>
16
17 #include "version.h"
18 #include "notify.h"
19 #include "test_suite.h"
20
21 static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
22
/* Run-time settings; main() overrides the defaults from the command line. */
struct options_ {
    unsigned int verbosity;  /* count of -v flags */
    size_t read_buf_sz;      /* bytes requested per read() */
    size_t reservoir_sz;     /* number of samples to emit */
    char delim;              /* input record delimiter */
    char delim_out;          /* delimiter written after each sample */
    char *rand_file;         /* randomness source handed to rand_init_() */
};

static struct options_ options_ = {
    .verbosity = 0,
    .read_buf_sz = 8192,
    .reservoir_sz = 1,
    .delim = '\n',
    .delim_out = '\n',
    .rand_file = "/dev/random",
};

/* Descriptor of the opened randomness source; -1 means use mrand48() only. */
static int rand_fd_ = -1;
40
/* A byte buffer whose storage follows the header (flexible array member).
   Live data occupies buf[buf_start .. buf_start + buf_used); the bytes after
   it, up to buf_sz, are free for appending. */
typedef struct buf_ {
    size_t buf_sz;        /* capacity of buf[] in bytes */
    size_t buf_start;     /* offset of the first live byte */
    size_t buf_used;      /* number of live bytes */
    unsigned char buf[];
} *buf_t;

/* Free bytes remaining after the live data.  Evaluates its argument several
   times, so pass only side-effect-free expressions. */
#define BUF_ROOM(b_) ((b_)->buf_sz - ((b_)->buf_start + (b_)->buf_used))

/* A fixed number of line-buffer slots; NULL marks a slot not yet filled. */
typedef struct reservoir_ {
    size_t reservoir_sz;  /* number of slots in reservoir[] */
    buf_t reservoir[];
} *reservoir_t;
52 } *reservoir_t;
53
#ifndef NDEBUG
/* D_BUF(prefix, b, ...): log a buffer's bookkeeping and live contents via
   NOTIFY_DEBUG, then assert that the live range fits inside the buffer.
   #prefix must be a string literal; any extra arguments fill conversions in
   #prefix.  #b may be NULL.  Arguments are evaluated more than once. */
#define D_BUF(pre_, b_, ...) do {\
    if ((b_) == NULL)\
        NOTIFY_DEBUG(pre_ "buf:%p", ## __VA_ARGS__, (void *)(b_));\
    else {\
        NOTIFY_DEBUG(pre_ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
            ## __VA_ARGS__,\
            (void *)(b_),\
            (b_)->buf_sz,\
            (b_)->buf_start,\
            (b_)->buf_used,\
            BUF_ROOM((b_)),\
            (int)(b_)->buf_used,\
            (const char *)((b_)->buf + (b_)->buf_start));\
        assert((b_)->buf_sz >= (b_)->buf_start + (b_)->buf_used);\
    }\
} while (0)

/* D_RESERVOIR(r): D_BUF every slot of reservoir #r, tagged with its index.
   The loop counter has a distinctive name so it cannot capture a caller's
   variable used inside #r. */
#define D_RESERVOIR(r_) do {\
    size_t d_reservoir_i_;\
    for (d_reservoir_i_ = 0; d_reservoir_i_ < (r_)->reservoir_sz; d_reservoir_i_++) {\
        D_BUF("reservoir[%zu] ", (r_)->reservoir[d_reservoir_i_], d_reservoir_i_);\
    }\
} while (0)
#else
#define D_RESERVOIR(...)
#define D_BUF(...)
#endif
78
79
80 static
81 void usage_(const char *prog, unsigned int full) {
82 FILE *f = full ? stdout : stderr;
83 char *x = strrchr(prog, '/');
84
85 if (x && *(x + 1))
86 prog = x + 1;
87
88 if (full)
89 fprintf(f, "%s -- returns a random sampling of input\n\n",
90 prog);
91
92 fprintf(f, "Usage: %s options\n",
93 prog);
94
95 if (full) {
96 fprintf(f, "\nOptions:\n"
97 "\t-n <num> -- returns <num> samples [default: %zu]\n"
98 "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
99 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
100 "\t-b <bytes> -- read buffer size [default: %zu]\n"
101 "\t-v -- increase verbosity\n"
102 "\t-h -- this screen\n",
103 options_.reservoir_sz,
104 options_.delim,
105 options_.rand_file,
106 options_.read_buf_sz);
107
108 fprintf(f, "\n%78s\n", src_id_);
109 }
110 fflush(f);
111 }
112
113
114 /* rand_upto_inclusive_
115 Room for improvement: constrain bits of randomness consumed, based on #limit
116 also maybe read chunks of randomness at a time
117 */
118 static
119 unsigned long rand_upto_inclusive_(unsigned long limit) {
120 unsigned long randomness;
121
122 if (limit == 0)
123 return 0;
124
125 if (rand_fd_ != -1) {
126 ssize_t len;
127
128 len = read(rand_fd_, &randomness, sizeof randomness);
129 if (len == sizeof randomness) {
130 randomness %= limit + 1;
131
132 return randomness;
133 }
134 NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
135 }
136
137 randomness = mrand48();
138 randomness %= limit + 1;
139
140 return randomness;
141 }
142
143
144 static
145 int rand_init_(char *rand_file) {
146 srand48(time(NULL) ^ getpid());
147 if (rand_file) {
148 rand_fd_ = open(rand_file, O_RDONLY);
149 if (rand_fd_ == -1) {
150 NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno));
151 return -1;
152 }
153 }
154 return 0;
155 }
156
157
158 static
159 buf_t buf_new_(size_t sz) {
160 buf_t buf = malloc(sz + sizeof *buf);
161 if (buf) {
162 buf->buf_sz = sz;
163 buf->buf_start = buf->buf_used = 0;
164 memset(buf->buf, 0, sz);
165 }
166 return buf;
167 }
168
169
170 static
171 void buf_rebase_(buf_t buf) {
172 if (buf->buf_start == 0)
173 return;
174 memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
175 buf->buf_start = 0;
176 }
177
178
179 static
180 int buf_makeroom_(buf_t *pbuf, size_t roomfor) {
181 size_t new_sz;
182 void *tmp_ptr;
183
184 assert(pbuf != NULL);
185
186 if (*pbuf == NULL) {
187 *pbuf = buf_new_(roomfor);
188 if (*pbuf == NULL) {
189 return -1;
190 }
191 }
192
193 buf_rebase_(*pbuf);
194
195 if (BUF_ROOM(*pbuf) >= roomfor)
196 return 0;
197
198 new_sz = (*pbuf)->buf_used + roomfor;
199 tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf);
200 if (tmp_ptr == NULL) {
201 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
202 return -1;
203 }
204 *pbuf = tmp_ptr;
205 (*pbuf)->buf_sz = new_sz;
206
207 return 0;
208 }
209
210
211 static
212 int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) {
213 assert(src != NULL);
214 assert(dst != NULL);
215 assert(src_skip + n <= src->buf_used);
216
217 if (buf_makeroom_(dst, n)) {
218 return -1;
219 }
220
221 memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n);
222 (*dst)->buf_used += n;
223
224 return 0;
225 }
226
227
228 /* buf_flense_
229 Starting after #src_offset characters, scan through #src, stopping at
230 the first character matching #delimiter, whereupon all the characters
231 leading up to #delimiter are copied into *#dst if #dst is not NULL. #src
232 becomes the characters following #delimiter.
233 Returns the number of characters flensed from #src.
234
235 Room for improvement:
236 If flensed segment is more than half the buffer, copy remainder of src
237 into dst, then return src, leaving dst in its place.
238 */
239 static
240 ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) {
241 const size_t delimiter_len = 1;
242 size_t i;
243
244 assert(src != NULL);
245 assert(src_offset <= (*src)->buf_used);
246
247 NOTIFY_DEBUG("src_offset:%zu", src_offset);
248 D_BUF("src ", *src);
249 D_BUF("dst ", dst ? *dst : NULL);
250
251 for (i = src_offset; i < (*src)->buf_used; i++) {
252 if ((*src)->buf[(*src)->buf_start + i] == delimiter) {
253
254 if (dst != NULL) {
255 if (buf_range_dup_or_append_((*src), 0, i, dst)) {
256 return -1;
257 }
258 }
259
260 (*src)->buf_start += i + delimiter_len;
261 (*src)->buf_used -= i + delimiter_len;
262
263 D_BUF("src ", *src);
264 D_BUF("dst ", dst ? *dst : NULL);
265 return i + delimiter_len;
266 }
267 }
268
269 return 0;
270 }
271
272 #ifdef TEST
273
274 static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
275 static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n";
276
277 struct buf_flense_expected_result_ {
278 ssize_t r;
279 const char *buf;
280 } buf_flense_expected1[] = {
281 { 1 + 1, "a" },
282 { 2 + 1, "bc" },
283 { 3 + 1, "def" },
284 { 4 + 1, "ghij" },
285 { 5 + 1, "klmno" },
286 { 3 + 1, "pqr" },
287 { 1 + 1, "s" },
288 { 3 + 1, "tuv" },
289 { 5 + 1, "wxyz0" },
290 { 4 + 1, "1234" },
291 { 3 + 1, "567" },
292 { 2 + 1, "89" },
293 { 0, "0leftovers" },
294 }, buf_flense_expected2[] = {
295 { 0 + 1, "" },
296 { 0 + 1, "" },
297 { 3 + 1, "foo" },
298 { 3 + 1, "bar" },
299 { 0 + 1, "" },
300 { 0, "" },
301 };
302
303 struct test_buf_flense_data_ {
304 const char *src;
305 const struct buf_flense_expected_result_ *expected;
306 } test_buf_flense_data1 = {
307 .src = buf_flense_testdata1,
308 .expected = buf_flense_expected1,
309 }, test_buf_flense_data2 = {
310 .src = buf_flense_testdata2,
311 .expected = buf_flense_expected2,
312 };
313
314 static int test_buf_flense_(void *test_arg, void *suite_arg) {
315 (void)suite_arg;
316 struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg;
317 const char testdata_len = strlen(test_data->src);
318 const struct buf_flense_expected_result_ *next_result = test_data->expected;
319 int retval = 0;
320 buf_t src;
321
322 TEST_INFO("initializing src buffer with %zu characters", testdata_len);
323 src = buf_new_(testdata_len);
324 if (src == NULL) {
325 TEST_ERROR("%s:%s", "new_buf_", "failed");
326 return -1;
327 }
328 memcpy(src->buf, test_data->src, testdata_len);
329 src->buf_used += testdata_len;
330
331 D_BUF("src ", src);
332
333 for (;;) {
334 ssize_t r;
335 buf_t dst = NULL;
336
337 r = buf_flense_(&src, 0, '\n', &dst);
338 if (r != next_result->r) {
339 TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
340 retval = -1;
341 }
342 if (r == 0) {
343 TEST_INFO("finished flensing");
344 break;
345 }
346
347 if (strlen(next_result->buf) > dst->buf_used) {
348 TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
349 D_BUF("dst ", dst);
350 retval = -1;
351 } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) {
352 TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
353 D_BUF("dst ", dst);
354 retval = -1;
355 }
356
357 TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start);
358
359 memset(dst, 0, dst->buf_sz + sizeof *dst);
360 free(dst);
361 dst = NULL;
362
363 next_result++;
364 }
365 if (strlen(next_result->buf) > src->buf_used) {
366 TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
367 D_BUF("src ", src);
368 retval = -1;
369 } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) {
370 TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
371 D_BUF("src ", src);
372 retval = -1;
373 }
374
375 TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start);
376
377 memset(src, 0, src->buf_sz + sizeof *src);
378 free(src);
379 src = NULL;
380
381 return retval;
382 }
383 #endif /* TEST */
384
385
386 /* reservoir_remember_
387 Remember #line, forgetting a line if more than #num_lines have already
388 been remembered. Remembers them in random order.
389 */
390 static
391 void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf) {
392 unsigned long randomness;
393 buf_t old_buf;
394
395 assert(reservoir != NULL);
396
397 D_BUF("reserving ", buf);
398
399 if (reservoir->reservoir_sz > 0) {
400 if (num_lines < reservoir->reservoir_sz) {
401 randomness = rand_upto_inclusive_(num_lines);
402 NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness, num_lines);
403 reservoir->reservoir[num_lines] = reservoir->reservoir[randomness];
404 old_buf = NULL;
405 } else {
406 randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
407 old_buf = reservoir->reservoir[randomness];
408 }
409 NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
410 reservoir->reservoir[randomness] = buf;
411 } else {
412 old_buf = buf;
413 }
414
415 if (old_buf != NULL) {
416 D_BUF("FREEING ", old_buf);
417 memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
418 free(old_buf);
419 }
420
421 D_RESERVOIR(reservoir);
422 }
423
424
425 /* reservoir_read_
426 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
427 accumulator buffer. For each #delimiter character found in what was just
428 read, occasionally remember the preceeding characters.
429 */
430 static
431 int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) {
432 buf_t read_buf, new_buf = NULL;
433 size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
434 ssize_t len;
435
436 assert(read_block_sz > 0);
437 assert(num_lines != NULL);
438 assert(reservoir != NULL);
439
440 if (fd < 0) {
441 return -1;
442 }
443
444 read_buf = buf_new_(read_block_sz);
445 if (read_buf == NULL) {
446 return -1;
447 }
448 line_scanned = 0;
449
450 for (;;) {
451 NOTIFY_DEBUG("read loop\n\n");
452
453 if (buf_makeroom_(&read_buf, read_block_sz)) {
454 free(read_buf);
455 free(new_buf);
456 return -1;
457 }
458
459 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf));
460 len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf));
461 if (len < 0) {
462 NOTIFY_ERROR("%s:%s", "read", strerror(errno));
463 free(read_buf);
464 free(new_buf);
465 return -1;
466 }
467 if (len == 0) {
468 break;
469 }
470 read_buf->buf_used += len;
471
472 NOTIFY_DEBUG("len:%zd", len);
473 D_BUF("read_buf: ", read_buf);
474
475 while (len > 0) {
476 ssize_t len_flensed;
477
478 NOTIFY_DEBUG("len:%zd", len);
479
480 /* determine if we want to save the next buffer */
481 if (new_buf == NULL) {
482 /* if new_buf is not null, we already want to save the next one.. */
483 /* otherwise, save if we've read in fewer lines than the reservoir holds */
484 /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
485
486 if (*num_lines < reservoir->reservoir_sz) {
487 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz);
488 } else {
489 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz);
490 }
491
492 if (*num_lines < reservoir->reservoir_sz
493 || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) {
494 NOTIFY_DEBUG("next buffer will be reserved..");
495 new_buf = buf_new_(0);
496 if (new_buf == NULL) {
497 free(read_buf);
498 return -1;
499 }
500 } else {
501 NOTIFY_DEBUG("not saving next buffer..");
502 }
503 }
504
505 len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL);
506 if (len_flensed < 0) {
507 free(read_buf);
508 free(new_buf);
509 return -1;
510 }
511 if (len_flensed == 0) {
512 /* no delimiter found yet, stop parsing and read more */
513 NOTIFY_DEBUG("no delim found after %zd", len);
514 line_scanned = len;
515 buf_rebase_(read_buf);
516 break;
517 }
518
519 len -= len_flensed;
520 line_scanned = 0;
521
522 D_BUF("read_buf: ", read_buf);
523
524 if (new_buf != NULL) {
525 D_BUF("parsed complete line: ", new_buf);
526 reservoir_remember_(reservoir, *num_lines, new_buf);
527 new_buf = NULL;
528 *num_lines += 1;
529 }
530
531 }
532 }
533 /* leftovers */
534 NOTIFY_DEBUG("loop done\n\n");
535 if (read_buf->buf_used) {
536 D_BUF("leftovers: ", read_buf);
537
538 if (new_buf != NULL
539 || *num_lines < reservoir->reservoir_sz) {
540 reservoir_remember_(reservoir, *num_lines, read_buf);
541 read_buf = NULL;
542 }
543
544 *num_lines += 1;
545 }
546
547 free(read_buf);
548 free(new_buf);
549
550 return 0;
551 }
552
553
554 /* reservoir_write_
555 Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
556 */
557 static
558 int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) {
559 ssize_t len;
560 size_t i;
561 struct iovec iov[2];
562
563 iov[1].iov_base = &delim;
564 iov[1].iov_len = sizeof delim;
565
566 assert(reservoir != NULL);
567 D_RESERVOIR(reservoir);
568
569 for (i = 0; i < reservoir->reservoir_sz; i++) {
570 if (reservoir->reservoir[i]) {
571 iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
572 iov[0].iov_len = reservoir->reservoir[i]->buf_used;
573 } else {
574 iov[0].iov_base = NULL;
575 iov[0].iov_len = 0;
576 }
577
578 len = writev(fd, iov, sizeof iov / sizeof *iov);
579 if (len < 0) {
580 NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
581 return -1;
582 }
583 }
584
585 return 0;
586 }
587
588 #ifndef TEST
589
590 int main(int argc, char *argv[]) {
591 struct reservoir_ *reservoir;
592 size_t num_lines = 0;
593 int fd;
594 int c;
595
596 while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) {
597 switch (c) {
598 case 'v':
599 options_.verbosity++;
600 break;
601
602 case 'b':
603 options_.read_buf_sz = atoi(optarg);
604 break;
605
606 case 'd':
607 options_.delim = *optarg;
608 options_.delim_out = *optarg;
609 break;
610
611 case 'n':
612 options_.reservoir_sz = atoi(optarg);
613 break;
614
615 case 'h':
616 usage_(argv[0], 1);
617 exit(EX_OK);
618
619 default:
620 usage_(argv[0], 0);
621 exit(EX_USAGE);
622 }
623 }
624
625 if (argc - optind < 0) {
626 usage_(argv[0], 0);
627 exit(EX_USAGE);
628 }
629
630 if (rand_init_(options_.rand_file)) {
631 NOTIFY_ERROR("failed to initialize randomness source\n");
632 exit(EX_NOINPUT);
633 }
634
635 reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
636 if (reservoir == NULL) {
637 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
638 exit(EX_OSERR);
639 }
640 reservoir->reservoir_sz = options_.reservoir_sz;
641 memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
642
643 if (argc - optind == 0) {
644 fd = STDIN_FILENO;
645 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
646 exit(EX_SOFTWARE);
647 }
648 } else {
649 while (optind < argc) {
650 if (strcmp("-", argv[optind]) == 0) {
651 fd = STDIN_FILENO;
652 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
653 exit(EX_SOFTWARE);
654 }
655 } else {
656 fd = open(argv[optind], O_RDONLY);
657 if (fd < 0) {
658 NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
659 exit(EX_NOINPUT);
660 }
661 if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
662 exit(EX_SOFTWARE);
663 }
664 if (close(fd)) {
665 NOTIFY_ERROR("%s:%s", "close", strerror(errno));
666 exit(EX_OSERR);
667 }
668 }
669 optind++;
670 }
671 }
672
673 if (options_.verbosity) {
674 fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
675 }
676
677 if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
678 exit(EX_SOFTWARE);
679 }
680
681 exit(EX_OK);
682 }
683
684 #else /* TEST */
685
686 void *test_suite_data;
687
688 int test_suite_pre(void *suite_data) {
689 (void)suite_data;
690 if (rand_init_(NULL)) {
691 return -1;
692 }
693 return 0;
694 }
695 int test_suite_post(void *suite_data) {
696 (void)suite_data;
697 return 0;
698 }
699
700 test_t test_suite[] = {
701 { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
702 { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
703 { NULL, NULL, NULL, NULL, NULL },
704 };
705
706
707 #endif /* TEST */