2 This generates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
19 #include "test_suite.h"
21 static const char * const src_id_
= "v" VERSION_STR
" " VERSION_DATE
;
23 static struct options_
{
24 unsigned int verbosity
;
36 .rand_file
= "/dev/random",
39 static int rand_fd_
= -1;
41 /* a simple little sliding-window buffer */
48 #define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
50 typedef struct reservoir_
{
51 size_t reservoir_insertions
; /* how many items have ever been added to reservoir */
57 #define D_BUF(__pre__,__b__,...) do {\
58 if ( (__b__) == NULL )\
59 NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\
61 NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
68 (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\
69 assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\
72 #define D_RESERVOIR(__r__) do {\
74 for (i = 0; i < (__r__)->reservoir_sz; i++) {\
75 D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
77 NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\
80 #define D_RESERVOIR(...)
86 void usage_(const char *prog
, unsigned int full
) {
87 FILE *f
= full
? stdout
: stderr
;
88 char *x
= strrchr(prog
, '/');
94 fprintf(f
, "%s -- returns a random sampling of input\n\n",
97 fprintf(f
, "Usage: %s options\n",
101 fprintf(f
, "\nOptions:\n"
102 "\t-n <num> -- returns <num> samples [default: %zu]\n"
103 "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
104 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
105 "\t-b <bytes> -- read buffer size [default: %zu]\n"
106 "\t-v -- increase verbosity\n"
107 "\t-h -- this screen\n",
108 options_
.reservoir_sz
,
111 options_
.read_buf_sz
);
113 fprintf(f
, "\n%78s\n", src_id_
);
119 /* rand_upto_inclusive_
120 Room for improvement: constrain bits of randomness consumed, based on #limit
121 also maybe read chunks of randomness at a time
124 unsigned long rand_upto_inclusive_(unsigned long limit
) {
125 unsigned long randomness
;
130 if (rand_fd_
!= -1) {
133 len
= read(rand_fd_
, &randomness
, sizeof randomness
);
134 if (len
== sizeof randomness
) {
135 randomness
%= limit
+ 1;
139 NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_
, sizeof randomness
, len
, (len
< 0) ? strerror(errno
) : ( (len
== 0) ? "EOF" : "not enough read consecutively") );
142 /* fall back to dumb randomness */
143 randomness
= mrand48();
144 randomness
%= limit
+ 1;
151 int rand_init_(char *rand_file
) {
152 srand48(time(NULL
) ^ getpid());
154 rand_fd_
= open(rand_file
, O_RDONLY
);
155 if (rand_fd_
== -1) {
156 NOTIFY_ERROR("%s('%s'):%s", "open", rand_file
, strerror(errno
));
165 buf_t
buf_new_(size_t sz
) {
166 buf_t buf
= malloc(sz
+ sizeof *buf
);
169 buf
->buf_start
= buf
->buf_used
= 0;
170 memset(buf
->buf
, 0, sz
);
177 void buf_rebase_(buf_t buf
) {
178 if (buf
->buf_start
== 0)
180 memmove(buf
->buf
, buf
->buf
+ buf
->buf_start
, buf
->buf_used
);
186 int buf_makeroom_(buf_t
*pbuf
, size_t roomfor
) {
190 assert(pbuf
!= NULL
);
193 *pbuf
= buf_new_(roomfor
);
201 if (BUF_ROOM(*pbuf
) >= roomfor
)
204 new_sz
= (*pbuf
)->buf_used
+ roomfor
;
205 tmp_ptr
= realloc(*pbuf
, new_sz
+ sizeof **pbuf
);
206 if (tmp_ptr
== NULL
) {
207 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno
));
211 (*pbuf
)->buf_sz
= new_sz
;
218 int buf_range_dup_or_append_(buf_t src
, size_t src_skip
, size_t n
, buf_t
*dst
) {
221 assert(src_skip
+ n
<= src
->buf_used
);
223 if (buf_makeroom_(dst
, n
)) {
227 memcpy((*dst
)->buf
+ (*dst
)->buf_used
, src
->buf
+ src
->buf_start
+ src_skip
, n
);
228 (*dst
)->buf_used
+= n
;
235 Starting after #src_offset characters, scan through #src, stopping at
236 the first character matching #delimiter, whereupon all the characters
237 leading up to #delimiter are copied into *#dst if #dst is not NULL. #src
238 becomes the characters following #delimiter.
239 Returns the number of characters flensed from #src.
241 Room for improvement:
242 If flensed segment is more than half the buffer, copy remainder of src
243 into dst, then return src, leaving dst in its place.
246 ssize_t
buf_flense_(buf_t
*src
, size_t src_offset
, int delimiter
, buf_t
*dst
) {
247 const size_t delimiter_len
= 1;
251 assert(src_offset
<= (*src
)->buf_used
);
253 NOTIFY_DEBUG("src_offset:%zu", src_offset
);
255 D_BUF("dst ", dst
? *dst
: NULL
);
257 for (i
= src_offset
; i
< (*src
)->buf_used
; i
++) {
258 if ((*src
)->buf
[(*src
)->buf_start
+ i
] == delimiter
) {
261 if (buf_range_dup_or_append_((*src
), 0, i
, dst
)) {
266 (*src
)->buf_start
+= i
+ delimiter_len
;
267 (*src
)->buf_used
-= i
+ delimiter_len
;
270 D_BUF("dst ", dst
? *dst
: NULL
);
271 return i
+ delimiter_len
;
280 static const char buf_flense_testdata1
[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
281 static const char buf_flense_testdata2
[] = "\n\nfoo\nbar\n\n";
283 struct buf_flense_expected_result_
{
286 } buf_flense_expected1
[] = {
300 }, buf_flense_expected2
[] = {
309 struct test_buf_flense_data_
{
311 const struct buf_flense_expected_result_
*expected
;
312 } test_buf_flense_data1
= {
313 .src
= buf_flense_testdata1
,
314 .expected
= buf_flense_expected1
,
315 }, test_buf_flense_data2
= {
316 .src
= buf_flense_testdata2
,
317 .expected
= buf_flense_expected2
,
320 static int test_buf_flense_(void *test_arg
, void *suite_arg
) {
322 struct test_buf_flense_data_
*test_data
= (struct test_buf_flense_data_
*)test_arg
;
323 const char testdata_len
= strlen(test_data
->src
);
324 const struct buf_flense_expected_result_
*next_result
= test_data
->expected
;
328 TEST_INFO("initializing src buffer with %zu characters", testdata_len
);
329 src
= buf_new_(testdata_len
);
331 TEST_ERROR("%s:%s", "new_buf_", "failed");
334 memcpy(src
->buf
, test_data
->src
, testdata_len
);
335 src
->buf_used
+= testdata_len
;
343 r
= buf_flense_(&src
, 0, '\n', &dst
);
344 if (r
!= next_result
->r
) {
345 TEST_ERROR("result '%zd' does not match expected result '%zd'", r
, next_result
->r
);
349 TEST_INFO("finished flensing");
353 if (strlen(next_result
->buf
) > dst
->buf_used
) {
354 TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result
->buf
);
357 } else if (memcmp(next_result
->buf
, dst
->buf
+ dst
->buf_start
, strlen(next_result
->buf
))) {
358 TEST_ERROR("flensed buffer does not match expected result '%s'", next_result
->buf
);
363 TEST_INFO("flensed: '%.*s'", (int)dst
->buf_used
, dst
->buf
+ dst
->buf_start
);
365 memset(dst
, 0, dst
->buf_sz
+ sizeof *dst
);
371 if (strlen(next_result
->buf
) > src
->buf_used
) {
372 TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result
->buf
);
375 } else if (memcmp(next_result
->buf
, src
->buf
+ src
->buf_start
, strlen(next_result
->buf
))) {
376 TEST_ERROR("remaining buffer does not match expected result '%s'", next_result
->buf
);
381 TEST_INFO("remains: '%.*s'", (int)src
->buf_used
, src
->buf
+ src
->buf_start
);
383 memset(src
, 0, src
->buf_sz
+ sizeof *src
);
392 /* reservoir_remember_
393 Remember #buf, forgetting a previously-remembered buffer if #reservoir
394 is already full. Remembers them in random order.
397 void reservoir_remember_(reservoir_t reservoir
, buf_t buf
) {
400 assert(reservoir
!= NULL
);
402 D_BUF("reserving ", buf
);
404 if (reservoir
->reservoir_sz
> 0) {
405 unsigned long randomness
;
407 if (reservoir
->reservoir_insertions
< reservoir
->reservoir_sz
) {
408 /* there are still unused slots, fill them up without discarding anything */
409 /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
410 randomness
= rand_upto_inclusive_(reservoir
->reservoir_insertions
);
412 assert(reservoir
->reservoir
[reservoir
->reservoir_insertions
] == NULL
); /* yet-unused slots will be null-initialized */
414 NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness
, reservoir
->reservoir_insertions
);
415 reservoir
->reservoir
[reservoir
->reservoir_insertions
] = reservoir
->reservoir
[randomness
];
416 old_buf
= NULL
; /* no old entry to discard */
418 randomness
= rand_upto_inclusive_(reservoir
->reservoir_sz
- 1);
419 old_buf
= reservoir
->reservoir
[randomness
];
421 NOTIFY_DEBUG("replacing reservoir index %zu", randomness
);
422 reservoir
->reservoir
[randomness
] = buf
;
424 /* can't add anything to a zero-size reservoir, so just dispose of new item */
428 reservoir
->reservoir_insertions
+= 1;
430 if (old_buf
!= NULL
) {
431 D_BUF("FREEING ", old_buf
);
432 memset(old_buf
, 0, old_buf
->buf_sz
+ sizeof *old_buf
);
436 D_RESERVOIR(reservoir
);
441 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
442 accumulator buffer. For each #delimiter character found in what was just
443 read, occasionally remember the preceding characters.
446 int reservoir_read_(int fd
, size_t read_block_sz
, int delimiter
, size_t *num_lines
, reservoir_t reservoir
) {
447 buf_t read_buf
, new_buf
= NULL
;
448 size_t line_scanned
; /* how much of the buffer has already been searched for delimiter */
451 assert(read_block_sz
> 0);
452 assert(num_lines
!= NULL
);
453 assert(reservoir
!= NULL
);
459 read_buf
= buf_new_(read_block_sz
);
460 if (read_buf
== NULL
) {
466 NOTIFY_DEBUG("read loop\n\n");
468 if (buf_makeroom_(&read_buf
, read_block_sz
)) {
474 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd
, read_buf
->buf
, read_buf
->buf_start
, line_scanned
, BUF_ROOM(read_buf
));
475 len
= read(fd
, read_buf
->buf
+ read_buf
->buf_start
+ line_scanned
, BUF_ROOM(read_buf
));
477 NOTIFY_ERROR("%s:%s", "read", strerror(errno
));
485 read_buf
->buf_used
+= len
;
487 NOTIFY_DEBUG("len:%zd", len
);
488 D_BUF("read_buf: ", read_buf
);
493 NOTIFY_DEBUG("len:%zd", len
);
495 /* determine if we want to save the next buffer */
496 if (new_buf
== NULL
) {
497 /* if new_buf is not null, we already want to save the next one.. */
498 /* otherwise, save if we've read in fewer lines than the reservoir holds */
499 /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
501 if (*num_lines
< reservoir
->reservoir_sz
) {
502 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines
, reservoir
->reservoir_sz
);
504 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines
, reservoir
->reservoir_sz
);
507 if (*num_lines
< reservoir
->reservoir_sz
508 || rand_upto_inclusive_(*num_lines
) < reservoir
->reservoir_sz
) {
509 NOTIFY_DEBUG("next buffer will be reserved..");
510 new_buf
= buf_new_(0);
511 if (new_buf
== NULL
) {
516 NOTIFY_DEBUG("not saving next buffer..");
520 len_flensed
= buf_flense_(&read_buf
, line_scanned
, delimiter
, new_buf
? &new_buf
: NULL
);
521 if (len_flensed
< 0) {
526 if (len_flensed
== 0) {
527 /* no delimiter found yet, stop parsing and read more */
528 NOTIFY_DEBUG("no delim found after %zd", len
);
530 buf_rebase_(read_buf
);
537 D_BUF("read_buf: ", read_buf
);
539 if (new_buf
!= NULL
) {
540 D_BUF("parsed complete line: ", new_buf
);
541 reservoir_remember_(reservoir
, new_buf
);
550 NOTIFY_DEBUG("loop done\n\n");
551 if (read_buf
->buf_used
) {
552 D_BUF("leftovers: ", read_buf
);
555 || *num_lines
< reservoir
->reservoir_sz
) {
556 reservoir_remember_(reservoir
, read_buf
);
571 Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
574 int reservoir_write_(int fd
, reservoir_t reservoir
, char delim
) {
579 iov
[1].iov_base
= &delim
;
580 iov
[1].iov_len
= sizeof delim
;
582 assert(reservoir
!= NULL
);
583 D_RESERVOIR(reservoir
);
585 for (i
= 0; i
< reservoir
->reservoir_sz
; i
++) {
586 if (reservoir
->reservoir
[i
]) {
587 iov
[0].iov_base
= reservoir
->reservoir
[i
]->buf
+ reservoir
->reservoir
[i
]->buf_start
;
588 iov
[0].iov_len
= reservoir
->reservoir
[i
]->buf_used
;
590 iov
[0].iov_base
= NULL
;
594 len
= writev(fd
, iov
, sizeof iov
/ sizeof *iov
);
596 NOTIFY_ERROR("%s:%s", "writev", strerror(errno
));
606 int main(int argc
, char *argv
[]) {
607 reservoir_t reservoir
;
608 size_t num_lines
= 0;
612 while ( (c
= getopt(argc
, argv
, "hvb:n:d:r:")) != EOF
) {
615 options_
.verbosity
++;
619 options_
.read_buf_sz
= atoi(optarg
);
623 options_
.delim
= *optarg
;
624 options_
.delim_out
= *optarg
;
628 options_
.reservoir_sz
= atoi(optarg
);
632 options_
.rand_file
= optarg
;
645 if (argc
- optind
< 0) {
650 if (rand_init_(options_
.rand_file
)) {
651 NOTIFY_ERROR("failed to initialize randomness source\n");
655 reservoir
= malloc((options_
.reservoir_sz
* sizeof *reservoir
->reservoir
) + sizeof *reservoir
);
656 if (reservoir
== NULL
) {
657 NOTIFY_ERROR("%s:%s", "calloc", strerror(errno
));
660 reservoir
->reservoir_sz
= options_
.reservoir_sz
;
661 reservoir
->reservoir_insertions
= 0;
662 memset(reservoir
->reservoir
, 0, reservoir
->reservoir_sz
* sizeof *reservoir
->reservoir
);
664 if (argc
- optind
== 0) {
666 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
670 while (optind
< argc
) {
671 if (strcmp("-", argv
[optind
]) == 0) {
673 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
677 fd
= open(argv
[optind
], O_RDONLY
);
679 NOTIFY_ERROR("%s('%s'):%s", "open", argv
[optind
], strerror(errno
));
682 if (reservoir_read_(fd
, options_
.read_buf_sz
, options_
.delim
, &num_lines
, reservoir
)) {
686 NOTIFY_ERROR("%s:%s", "close", strerror(errno
));
694 if (options_
.verbosity
) {
695 fprintf(stderr
, "%zu sample%s, out of %zu total choices\n", options_
.reservoir_sz
, options_
.reservoir_sz
> 1 ? "s" : "", num_lines
);
698 if (options_
.verbosity
> 1) {
699 fprintf(stderr
, "%zu selection events\n", reservoir
->reservoir_insertions
);
702 if (reservoir_write_(STDOUT_FILENO
, reservoir
, options_
.delim_out
)) {
711 void *test_suite_data
;
713 int test_suite_pre(void *suite_data
) {
715 if (rand_init_(NULL
)) {
720 int test_suite_post(void *suite_data
) {
725 test_t test_suite
[] = {
726 { "test_buf_flense_ 1", test_buf_flense_
, NULL
, NULL
, &test_buf_flense_data1
},
727 { "test_buf_flense_ 2", test_buf_flense_
, NULL
, NULL
, &test_buf_flense_data2
},
728 { NULL
, NULL
, NULL
, NULL
, NULL
},