Split code into modules, handle USR1, minor fixes.
[reservoir_sample] / reservoir_sample.c
1 /* reservoir_sample.c
2 This collates a randomized subset of its input, by means of reservoir-
3 sampling, and a Fisher-Yates shuffle.
4 */
5
6 /* To do:
7 Allow user-supplied function (in relation to entry count) to define
8 grow-rate option, rather than static count. JIT would be nice there.
9
10 The entire grow-rate thing is not very well thought-through, will bias
11 the results, and likely not very useful as currently implemented.
12 */
13
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <stdio.h>
19 #include <fcntl.h>
20 #include <limits.h>
21 #include <signal.h>
22 #include <sysexits.h>
23 #include <assert.h>
24
25 #include "version.h"
26 #include "notify.h"
27 #include "buf.h"
28 #include "randomness.h"
29 #include "reservoir.h"
30
31 static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
32
/* options_
   Program-wide settings.  The initializer below supplies the defaults.
*/
static struct options_ {
    unsigned int verbosity;
    size_t read_buf_sz;
    size_t reservoir_sz;
    /* how many samples before adding a new slot to reservoir */
    unsigned long reservoir_grow_per;
    /* how many samples before doubling reservoir_grow_per */
    unsigned long reservoir_grow_double_per;
    char delim;
    char delim_out;
    char *rand_file;
    int status_fd;
} options_ = {
    .status_fd = STDERR_FILENO,
    .rand_file = NULL,
    .delim_out = '\n',
    .delim = '\n',
    .reservoir_grow_double_per = 0,
    .reservoir_grow_per = 0,
    .reservoir_sz = 1,
    .read_buf_sz = 8192,
    .verbosity = 0,
};
53
/* #status_requested_ will be set whenever a view of the current sample
   reservoir is desired.
   It is written from a signal handler and polled from normal code, so it
   has to be a volatile sig_atomic_t: that is the only object type the C
   standard guarantees may be safely stored to from a handler.
*/
static volatile sig_atomic_t status_requested_ = 0;
58
59 static
60 void usage_(const char *prog, unsigned int full) {
61 FILE *f = full ? stdout : stderr;
62 char *x = strrchr(prog, '/');
63
64 if (x && *(x + 1))
65 prog = x + 1;
66
67 if (full)
68 fprintf(f, "%s -- returns a random sampling of input\n\n",
69 prog);
70
71 fprintf(f, "Usage: %s options\n",
72 prog);
73
74 if (full) {
75 fprintf(f, "\nOptions:\n"
76 "\t-n <num> -- returns <num> samples [default: %zu]\n"
77 "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
78 "\t-r <file> -- read randomness from <file> [default: '%s']\n"
79 "\t-b <bytes> -- set input buffer size [default: %zu]\n"
80 "\t-i <num> -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
81 "\t-h <num> -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n "
82 "\t-s <file> -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
83 "\t-v -- increase verbosity\n"
84 "\t-h -- this screen\n",
85 options_.reservoir_sz,
86 options_.delim,
87 options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)",
88 options_.read_buf_sz,
89 options_.reservoir_grow_double_per,
90 options_.reservoir_grow_per);
91
92 fprintf(f, "\n%78s\n", src_id_);
93 }
94 fflush(f);
95 }
96
97 /* request_snapshot_
98 Signal handler to be bound to USR1.
99 Upon receiving a signal, take note that a snapshot of the current state
100 has been requested.
101 */
102 static
103 void request_snapshot_(int signum) {
104 (void)signum;
105 status_requested_ = 1;
106 }
107
108 /* accumulate_input_
109 Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
110 accumulator buffer. For each #delimiter character found in what was just
111 read, occasionally remember the preceeding characters.
112 */
113 static
114 int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) {
115 buf_t read_buf, new_buf = NULL;
116 size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */
117 ssize_t len;
118 unsigned long grow_count = 0;
119 unsigned long grow_grow_count = 0;
120
121 assert(read_block_sz > 0);
122 assert(psamples != NULL);
123 assert(preservoir != NULL);
124 assert(*preservoir != NULL);
125
126 if (fd < 0) {
127 return -1;
128 }
129
130 read_buf = buf_new(read_block_sz);
131 if (read_buf == NULL) {
132 return -1;
133 }
134 bytes_scanned = 0;
135
136 /* begin accumulating */
137 for (;;) {
138 NOTIFY_DEBUG("read loop\n\n");
139
140 /* make sure there's enough room in our input buffer for a full read() */
141 if (buf_makeroom(&read_buf, read_block_sz)) {
142 free(read_buf);
143 free(new_buf);
144 return -1;
145 }
146
147 NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf));
148 do {
149 len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf));
150
151 /* a signal may have interrupted read(), deal with that before
152 doing anything else */
153 if (status_requested_) {
154 status_requested_ = 0;
155 NOTIFY_DEBUG("dumping reservoir due to signal");
156 if (reservoir_write(STDOUT_FILENO, *preservoir, options_.delim_out)) {
157 NOTIFY_ERROR("failed to output current reservoir contents");
158 }
159 if (options_.verbosity) {
160 reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out);
161 }
162 }
163 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
164 if (len < 0) {
165 NOTIFY_ERROR("%s:%s", "read", strerror(errno));
166 free(read_buf);
167 free(new_buf);
168 return -1;
169 }
170 if (len == 0) {
171 break;
172 }
173 read_buf->buf_used += len;
174
175 NOTIFY_DEBUG("len:%zd", len);
176 D_BUF("read_buf: ", read_buf);
177
178 while (len > 0) {
179 ssize_t len_flensed;
180
181 NOTIFY_DEBUG("len:%zd", len);
182
183 if (options_.reservoir_grow_per
184 && grow_count >= options_.reservoir_grow_per) {
185 NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1);
186 grow_count = 0;
187 if (reservoir_grow(preservoir, 1)) {
188 NOTIFY_ERROR("failed to increase reservoir size");
189 free(read_buf);
190 return -1;
191 }
192 }
193
194 if (options_.reservoir_grow_double_per
195 && grow_grow_count >= options_.reservoir_grow_double_per) {
196 if (grow_count > ULONG_MAX / 2) {
197 /* would overflow, never grow again */
198 grow_count = 0;
199 grow_grow_count = 0;
200 NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
201 } else {
202 NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_count * 2);
203 grow_count *= 2;
204 }
205 }
206
207 /* determine if we want to save the next buffer */
208 if (new_buf == NULL) {
209 /* if new_buf is not null, we already want to save the next one.. */
210 /* otherwise, save if we've read in fewer lines than the reservoir holds */
211 /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
212
213 if (*psamples < (*preservoir)->reservoir_sz) {
214 NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz);
215 } else {
216 NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz);
217 }
218
219 if (*psamples < (*preservoir)->reservoir_sz
220 || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) {
221 NOTIFY_DEBUG("next buffer will be remembered..");
222 new_buf = buf_new(0);
223 if (new_buf == NULL) {
224 free(read_buf);
225 return -1;
226 }
227 } else {
228 NOTIFY_DEBUG("not saving next buffer..");
229 }
230 }
231
232 len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL);
233 if (len_flensed < 0) {
234 free(read_buf);
235 free(new_buf);
236 return -1;
237 }
238 if (len_flensed == 0) {
239 /* no delimiter found yet, stop parsing and read more */
240 NOTIFY_DEBUG("no delim found after %zd", len);
241 bytes_scanned = len;
242 buf_rebase(read_buf);
243 break;
244 }
245
246 len -= len_flensed;
247 bytes_scanned = 0;
248
249 D_BUF("read_buf: ", read_buf);
250
251 if (new_buf != NULL) {
252 D_BUF("parsed complete line: ", new_buf);
253 reservoir_remember((*preservoir), new_buf);
254 new_buf = NULL;
255 }
256
257 *psamples += 1;
258 grow_count += 1;
259 grow_grow_count += 1;
260
261 }
262 }
263 /* leftovers */
264 NOTIFY_DEBUG("loop done\n\n");
265 if (read_buf->buf_used) {
266 D_BUF("leftovers: ", read_buf);
267
268 if (new_buf != NULL
269 || *psamples < (*preservoir)->reservoir_sz) {
270 reservoir_remember((*preservoir), read_buf);
271 read_buf = NULL;
272 }
273
274 *psamples += 1;
275 }
276
277 free(read_buf);
278 free(new_buf);
279
280 return 0;
281 }
282
283
284 int main(int argc, char *argv[]) {
285 struct sigaction sa;
286 char *status_filename = NULL;
287 reservoir_t reservoir;
288 unsigned long num_samples = 0;
289 int c;
290
291 while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) {
292 switch (c) {
293 case 'v':
294 options_.verbosity++;
295 break;
296
297 case 'b':
298 options_.read_buf_sz = strtoul(optarg, NULL, 0);
299 /* XXX: validate */
300 break;
301
302 case 'd':
303 options_.delim = *optarg;
304 /* @fallthrough@ */
305 case 'D':
306 options_.delim_out = *optarg;
307 break;
308
309 case 'i':
310 options_.reservoir_grow_per = strtoul(optarg, NULL, 0);
311 break;
312
313 case 'j':
314 options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0);
315 break;
316
317 case 's':
318 status_filename = optarg;
319 break;
320
321 case 'n':
322 options_.reservoir_sz = strtoul(optarg, NULL, 0);
323 break;
324
325 case 'r':
326 options_.rand_file = optarg;
327 break;
328
329 case 'h':
330 usage_(argv[0], 1);
331 exit(EX_OK);
332
333 default:
334 usage_(argv[0], 0);
335 exit(EX_USAGE);
336 }
337 }
338
339 #if 0
340 /* zero-or-more arguments required */
341 /* if that ever changes... */
342 if (argc - optind < 0) {
343 usage_(argv[0], 0);
344 exit(EX_USAGE);
345 }
346 #endif
347
348 if (randomness_init(options_.rand_file)) {
349 NOTIFY_ERROR("failed to initialize randomness source\n");
350 exit(EX_NOINPUT);
351 }
352
353 reservoir = reservoir_new(options_.reservoir_sz);
354 if (reservoir == NULL) {
355 NOTIFY_ERROR("could not create new reservoir");
356 exit(EX_OSERR);
357 }
358
359 if (status_filename) {
360 options_.status_fd = open(status_filename, O_RDONLY|O_APPEND|O_CREAT);
361 if (options_.status_fd < 0) {
362 NOTIFY_ERROR("could not open status file '%s'", status_filename);
363 exit(EX_OSERR);
364 }
365 }
366
367 sa.sa_handler = request_snapshot_;
368 (void)sigemptyset(&sa.sa_mask);
369 sa.sa_flags = 0;
370 if (sigaction(SIGUSR1, &sa, NULL)) {
371 NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno));
372 exit(EX_OSERR);
373 }
374
375 if (argc - optind == 0) {
376 if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
377 exit(EX_SOFTWARE);
378 }
379 } else {
380 while (optind < argc) {
381 if (strcmp("-", argv[optind]) == 0) {
382 if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
383 exit(EX_SOFTWARE);
384 }
385 } else {
386 int fd = open(argv[optind], O_RDONLY);
387 if (fd < 0) {
388 NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
389 exit(EX_NOINPUT);
390 }
391 if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
392 exit(EX_SOFTWARE);
393 }
394 if (close(fd)) {
395 NOTIFY_ERROR("%s:%s", "close", strerror(errno));
396 exit(EX_OSERR);
397 }
398 }
399 optind++;
400 }
401 }
402
403 if (options_.verbosity) {
404 fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_samples);
405 }
406
407 if (options_.verbosity > 1) {
408 fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally);
409 }
410
411 if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) {
412 exit(EX_SOFTWARE);
413 }
414
415 exit(EX_OK);
416 }