2 This generates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
19 #include "test_suite.h"
21 static const char * const src_id_
= "v" VERSION_STR
" " VERSION_DATE
;
23 static struct options_
{
24 unsigned int verbosity
;
36 .rand_file
= "/dev/random",
39 static int rand_fd_
= -1;
/* BUF_ROOM: the buffer's total size (buf_sz) minus the sum of its start
   offset (buf_start) and its bytes in use (buf_used), i.e. the unused space
   remaining after the used region. */
47 #define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
49 typedef struct reservoir_
{
55 #define D_BUF(__pre__,__b__,...) do {\
56 if ( (__b__) == NULL )\
57 NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\
59 NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
66 (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\
67 assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\
70 #define D_RESERVOIR(__r__) do {\
72 for (i = 0; i < (__r__)->reservoir_sz; i++) {\
73 D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0)
75 #define D_RESERVOIR(...)
81 void usage_(const char *prog
, unsigned int full
) {
82 FILE *f
= full
? stdout
: stderr
;
83 char *x
= strrchr(prog
, '/');
89 fprintf(f
, "%s -- returns a random sampling of input\n\n",
92 fprintf(f
, "Usage: %s options\n",
96 fprintf(f
, "\nOptions:\n"
97 "\t-n <num> -- returns <num> samples [default: %zu]\n"
98 "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
99 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
100 "\t-b <bytes> -- read buffer size [default: %zu]\n"
101 "\t-v -- increase verbosity\n"
102 "\t-h -- this screen\n",
103 options_
.reservoir_sz
,
106 options_
.read_buf_sz
);
108 fprintf(f
, "\n%78s\n", src_id_
);
114 /* rand_upto_inclusive_
115 Room for improvement: constrain the bits of randomness consumed, based on #limit;
116 also, maybe read randomness in larger chunks rather than one value per call.
119 unsigned long rand_upto_inclusive_(unsigned long limit
) {
120 unsigned long randomness
;
125 if (rand_fd_
!= -1) {
128 len
= read(rand_fd_
, &randomness
, sizeof randomness
);
129 if (len
== sizeof randomness
) {
130 randomness
%= limit
+ 1;
134 NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_
, sizeof randomness
, len
, (len
< 0) ? strerror(errno
) : ( (len
== 0) ? "EOF" : "not enough read consecutively") );
137 randomness
= mrand48();
138 randomness
%= limit
+ 1;
145 int rand_init_(char *rand_file
) {
146 srand48(time(NULL
) ^ getpid());
148 rand_fd_
= open(rand_file
, O_RDONLY
);
149 if (rand_fd_
== -1) {
150 NOTIFY_ERROR("%s('%s'):%s", "open", rand_file
, strerror(errno
));
159 buf_t
buf_new_(size_t sz
) {
160 buf_t buf
= malloc(sz
+ sizeof *buf
);
163 buf
->buf_start
= buf
->buf_used
= 0;
164 memset(buf
->buf
, 0, sz
);
171 void buf_rebase_(buf_t buf
) {
172 if (buf
->buf_start
== 0)
174 memmove(buf
->buf
, buf
->buf
+ buf
->buf_start
, buf
->buf_used
);
180 int buf_makeroom_(buf_t
*pbuf
, size_t roomfor
) {
184 assert(pbuf
!= NULL
);
187 *pbuf
= buf_new_(roomfor
);
195 if (BUF_ROOM(*pbuf
) >= roomfor
)
198 new_sz
= (*pbuf
)->buf_used
+ roomfor
;
199 tmp_ptr
= realloc(*pbuf
, new_sz
+ sizeof **pbuf
);
200 if (tmp_ptr
== NULL
) {
201 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno
));
205 (*pbuf
)->buf_sz
= new_sz
;
212 int buf_range_dup_or_append_(buf_t src
, size_t src_skip
, size_t n
, buf_t
*dst
) {
215 assert(src_skip
+ n
<= src
->buf_used
);
217 if (buf_makeroom_(dst
, n
)) {
221 memcpy((*dst
)->buf
+ (*dst
)->buf_used
, src
->buf
+ src
->buf_start
+ src_skip
, n
);
222 (*dst
)->buf_used
+= n
;
229 Starting after #src_offset characters, scan through #src, stopping at
230 the first character matching #delimiter, whereupon all the characters
231 leading up to #delimiter are copied into *#dst if #dst is not NULL. #src
232 becomes the characters following #delimiter.
233 Returns the number of characters flensed from #src (the delimiter included), or 0 if no delimiter was found.
235 Room for improvement:
236 If flensed segment is more than half the buffer, copy remainder of src
237 into dst, then return src, leaving dst in its place.
240 ssize_t
buf_flense_(buf_t
*src
, size_t src_offset
, int delimiter
, buf_t
*dst
) {
241 const size_t delimiter_len
= 1;
245 assert(src_offset
<= (*src
)->buf_used
);
247 NOTIFY_DEBUG("src_offset:%zu", src_offset
);
249 D_BUF("dst ", dst
? *dst
: NULL
);
251 for (i
= src_offset
; i
< (*src
)->buf_used
; i
++) {
252 if ((*src
)->buf
[(*src
)->buf_start
+ i
] == delimiter
) {
255 if (buf_range_dup_or_append_((*src
), 0, i
, dst
)) {
260 (*src
)->buf_start
+= i
+ delimiter_len
;
261 (*src
)->buf_used
-= i
+ delimiter_len
;
264 D_BUF("dst ", dst
? *dst
: NULL
);
265 return i
+ delimiter_len
;
/* Input strings for the buf_flense_ tests, delimited by '\n'.
   testdata1 ends with a trailing segment that has no closing delimiter;
   testdata2 contains empty segments (consecutive delimiters). */
274 static const char buf_flense_testdata1
[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
275 static const char buf_flense_testdata2
[] = "\n\nfoo\nbar\n\n";
277 struct buf_flense_expected_result_
{
280 } buf_flense_expected1
[] = {
294 }, buf_flense_expected2
[] = {
303 struct test_buf_flense_data_
{
305 const struct buf_flense_expected_result_
*expected
;
306 } test_buf_flense_data1
= {
307 .src
= buf_flense_testdata1
,
308 .expected
= buf_flense_expected1
,
309 }, test_buf_flense_data2
= {
310 .src
= buf_flense_testdata2
,
311 .expected
= buf_flense_expected2
,
314 static int test_buf_flense_(void *test_arg
, void *suite_arg
) {
316 struct test_buf_flense_data_
*test_data
= (struct test_buf_flense_data_
*)test_arg
;
317 const char testdata_len
= strlen(test_data
->src
);
318 const struct buf_flense_expected_result_
*next_result
= test_data
->expected
;
322 TEST_INFO("initializing src buffer with %zu characters", testdata_len
);
323 src
= buf_new_(testdata_len
);
325 TEST_ERROR("%s:%s", "new_buf_", "failed");
328 memcpy(src
->buf
, test_data
->src
, testdata_len
);
329 src
->buf_used
+= testdata_len
;
337 r
= buf_flense_(&src
, 0, '\n', &dst
);
338 if (r
!= next_result
->r
) {
339 TEST_ERROR("result '%zd' does not match expected result '%zd'", r
, next_result
->r
);
343 TEST_INFO("finished flensing");
347 if (strlen(next_result
->buf
) > dst
->buf_used
) {
348 TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result
->buf
);
351 } else if (memcmp(next_result
->buf
, dst
->buf
+ dst
->buf_start
, strlen(next_result
->buf
))) {
352 TEST_ERROR("flensed buffer does not match expected result '%s'", next_result
->buf
);
357 TEST_INFO("flensed: '%.*s'", (int)dst
->buf_used
, dst
->buf
+ dst
->buf_start
);
359 memset(dst
, 0, dst
->buf_sz
+ sizeof *dst
);
365 if (strlen(next_result
->buf
) > src
->buf_used
) {
366 TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result
->buf
);
369 } else if (memcmp(next_result
->buf
, src
->buf
+ src
->buf_start
, strlen(next_result
->buf
))) {
370 TEST_ERROR("remaining buffer does not match expected result '%s'", next_result
->buf
);
375 TEST_INFO("remains: '%.*s'", (int)src
->buf_used
, src
->buf
+ src
->buf_start
);
377 memset(src
, 0, src
->buf_sz
+ sizeof *src
);
386 /* reservoir_remember_
387 Remember #buf, forgetting a previously remembered line if #num_lines has
388 already reached the reservoir's size. Remembers them in random order.
391 void reservoir_remember_(struct reservoir_
*reservoir
, size_t num_lines
, buf_t buf
) {
392 unsigned long randomness
;
395 assert(reservoir
!= NULL
);
397 D_BUF("reserving ", buf
);
399 if (reservoir
->reservoir_sz
> 0) {
400 if (num_lines
< reservoir
->reservoir_sz
) {
401 randomness
= rand_upto_inclusive_(num_lines
);
402 NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness
, num_lines
);
403 reservoir
->reservoir
[num_lines
] = reservoir
->reservoir
[randomness
];
406 randomness
= rand_upto_inclusive_(reservoir
->reservoir_sz
- 1);
407 old_buf
= reservoir
->reservoir
[randomness
];
409 NOTIFY_DEBUG("replacing reservoir index %zu", randomness
);
410 reservoir
->reservoir
[randomness
] = buf
;
415 if (old_buf
!= NULL
) {
416 D_BUF("FREEING ", old_buf
);
417 memset(old_buf
, 0, old_buf
->buf_sz
+ sizeof *old_buf
);
421 D_RESERVOIR(reservoir
);
426 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
427 accumulator buffer. For each #delimiter character found in what was just
428 read, occasionally remember the preceding characters.
431 int reservoir_read_(int fd
, size_t read_block_sz
, int delimiter
, size_t *num_lines
, struct reservoir_
*reservoir
) {
432 buf_t read_buf
, new_buf
= NULL
;
433 size_t line_scanned
; /* how much of the buffer has already been searched for delimiter */
436 assert(read_block_sz
> 0);
437 assert(num_lines
!= NULL
);
438 assert(reservoir
!= NULL
);
444 read_buf
= buf_new_(read_block_sz
);
445 if (read_buf
== NULL
) {
451 NOTIFY_DEBUG("read loop\n\n");
453 if (buf_makeroom_(&read_buf
, read_block_sz
)) {
459 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd
, read_buf
->buf
, read_buf
->buf_start
, line_scanned
, BUF_ROOM(read_buf
));
460 len
= read(fd
, read_buf
->buf
+ read_buf
->buf_start
+ line_scanned
, BUF_ROOM(read_buf
));
462 NOTIFY_ERROR("%s:%s", "read", strerror(errno
));
470 read_buf
->buf_used
+= len
;
472 NOTIFY_DEBUG("len:%zd", len
);
473 D_BUF("read_buf: ", read_buf
);
478 NOTIFY_DEBUG("len:%zd", len
);
480 /* determine if we want to save the next buffer */
481 if (new_buf
== NULL
) {
482 /* if new_buf is not null, we already want to save the next one.. */
483 /* otherwise, save if we've read in fewer lines than the reservoir holds */
484 /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
486 if (*num_lines
< reservoir
->reservoir_sz
) {
487 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines
, reservoir
->reservoir_sz
);
489 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines
, reservoir
->reservoir_sz
);
492 if (*num_lines
< reservoir
->reservoir_sz
493 || rand_upto_inclusive_(*num_lines
) < reservoir
->reservoir_sz
) {
494 NOTIFY_DEBUG("next buffer will be reserved..");
495 new_buf
= buf_new_(0);
496 if (new_buf
== NULL
) {
501 NOTIFY_DEBUG("not saving next buffer..");
505 len_flensed
= buf_flense_(&read_buf
, line_scanned
, delimiter
, new_buf
? &new_buf
: NULL
);
506 if (len_flensed
< 0) {
511 if (len_flensed
== 0) {
512 /* no delimiter found yet, stop parsing and read more */
513 NOTIFY_DEBUG("no delim found after %zd", len
);
515 buf_rebase_(read_buf
);
522 D_BUF("read_buf: ", read_buf
);
524 if (new_buf
!= NULL
) {
525 D_BUF("parsed complete line: ", new_buf
);
526 reservoir_remember_(reservoir
, *num_lines
, new_buf
);
534 NOTIFY_DEBUG("loop done\n\n");
535 if (read_buf
->buf_used
) {
536 D_BUF("leftovers: ", read_buf
);
539 || *num_lines
< reservoir
->reservoir_sz
) {
540 reservoir_remember_(reservoir
, *num_lines
, read_buf
);
555 Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
558 int reservoir_write_(int fd
, struct reservoir_
*reservoir
, char delim
) {
563 iov
[1].iov_base
= &delim
;
564 iov
[1].iov_len
= sizeof delim
;
566 assert(reservoir
!= NULL
);
567 D_RESERVOIR(reservoir
);
569 for (i
= 0; i
< reservoir
->reservoir_sz
; i
++) {
570 if (reservoir
->reservoir
[i
]) {
571 iov
[0].iov_base
= reservoir
->reservoir
[i
]->buf
+ reservoir
->reservoir
[i
]->buf_start
;
572 iov
[0].iov_len
= reservoir
->reservoir
[i
]->buf_used
;
574 iov
[0].iov_base
= NULL
;
578 len
= writev(fd
, iov
, sizeof iov
/ sizeof *iov
);
580 NOTIFY_ERROR("%s:%s", "writev", strerror(errno
));
590 int main(int argc
, char *argv
[]) {
591 struct reservoir_
*reservoir
;
592 size_t num_lines
= 0;
596 while ( (c
= getopt(argc
, argv
, "hvb:n:d:")) != EOF
) {
599 options_
.verbosity
++;
603 options_
.read_buf_sz
= atoi(optarg
);
607 options_
.delim
= *optarg
;
608 options_
.delim_out
= *optarg
;
612 options_
.reservoir_sz
= atoi(optarg
);
625 if (argc
- optind
< 0) {
630 if (rand_init_(options_
.rand_file
)) {
631 NOTIFY_ERROR("failed to initialize randomness source\n");
635 reservoir
= malloc((options_
.reservoir_sz
* sizeof *reservoir
->reservoir
) + sizeof *reservoir
);
636 if (reservoir
== NULL
) {
637 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno
));
640 reservoir
->reservoir_sz
= options_
.reservoir_sz
;
641 memset(reservoir
->reservoir
, 0, reservoir
->reservoir_sz
* sizeof *reservoir
->reservoir
);
643 if (argc
- optind
== 0) {
645 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
649 while (optind
< argc
) {
650 if (strcmp("-", argv
[optind
]) == 0) {
652 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
656 fd
= open(argv
[optind
], O_RDONLY
);
658 NOTIFY_ERROR("%s('%s'):%s", "open", argv
[optind
], strerror(errno
));
661 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
665 NOTIFY_ERROR("%s:%s", "close", strerror(errno
));
673 if (options_
.verbosity
) {
674 fprintf(stderr
, "%zu sample%s, out of %zu total choices\n", options_
.reservoir_sz
, options_
.reservoir_sz
> 1 ? "s" : "", num_lines
);
677 if (reservoir_write_(STDOUT_FILENO
, reservoir
, options_
.delim_out
)) {
686 void *test_suite_data
;
688 int test_suite_pre(void *suite_data
) {
690 if (rand_init_(NULL
)) {
695 int test_suite_post(void *suite_data
) {
700 test_t test_suite
[] = {
701 { "test_buf_flense_ 1", test_buf_flense_
, NULL
, NULL
, &test_buf_flense_data1
},
702 { "test_buf_flense_ 2", test_buf_flense_
, NULL
, NULL
, &test_buf_flense_data2
},
703 { NULL
, NULL
, NULL
, NULL
, NULL
},