--- /dev/null
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "buf.h"
+#include "notify.h"
+#include "test_suite.h"
+
+/* buf_new
+	Allocate a buffer with room for #sz bytes of storage.  The new buffer
+	starts out empty, and its storage is zero-filled.
+	Returns the new buffer, or NULL if the allocation failed.
+*/
+inline
+buf_t buf_new(size_t sz) {
+	buf_t fresh = malloc(sizeof *fresh + sz);
+
+	if (fresh == NULL) {
+		return NULL;
+	}
+
+	fresh->buf_sz = sz;
+	fresh->buf_start = 0;
+	fresh->buf_used = 0;
+	memset(fresh->buf, 0, sz);
+
+	return fresh;
+}
+
+/* buf_rebase
+	Slide the used bytes of #buf down to the very front of its storage, so
+	that nothing is wasted ahead of them.  Does nothing if they are already
+	there.
+*/
+inline
+void buf_rebase(buf_t buf) {
+	if (buf->buf_start != 0) {
+		/* the two ranges may overlap, hence memmove rather than memcpy */
+		memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
+		buf->buf_start = 0;
+	}
+}
+
+/* buf_makeroom
+	Ensure *#pbuf has at least #roomfor unused bytes after its used bytes.
+	A NULL *#pbuf is replaced with a newly-created buffer.  An existing
+	buffer is rebased, and then reallocated if it is still too small, so
+	*#pbuf may change across this call.
+	Returns 0 on success, or -1 if memory could not be obtained (in which
+	case an existing *#pbuf is left as it was).
+*/
+inline
+int buf_makeroom(buf_t *pbuf, size_t roomfor) {
+	assert(pbuf != NULL);
+
+	if (*pbuf == NULL) {
+		*pbuf = buf_new(roomfor);
+		if (*pbuf == NULL) {
+			return -1;
+		}
+	}
+
+	/* reclaim any space ahead of the used bytes before considering growth */
+	buf_rebase(*pbuf);
+
+	if (BUF_ROOM(*pbuf) < roomfor) {
+		const size_t wanted_sz = (*pbuf)->buf_used + roomfor;
+		buf_t grown = realloc(*pbuf, sizeof **pbuf + wanted_sz);
+
+		if (grown == NULL) {
+			NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
+			return -1;
+		}
+		grown->buf_sz = wanted_sz;
+		*pbuf = grown;
+	}
+
+	return 0;
+}
+
+/* buf_range_dup_or_append
+	Copy #n bytes, beginning #src_offset bytes into the used portion of
+	#src, onto the end of *#pdst.  If *#pdst is NULL, a new buffer is
+	created to receive the copy.
+	Returns 0 on success, or -1 if room could not be made in *#pdst.
+*/
+inline
+int buf_range_dup_or_append(buf_t src, size_t src_offset, size_t n, buf_t *pdst) {
+	const void *from;
+	buf_t dst;
+
+	assert(src != NULL);
+	assert(pdst != NULL);
+	assert(src_offset + n <= src->buf_used);
+
+	if (buf_makeroom(pdst, n) != 0) {
+		return -1;
+	}
+
+	/* buf_makeroom has rebased the destination, so its used bytes begin at buf[0] */
+	dst = *pdst;
+	from = src->buf + src->buf_start + src_offset;
+	memcpy(dst->buf + dst->buf_used, from, n);
+	dst->buf_used += n;
+
+	return 0;
+}
+
+/* buf_flense
+	Scan the used bytes of *#psrc, beginning #src_offset bytes in, for the
+	first byte equal to #delimiter.  When one is found, all the used bytes
+	ahead of it (counted from the start of the used bytes, not from
+	#src_offset) are appended to *#pdst if #pdst is not NULL, and *#psrc
+	is advanced to just past the delimiter.
+	Returns the number of bytes consumed from *#psrc (delimiter included),
+	0 if no delimiter was found, or -1 if the copy into *#pdst failed.
+
+	Room for improvement:
+	Depending on ratio of flensed portion to buffer use, try to do less
+	copying-around of buffer ranges, by swapping buf pointers instead.
+*/
+ssize_t buf_flense(buf_t *psrc, size_t src_offset, int delimiter, buf_t *pdst) {
+	const size_t delimiter_len = 1;
+	size_t pos;
+
+	assert(psrc != NULL);
+	assert(src_offset <= (*psrc)->buf_used);
+
+	NOTIFY_DEBUG("src_offset:%zu", src_offset);
+	D_BUF("src ", *psrc);
+	D_BUF("dst ", pdst ? *pdst : NULL);
+
+	for (pos = src_offset; pos < (*psrc)->buf_used; pos++) {
+		size_t consumed;
+
+		if ((*psrc)->buf[(*psrc)->buf_start + pos] != delimiter) {
+			continue;
+		}
+
+		/* found it: hand over everything ahead of the delimiter, if wanted */
+		if (pdst != NULL
+		&&  buf_range_dup_or_append(*psrc, 0, pos, pdst) != 0) {
+			return -1;
+		}
+
+		consumed = pos + delimiter_len;
+		(*psrc)->buf_start += consumed;
+		(*psrc)->buf_used -= consumed;
+
+		D_BUF("src ", *psrc);
+		D_BUF("dst ", pdst ? *pdst : NULL);
+		return consumed;
+	}
+
+	return 0;
+}
+
+#ifdef TEST
+
+/* Source strings for the buf_flense tests.  The first ends in an
+   undelimited remainder; the second begins and ends with empty fields.
+*/
+static const char buf_flense_testdata1_[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
+static const char buf_flense_testdata2_[] = "\n\nfoo\nbar\n\n";
+
+/* One expected outcome of a buf_flense call.
+   #r is the expected return value: the field length plus one delimiter
+   byte, or 0 once no delimiter remains.
+   #buf is the expected flensed field or, for the final (#r == 0) entry,
+   the expected leftover contents of the source buffer.
+*/
+struct buf_flense_expected_result_ {
+ ssize_t r;
+ const char *buf;
+} buf_flense_expected1[] = {
+ { 1 + 1, "a" },
+ { 2 + 1, "bc" },
+ { 3 + 1, "def" },
+ { 4 + 1, "ghij" },
+ { 5 + 1, "klmno" },
+ { 3 + 1, "pqr" },
+ { 1 + 1, "s" },
+ { 3 + 1, "tuv" },
+ { 5 + 1, "wxyz0" },
+ { 4 + 1, "1234" },
+ { 3 + 1, "567" },
+ { 2 + 1, "89" },
+ { 0, "0leftovers" },
+}, buf_flense_expected2[] = {
+ { 0 + 1, "" },
+ { 0 + 1, "" },
+ { 3 + 1, "foo" },
+ { 3 + 1, "bar" },
+ { 0 + 1, "" },
+ { 0, "" },
+};
+
+/* Pairs a source string with the sequence of results expected from
+   flensing it; the test_suite table passes one of these to each test.
+*/
+struct test_buf_flense_data_ {
+ const char *src;
+ const struct buf_flense_expected_result_ *expected;
+} test_buf_flense_data1 = {
+ .src = buf_flense_testdata1_,
+ .expected = buf_flense_expected1,
+}, test_buf_flense_data2 = {
+ .src = buf_flense_testdata2_,
+ .expected = buf_flense_expected2,
+};
+
+/* test_buf_flense_
+	Load the source string from #test_arg (a struct test_buf_flense_data_)
+	into a new buffer, then flense it on '\n' repeatedly, checking each
+	return value and flensed field against the expected results, and
+	finally checking what is left in the source buffer.
+	#suite_arg is unused.
+	Returns 0 if everything matched, -1 otherwise.
+*/
+static
+int test_buf_flense_(void *test_arg, void *suite_arg) {
+	(void)suite_arg;
+	struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg;
+	/* a size_t, to match both strlen() and the %zu conversion below */
+	const size_t testdata_len = strlen(test_data->src);
+	const struct buf_flense_expected_result_ *next_result = test_data->expected;
+	size_t expected_len;
+	int retval = 0;
+	buf_t src;
+
+	TEST_INFO("initializing src buffer with %zu characters", testdata_len);
+	src = buf_new(testdata_len);
+	if (src == NULL) {
+		TEST_ERROR("%s:%s", "buf_new", "failed");
+		return -1;
+	}
+	memcpy(src->buf, test_data->src, testdata_len);
+	src->buf_used += testdata_len;
+
+	D_BUF("src ", src);
+
+	for (;;) {
+		ssize_t r;
+		buf_t dst = NULL;
+
+		r = buf_flense(&src, 0, '\n', &dst);
+		if (r != next_result->r) {
+			TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
+			retval = -1;
+		}
+		if (r == 0) {
+			TEST_INFO("finished flensing");
+			break;
+		}
+		if (r < 0 || dst == NULL) {
+			/* flensing failed outright, so there is no field to inspect */
+			free(dst);
+			break;
+		}
+
+		expected_len = strlen(next_result->buf);
+		if (expected_len > dst->buf_used) {
+			TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
+			D_BUF("dst ", dst);
+			retval = -1;
+		} else if (memcmp(next_result->buf, dst->buf + dst->buf_start, expected_len)) {
+			TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
+			D_BUF("dst ", dst);
+			retval = -1;
+		}
+
+		TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start);
+
+		memset(dst, 0, dst->buf_sz + sizeof *dst);
+		free(dst);
+		dst = NULL;
+
+		if (next_result->r == 0) {
+			/* that was the final expectation; stop rather than run off the end of the table */
+			break;
+		}
+		next_result++;
+	}
+
+	/* whatever was not flensed must match the final expectation */
+	expected_len = strlen(next_result->buf);
+	if (expected_len > src->buf_used) {
+		TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
+		D_BUF("src ", src);
+		retval = -1;
+	} else if (memcmp(next_result->buf, src->buf + src->buf_start, expected_len)) {
+		TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
+		D_BUF("src ", src);
+		retval = -1;
+	}
+
+	TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start);
+
+	memset(src, 0, src->buf_sz + sizeof *src);
+	free(src);
+	src = NULL;
+
+	return retval;
+}
+
+/* Not referenced elsewhere in this file; presumably consumed by the
+   test harness declared in test_suite.h -- confirm there.
+*/
+void *test_suite_data;
+
+/* Suite set-up hook: these tests need no preparation, so it simply
+   reports success.
+*/
+int test_suite_pre(void *suite_data) {
+ (void)suite_data;
+ return 0;
+}
+
+/* Suite tear-down hook: nothing to release, so it simply reports success. */
+int test_suite_post(void *suite_data) {
+ (void)suite_data;
+ return 0;
+}
+
+/* Table of tests.  Each entry gives a name, the test function, two
+   fields left NULL here, and a pointer to that test's data (presumably
+   delivered as the function's #test_arg -- confirm against
+   test_suite.h).  An all-NULL entry ends the table.
+*/
+test_t test_suite[] = {
+ { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
+ { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
+ { NULL, NULL, NULL, NULL, NULL },
+};
+
+#endif /* TEST */
/* reservoir_sample.c
- This generates a randomized subset of its input, by means of reservoir-
+ This collates a randomized subset of its input, by means of reservoir-
sampling, and a Fisher-Yates shuffle.
*/
+/* To do:
+ Allow user-supplied function (in relation to entry count) to define
+ grow-rate option, rather than static count. JIT would be nice there.
+
+ The entire grow-rate thing is not very well thought through, will bias
+ the results, and is likely not very useful as currently implemented.
+*/
+
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <fcntl.h>
-#include <sys/uio.h>
-#include <time.h>
+#include <limits.h>
+#include <signal.h>
#include <sysexits.h>
#include <assert.h>
#include "version.h"
#include "notify.h"
-#include "test_suite.h"
+#include "buf.h"
+#include "randomness.h"
+#include "reservoir.h"
static const char * const src_id_ = "v" VERSION_STR " " VERSION_DATE;
unsigned int verbosity;
size_t read_buf_sz;
size_t reservoir_sz;
+ unsigned long reservoir_grow_per; /* how many samples before adding a new slot to reservoir */
+ unsigned long reservoir_grow_double_per; /* how many samples before doubling reservoir_grow_per */
char delim;
char delim_out;
char *rand_file;
+ int status_fd;
} options_ = {
.verbosity = 0,
.read_buf_sz = 8192,
.reservoir_sz = 1,
+ .reservoir_grow_per = 0,
.delim = '\n',
.delim_out = '\n',
- .rand_file = "/dev/random",
+ .rand_file = NULL,
+ .status_fd = STDERR_FILENO,
};
-static int rand_fd_ = -1;
-
-/* a simple little sliding-window buffer */
-typedef struct buf_ {
- size_t buf_sz;
- size_t buf_start;
- size_t buf_used;
- unsigned char buf[];
-} *buf_t;
-#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
-
-typedef struct reservoir_ {
- size_t reservoir_insertions; /* how many items have ever been added to reservoir */
- size_t reservoir_sz;
- buf_t reservoir[];
-} *reservoir_t;
-
-#ifndef NDEBUG
-#define D_BUF(__pre__,__b__,...) do {\
- if ( (__b__) == NULL )\
- NOTIFY_DEBUG(__pre__ "buf:%p", ## __VA_ARGS__, (__b__));\
- else {\
- NOTIFY_DEBUG(__pre__ "buf:%p sz:%zu start:%zu used:%zu free:%zu '%.*s'",\
- ## __VA_ARGS__,\
- (__b__),\
- (__b__)->buf_sz,\
- (__b__)->buf_start,\
- (__b__)->buf_used,\
- BUF_ROOM((__b__)),\
- (int)(__b__)->buf_used, (__b__)->buf + (__b__)->buf_start);\
- assert( (__b__)->buf_sz >= (__b__)->buf_start + (__b__)->buf_used );\
- }\
-} while (0)
-#define D_RESERVOIR(__r__) do {\
- size_t i;\
- for (i = 0; i < (__r__)->reservoir_sz; i++) {\
- D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
- }\
- NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\
-} while (0)
-#else
-#define D_RESERVOIR(...)
-#define D_BUF(...)
-#endif
-
+/* #status_requested_ will be set whenever a view of the current sample
+ reservoir is desired.
+ It is written from the USR1 signal handler and read from the main
+ read loop, so it has to be a volatile sig_atomic_t for that access
+ to be well-defined.
+*/
+static volatile sig_atomic_t status_requested_ = 0;
static
void usage_(const char *prog, unsigned int full) {
if (full) {
fprintf(f, "\nOptions:\n"
- "\t-n <num> -- returns <num> samples [default: %zu]\n"
- "\t-d <delim> -- use <delim> as input delimiter [default: '\\%03hho']\n"
- "\t-r <file> -- read randomness from <file> [default: '%s']\n"
- "\t-b <bytes> -- read buffer size [default: %zu]\n"
- "\t-v -- increase verbosity\n"
- "\t-h -- this screen\n",
+ "\t-n <num> -- returns <num> samples [default: %zu]\n"
+ "\t-d <delim> -- use <delim> as input and output delimiter [default: '\\%03hho']\n"
+ "\t-D <delim> -- use <delim> as output delimiter only\n"
+ "\t-r <file> -- read randomness from <file> [default: '%s']\n"
+ "\t-b <bytes> -- set input buffer size [default: %zu]\n"
+ "\t-i <num> -- grow reservoir by 1 for every <num> input samples (0 inhibits behavior) [default: %lu]\n"
+ "\t-j <num> -- double the reservoir growth interval every <num> input samples (0 inhibits behavior) [default: %lu]\n"
+ "\t-s <file> -- USR1 signals will write reservoir contents to <file> rather than stderr (has no effect on normal output to stdout upon input EOF)\n"
+ "\t-v -- increase verbosity\n"
+ "\t-h -- this screen\n",
+ /* the arguments below must stay in the same order as the conversions above: -n, -d, -r, -b, -i, -j */
options_.reservoir_sz,
options_.delim,
- options_.rand_file,
- options_.read_buf_sz);
+ options_.rand_file ? options_.rand_file : "(use system pseudorandom generator)",
+ options_.read_buf_sz,
+ options_.reservoir_grow_per,
+ options_.reservoir_grow_double_per);
fprintf(f, "\n%78s\n", src_id_);
}
fflush(f);
}
-
-/* rand_upto_inclusive_
- Room for improvement: constrain bits of randomness consumed, based on #limit
- also maybe read chunks of randomness at a time
- */
-static
-unsigned long rand_upto_inclusive_(unsigned long limit) {
- unsigned long randomness;
-
- if (limit == 0)
- return 0;
-
- if (rand_fd_ != -1) {
- ssize_t len;
-
- len = read(rand_fd_, &randomness, sizeof randomness);
- if (len == sizeof randomness) {
- randomness %= limit + 1;
-
- return randomness;
- }
- NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
- }
-
- /* fall back to dumb randomness */
- randomness = mrand48();
- randomness %= limit + 1;
-
- return randomness;
-}
-
-
-static
-int rand_init_(char *rand_file) {
- srand48(time(NULL) ^ getpid());
- if (rand_file) {
- rand_fd_ = open(rand_file, O_RDONLY);
- if (rand_fd_ == -1) {
- NOTIFY_ERROR("%s('%s'):%s", "open", rand_file, strerror(errno));
- return -1;
- }
- }
- return 0;
-}
-
-
-static
-buf_t buf_new_(size_t sz) {
- buf_t buf = malloc(sz + sizeof *buf);
- if (buf) {
- buf->buf_sz = sz;
- buf->buf_start = buf->buf_used = 0;
- memset(buf->buf, 0, sz);
- }
- return buf;
-}
-
-
-static
-void buf_rebase_(buf_t buf) {
- if (buf->buf_start == 0)
- return;
- memmove(buf->buf, buf->buf + buf->buf_start, buf->buf_used);
- buf->buf_start = 0;
-}
-
-
-static
-int buf_makeroom_(buf_t *pbuf, size_t roomfor) {
- size_t new_sz;
- void *tmp_ptr;
-
- assert(pbuf != NULL);
-
- if (*pbuf == NULL) {
- *pbuf = buf_new_(roomfor);
- if (*pbuf == NULL) {
- return -1;
- }
- }
-
- buf_rebase_(*pbuf);
-
- if (BUF_ROOM(*pbuf) >= roomfor)
- return 0;
-
- new_sz = (*pbuf)->buf_used + roomfor;
- tmp_ptr = realloc(*pbuf, new_sz + sizeof **pbuf);
- if (tmp_ptr == NULL) {
- NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
- return -1;
- }
- *pbuf = tmp_ptr;
- (*pbuf)->buf_sz = new_sz;
-
- return 0;
-}
-
-
-static
-int buf_range_dup_or_append_(buf_t src, size_t src_skip, size_t n, buf_t *dst) {
- assert(src != NULL);
- assert(dst != NULL);
- assert(src_skip + n <= src->buf_used);
-
- if (buf_makeroom_(dst, n)) {
- return -1;
- }
-
- memcpy((*dst)->buf + (*dst)->buf_used, src->buf + src->buf_start + src_skip, n);
- (*dst)->buf_used += n;
-
- return 0;
-}
-
-
-/* buf_flense_
- Starting after #src_offset characters, scan through #src, stopping at
- the first character matching #delimiter, whereupon all the characters
- leading up to #delimiter are copied into *#dst if #dst is not NULL. #src
- becomes the characters following #delimiter.
- Returns the number of characters flensed from #src.
-
- Room for improvement:
- If flensed segment is more than half the buffer, copy remainder of src
- into dst, then return src, leaving dst in its place.
-*/
-static
-ssize_t buf_flense_(buf_t *src, size_t src_offset, int delimiter, buf_t *dst) {
- const size_t delimiter_len = 1;
- size_t i;
-
- assert(src != NULL);
- assert(src_offset <= (*src)->buf_used);
-
- NOTIFY_DEBUG("src_offset:%zu", src_offset);
- D_BUF("src ", *src);
- D_BUF("dst ", dst ? *dst : NULL);
-
- for (i = src_offset; i < (*src)->buf_used; i++) {
- if ((*src)->buf[(*src)->buf_start + i] == delimiter) {
-
- if (dst != NULL) {
- if (buf_range_dup_or_append_((*src), 0, i, dst)) {
- return -1;
- }
- }
-
- (*src)->buf_start += i + delimiter_len;
- (*src)->buf_used -= i + delimiter_len;
-
- D_BUF("src ", *src);
- D_BUF("dst ", dst ? *dst : NULL);
- return i + delimiter_len;
- }
- }
-
- return 0;
-}
-
-#ifdef TEST
-
-static const char buf_flense_testdata1[] = "a\nbc\ndef\nghij\nklmno\npqr\ns\ntuv\nwxyz0\n1234\n567\n89\n0leftovers";
-static const char buf_flense_testdata2[] = "\n\nfoo\nbar\n\n";
-
-struct buf_flense_expected_result_ {
- ssize_t r;
- const char *buf;
-} buf_flense_expected1[] = {
- { 1 + 1, "a" },
- { 2 + 1, "bc" },
- { 3 + 1, "def" },
- { 4 + 1, "ghij" },
- { 5 + 1, "klmno" },
- { 3 + 1, "pqr" },
- { 1 + 1, "s" },
- { 3 + 1, "tuv" },
- { 5 + 1, "wxyz0" },
- { 4 + 1, "1234" },
- { 3 + 1, "567" },
- { 2 + 1, "89" },
- { 0, "0leftovers" },
-}, buf_flense_expected2[] = {
- { 0 + 1, "" },
- { 0 + 1, "" },
- { 3 + 1, "foo" },
- { 3 + 1, "bar" },
- { 0 + 1, "" },
- { 0, "" },
-};
-
-struct test_buf_flense_data_ {
- const char *src;
- const struct buf_flense_expected_result_ *expected;
-} test_buf_flense_data1 = {
- .src = buf_flense_testdata1,
- .expected = buf_flense_expected1,
-}, test_buf_flense_data2 = {
- .src = buf_flense_testdata2,
- .expected = buf_flense_expected2,
-};
-
-static int test_buf_flense_(void *test_arg, void *suite_arg) {
- (void)suite_arg;
- struct test_buf_flense_data_ *test_data = (struct test_buf_flense_data_ *)test_arg;
- const char testdata_len = strlen(test_data->src);
- const struct buf_flense_expected_result_ *next_result = test_data->expected;
- int retval = 0;
- buf_t src;
-
- TEST_INFO("initializing src buffer with %zu characters", testdata_len);
- src = buf_new_(testdata_len);
- if (src == NULL) {
- TEST_ERROR("%s:%s", "new_buf_", "failed");
- return -1;
- }
- memcpy(src->buf, test_data->src, testdata_len);
- src->buf_used += testdata_len;
-
- D_BUF("src ", src);
-
- for (;;) {
- ssize_t r;
- buf_t dst = NULL;
-
- r = buf_flense_(&src, 0, '\n', &dst);
- if (r != next_result->r) {
- TEST_ERROR("result '%zd' does not match expected result '%zd'", r, next_result->r);
- retval = -1;
- }
- if (r == 0) {
- TEST_INFO("finished flensing");
- break;
- }
-
- if (strlen(next_result->buf) > dst->buf_used) {
- TEST_ERROR("flensed buffer smaller than expected result '%s'", next_result->buf);
- D_BUF("dst ", dst);
- retval = -1;
- } else if (memcmp(next_result->buf, dst->buf + dst->buf_start, strlen(next_result->buf))) {
- TEST_ERROR("flensed buffer does not match expected result '%s'", next_result->buf);
- D_BUF("dst ", dst);
- retval = -1;
- }
-
- TEST_INFO("flensed: '%.*s'", (int)dst->buf_used, dst->buf + dst->buf_start);
-
- memset(dst, 0, dst->buf_sz + sizeof *dst);
- free(dst);
- dst = NULL;
-
- next_result++;
- }
- if (strlen(next_result->buf) > src->buf_used) {
- TEST_ERROR("remaining buffer smaller than expected result '%s'", next_result->buf);
- D_BUF("src ", src);
- retval = -1;
- } else if (memcmp(next_result->buf, src->buf + src->buf_start, strlen(next_result->buf))) {
- TEST_ERROR("remaining buffer does not match expected result '%s'", next_result->buf);
- D_BUF("src ", src);
- retval = -1;
- }
-
- TEST_INFO("remains: '%.*s'", (int)src->buf_used, src->buf + src->buf_start);
-
- memset(src, 0, src->buf_sz + sizeof *src);
- free(src);
- src = NULL;
-
- return retval;
-}
-#endif /* TEST */
-
-
-/* reservoir_remember_
- Remember #line, forgetting a line if more than #num_lines have already
- been remembered. Remembers them in random order.
+/* request_snapshot_
+ Signal handler to be bound to USR1.
+ Upon receiving a signal, take note that a snapshot of the current state
+ has been requested.
*/
static
-void reservoir_remember_(reservoir_t reservoir, buf_t buf) {
- buf_t old_buf;
-
- assert(reservoir != NULL);
-
- D_BUF("reserving ", buf);
-
- if (reservoir->reservoir_sz > 0) {
- unsigned long randomness;
-
- if (reservoir->reservoir_insertions < reservoir->reservoir_sz) {
- /* there are still unused slots, fill them up without discarding anything */
- /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
- randomness = rand_upto_inclusive_(reservoir->reservoir_insertions);
-
- assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */
-
- NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions);
- reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness];
- old_buf = NULL; /* no old entry to discard */
- } else {
- randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
- old_buf = reservoir->reservoir[randomness];
- }
- NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
- reservoir->reservoir[randomness] = buf;
- } else {
- /* can't add anything to a zero-size reservoir, so just dispose of new item */
- old_buf = buf;
- }
-
- reservoir->reservoir_insertions += 1;
-
- if (old_buf != NULL) {
- D_BUF("FREEING ", old_buf);
- memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
- free(old_buf);
- }
-
- D_RESERVOIR(reservoir);
+void request_snapshot_(int signum) {
+ (void)signum;
+ /* only record the request here; the read loop in accumulate_input_
+    notices the flag and produces the actual output */
+ status_requested_ = 1;
}
-
-/* reservoir_read_
+/* accumulate_input_
Read (up to #read_block_sz bytes at a time) from #fd (until EOF) into an
accumulator buffer. For each #delimiter character found in what was just
read, occasionally remember the preceeding characters.
*/
static
-int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) {
+int accumulate_input_(int fd, size_t read_block_sz, int delimiter, unsigned long *psamples, reservoir_t *preservoir) {
buf_t read_buf, new_buf = NULL;
- size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
+ size_t bytes_scanned; /* how much of the buffer has already been searched for delimiter */
ssize_t len;
+ unsigned long grow_count = 0;
+ unsigned long grow_grow_count = 0;
assert(read_block_sz > 0);
- assert(num_lines != NULL);
- assert(reservoir != NULL);
+ assert(psamples != NULL);
+ assert(preservoir != NULL);
+ assert(*preservoir != NULL);
if (fd < 0) {
return -1;
}
- read_buf = buf_new_(read_block_sz);
+ read_buf = buf_new(read_block_sz);
if (read_buf == NULL) {
return -1;
}
- line_scanned = 0;
+ bytes_scanned = 0;
+ /* begin accumulating */
for (;;) {
NOTIFY_DEBUG("read loop\n\n");
- if (buf_makeroom_(&read_buf, read_block_sz)) {
+ /* make sure there's enough room in our input buffer for a full read() */
+ if (buf_makeroom(&read_buf, read_block_sz)) {
free(read_buf);
free(new_buf);
return -1;
}
- NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, line_scanned, BUF_ROOM(read_buf));
- len = read(fd, read_buf->buf + read_buf->buf_start + line_scanned, BUF_ROOM(read_buf));
+ NOTIFY_DEBUG("read(%d, %p + %zu + %zu, %zu)", fd, read_buf->buf, read_buf->buf_start, bytes_scanned, BUF_ROOM(read_buf));
+	do {
+		int read_errno;
+
+		len = read(fd, read_buf->buf + read_buf->buf_start + read_buf->buf_used, BUF_ROOM(read_buf));
+		/* remember why read() stopped, before any call below can overwrite errno */
+		read_errno = errno;
+
+		/* a signal may have interrupted read(), deal with that before
+		   doing anything else */
+		if (status_requested_) {
+			status_requested_ = 0;
+			NOTIFY_DEBUG("dumping reservoir due to signal");
+			/* snapshots go to the status descriptor (stderr unless -s was
+			   given), keeping them out of the normal output on stdout */
+			if (reservoir_write(options_.status_fd, *preservoir, options_.delim_out)) {
+				NOTIFY_ERROR("failed to output current reservoir contents");
+			}
+			if (options_.verbosity) {
+				reservoir_write_meta(options_.status_fd, *preservoir, *psamples, options_.delim_out);
+			}
+		}
+
+		/* put read()'s errno back, for the retry test here and the error
+		   report that follows the loop */
+		errno = read_errno;
+	} while (len == -1 && (errno == EINTR || errno == EAGAIN));
if (len < 0) {
NOTIFY_ERROR("%s:%s", "read", strerror(errno));
free(read_buf);
NOTIFY_DEBUG("len:%zd", len);
+ if (options_.reservoir_grow_per
+ && grow_count >= options_.reservoir_grow_per) {
+ NOTIFY_DEBUG("have seen %lu entries, growing reservoir to %zu", grow_count, (*preservoir)->reservoir_sz + 1);
+ grow_count = 0;
+ if (reservoir_grow(preservoir, 1)) {
+ NOTIFY_ERROR("failed to increase reservoir size");
+ free(read_buf);
+ return -1;
+ }
+ }
+
+	/* every reservoir_grow_double_per samples, double the number of samples
+	   required before the reservoir gains another slot */
+	if (options_.reservoir_grow_double_per
+	&&  grow_grow_count >= options_.reservoir_grow_double_per) {
+		if (options_.reservoir_grow_per > ULONG_MAX / 2) {
+			/* would overflow, never grow again: a zero interval inhibits
+			   both the growing and the doubling */
+			options_.reservoir_grow_per = 0;
+			options_.reservoir_grow_double_per = 0;
+			grow_count = 0;
+			NOTIFY_DEBUG("grow limit reached, rewrite with arbitrary-precision maths to continue");
+		} else {
+			NOTIFY_DEBUG("have seen %lu entries, doubling entries required to grow reservoir to %lu", grow_grow_count, options_.reservoir_grow_per * 2);
+			options_.reservoir_grow_per *= 2;
+		}
+		/* begin counting toward the next doubling afresh */
+		grow_grow_count = 0;
+	}
+
/* determine if we want to save the next buffer */
if (new_buf == NULL) {
/* if new_buf is not null, we already want to save the next one.. */
/* otherwise, save if we've read in fewer lines than the reservoir holds */
- /* or else there's a reservoir_sz-in-num_lines chance of saving the next line */
+ /* or else there's a reservoir_sz-in-*psamples chance of saving the next line */
- if (*num_lines < reservoir->reservoir_sz) {
- NOTIFY_DEBUG("still filling reservoir.. %zu filled out of %zu..", *num_lines, reservoir->reservoir_sz);
+ if (*psamples < (*preservoir)->reservoir_sz) {
+ /* *psamples is an unsigned long, so it takes %lu rather than %zu */
+ NOTIFY_DEBUG("still filling reservoir.. %lu filled out of %zu..", *psamples, (*preservoir)->reservoir_sz);
} else {
- NOTIFY_DEBUG("will save if random [0-%zu] is less than %zu...", *num_lines, reservoir->reservoir_sz);
+ NOTIFY_DEBUG("will save if random [0-%lu] is less than %zu...", *psamples, (*preservoir)->reservoir_sz);
}
- if (*num_lines < reservoir->reservoir_sz
- || rand_upto_inclusive_(*num_lines) < reservoir->reservoir_sz) {
- NOTIFY_DEBUG("next buffer will be reserved..");
- new_buf = buf_new_(0);
+ if (*psamples < (*preservoir)->reservoir_sz
+ || randomness_upto_inclusive(*psamples) < (*preservoir)->reservoir_sz) {
+ NOTIFY_DEBUG("next buffer will be remembered..");
+ new_buf = buf_new(0);
if (new_buf == NULL) {
free(read_buf);
return -1;
}
}
- len_flensed = buf_flense_(&read_buf, line_scanned, delimiter, new_buf ? &new_buf : NULL);
+ len_flensed = buf_flense(&read_buf, bytes_scanned, delimiter, new_buf ? &new_buf : NULL);
if (len_flensed < 0) {
free(read_buf);
free(new_buf);
if (len_flensed == 0) {
/* no delimiter found yet, stop parsing and read more */
NOTIFY_DEBUG("no delim found after %zd", len);
- line_scanned = len;
- buf_rebase_(read_buf);
+ bytes_scanned = len;
+ buf_rebase(read_buf);
break;
}
len -= len_flensed;
- line_scanned = 0;
+ bytes_scanned = 0;
D_BUF("read_buf: ", read_buf);
if (new_buf != NULL) {
D_BUF("parsed complete line: ", new_buf);
- reservoir_remember_(reservoir, new_buf);
+ reservoir_remember((*preservoir), new_buf);
new_buf = NULL;
}
- *num_lines += 1;
+ *psamples += 1;
+ grow_count += 1;
+ grow_grow_count += 1;
}
}
D_BUF("leftovers: ", read_buf);
if (new_buf != NULL
- || *num_lines < reservoir->reservoir_sz) {
- reservoir_remember_(reservoir, read_buf);
+ || *psamples < (*preservoir)->reservoir_sz) {
+ reservoir_remember((*preservoir), read_buf);
read_buf = NULL;
}
- *num_lines += 1;
+ *psamples += 1;
}
free(read_buf);
}
-/* reservoir_write_
- Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
-*/
-static
-int reservoir_write_(int fd, reservoir_t reservoir, char delim) {
- ssize_t len;
- size_t i;
- struct iovec iov[2];
-
- iov[1].iov_base = &delim;
- iov[1].iov_len = sizeof delim;
-
- assert(reservoir != NULL);
- D_RESERVOIR(reservoir);
-
- for (i = 0; i < reservoir->reservoir_sz; i++) {
- if (reservoir->reservoir[i]) {
- iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
- iov[0].iov_len = reservoir->reservoir[i]->buf_used;
- } else {
- iov[0].iov_base = NULL;
- iov[0].iov_len = 0;
- }
-
- len = writev(fd, iov, sizeof iov / sizeof *iov);
- if (len < 0) {
- NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
- return -1;
- }
- }
-
- return 0;
-}
-
-#ifndef TEST
-
int main(int argc, char *argv[]) {
+ struct sigaction sa;
+ char *status_filename = NULL;
reservoir_t reservoir;
- size_t num_lines = 0;
- int fd;
+ unsigned long num_samples = 0;
int c;
- while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) {
+ while ( (c = getopt(argc, argv, "hvb:d:D:i:j:s:n:r:")) != EOF ) {
switch (c) {
case 'v':
options_.verbosity++;
break;
case 'b':
- options_.read_buf_sz = atoi(optarg);
+ options_.read_buf_sz = strtoul(optarg, NULL, 0);
+ /* XXX: validate */
break;
case 'd':
options_.delim = *optarg;
+ /* @fallthrough@ */
+ case 'D':
options_.delim_out = *optarg;
break;
+ case 'i':
+ options_.reservoir_grow_per = strtoul(optarg, NULL, 0);
+ break;
+
+ case 'j':
+ options_.reservoir_grow_double_per = strtoul(optarg, NULL, 0);
+ break;
+
+ case 's':
+ status_filename = optarg;
+ break;
+
case 'n':
- options_.reservoir_sz = atoi(optarg);
+ options_.reservoir_sz = strtoul(optarg, NULL, 0);
break;
case 'r':
}
}
+#if 0
+ /* zero-or-more arguments required */
+ /* if that ever changes... */
if (argc - optind < 0) {
usage_(argv[0], 0);
exit(EX_USAGE);
}
+#endif
- if (rand_init_(options_.rand_file)) {
+ if (randomness_init(options_.rand_file)) {
NOTIFY_ERROR("failed to initialize randomness source\n");
exit(EX_NOINPUT);
}
- reservoir = malloc((options_.reservoir_sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
+ reservoir = reservoir_new(options_.reservoir_sz);
if (reservoir == NULL) {
- NOTIFY_ERROR("%s:%s", "calloc", strerror(errno));
+ NOTIFY_ERROR("could not create new reservoir");
+ exit(EX_OSERR);
+ }
+
+	if (status_filename) {
+		/* status output is written to this descriptor, so it has to be
+		   opened for writing; O_CREAT also requires an explicit mode
+		   (narrowed by the umask as usual) */
+		options_.status_fd = open(status_filename, O_WRONLY|O_APPEND|O_CREAT, 0666);
+		if (options_.status_fd < 0) {
+			NOTIFY_ERROR("could not open status file '%s'", status_filename);
+			exit(EX_OSERR);
+		}
+	}
+
+ sa.sa_handler = request_snapshot_;
+ (void)sigemptyset(&sa.sa_mask);
+ sa.sa_flags = 0;
+ if (sigaction(SIGUSR1, &sa, NULL)) {
+ NOTIFY_ERROR("%s:%s", "sigaction", strerror(errno));
exit(EX_OSERR);
}
- reservoir->reservoir_sz = options_.reservoir_sz;
- reservoir->reservoir_insertions = 0;
- memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
if (argc - optind == 0) {
- fd = STDIN_FILENO;
- if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+ if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
exit(EX_SOFTWARE);
}
} else {
while (optind < argc) {
if (strcmp("-", argv[optind]) == 0) {
- fd = STDIN_FILENO;
- if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+ if (accumulate_input_(STDIN_FILENO, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
exit(EX_SOFTWARE);
}
} else {
- fd = open(argv[optind], O_RDONLY);
+ int fd = open(argv[optind], O_RDONLY);
if (fd < 0) {
NOTIFY_ERROR("%s('%s'):%s", "open", argv[optind], strerror(errno));
exit(EX_NOINPUT);
}
- if (reservoir_read_(fd, options_.read_buf_sz, options_.delim, &num_lines, reservoir)) {
+ if (accumulate_input_(fd, options_.read_buf_sz, options_.delim, &num_samples, &reservoir)) {
exit(EX_SOFTWARE);
}
if (close(fd)) {
}
if (options_.verbosity) {
- fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
+ /* pluralize on the size actually reported, which may have grown past the size requested with -n */
+ fprintf(stderr, "%zu sample%s, out of %lu total choices\n", reservoir->reservoir_sz, reservoir->reservoir_sz > 1 ? "s" : "", num_samples);
}
if (options_.verbosity > 1) {
- fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions);
+ fprintf(stderr, "%zu selection events\n", reservoir->reservoir_tally);
}
- if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
+ if (reservoir_write(STDOUT_FILENO, reservoir, options_.delim_out)) {
exit(EX_SOFTWARE);
}
exit(EX_OK);
}
-
-#else /* TEST */
-
-void *test_suite_data;
-
-int test_suite_pre(void *suite_data) {
- (void)suite_data;
- if (rand_init_(NULL)) {
- return -1;
- }
- return 0;
-}
-int test_suite_post(void *suite_data) {
- (void)suite_data;
- return 0;
-}
-
-test_t test_suite[] = {
- { "test_buf_flense_ 1", test_buf_flense_, NULL, NULL, &test_buf_flense_data1 },
- { "test_buf_flense_ 2", test_buf_flense_, NULL, NULL, &test_buf_flense_data2 },
- { NULL, NULL, NULL, NULL, NULL },
-};
-
-
-#endif /* TEST */