static int rand_fd_ = -1;
+/* a simple little sliding-window buffer */
typedef struct buf_ {
size_t buf_sz;
size_t buf_start;
#define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
typedef struct reservoir_ {
+ size_t reservoir_insertions; /* how many items have ever been added to reservoir */
size_t reservoir_sz;
buf_t reservoir[];
} *reservoir_t;
#define D_RESERVOIR(__r__) do {\
size_t i;\
for (i = 0; i < (__r__)->reservoir_sz; i++) {\
- D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0)
+ D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
+ }\
+ NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\
+} while (0)
#else
#define D_RESERVOIR(...)
#define D_BUF(...)
NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
}
+ /* fall back to dumb randomness */
randomness = mrand48();
randomness %= limit + 1;
been remembered. Remembers them in random order.
*/
static
-void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf) {
- unsigned long randomness;
+void reservoir_remember_(reservoir_t reservoir, buf_t buf) {
buf_t old_buf;
assert(reservoir != NULL);
D_BUF("reserving ", buf);
if (reservoir->reservoir_sz > 0) {
- if (num_lines < reservoir->reservoir_sz) {
- randomness = rand_upto_inclusive_(num_lines);
- NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness, num_lines);
- reservoir->reservoir[num_lines] = reservoir->reservoir[randomness];
- old_buf = NULL;
+ unsigned long randomness;
+
+ if (reservoir->reservoir_insertions < reservoir->reservoir_sz) {
+ /* there are still unused slots, fill them up without discarding anything */
+ /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
+ randomness = rand_upto_inclusive_(reservoir->reservoir_insertions);
+
+ assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */
+
+ NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions);
+ reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness];
+ old_buf = NULL; /* no old entry to discard */
} else {
randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
old_buf = reservoir->reservoir[randomness];
NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
reservoir->reservoir[randomness] = buf;
} else {
+ /* can't add anything to a zero-size reservoir, so just dispose of new item */
old_buf = buf;
}
+ reservoir->reservoir_insertions += 1;
+
if (old_buf != NULL) {
D_BUF("FREEING ", old_buf);
memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
read, occasionally remember the preceeding characters.
*/
static
-int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) {
+int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) {
buf_t read_buf, new_buf = NULL;
size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
ssize_t len;
if (new_buf != NULL) {
D_BUF("parsed complete line: ", new_buf);
- reservoir_remember_(reservoir, *num_lines, new_buf);
+ reservoir_remember_(reservoir, new_buf);
new_buf = NULL;
- *num_lines += 1;
}
+ *num_lines += 1;
+
}
}
/* leftovers */
if (new_buf != NULL
|| *num_lines < reservoir->reservoir_sz) {
- reservoir_remember_(reservoir, *num_lines, read_buf);
+ reservoir_remember_(reservoir, read_buf);
read_buf = NULL;
}
Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
*/
static
-int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) {
+int reservoir_write_(int fd, reservoir_t reservoir, char delim) {
ssize_t len;
size_t i;
struct iovec iov[2];
#ifndef TEST
int main(int argc, char *argv[]) {
- struct reservoir_ *reservoir;
+ reservoir_t reservoir;
size_t num_lines = 0;
int fd;
int c;
- while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) {
+ while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) {
switch (c) {
case 'v':
options_.verbosity++;
options_.reservoir_sz = atoi(optarg);
break;
+ case 'r':
+ options_.rand_file = optarg;
+ break;
+
case 'h':
usage_(argv[0], 1);
exit(EX_OK);
exit(EX_OSERR);
}
reservoir->reservoir_sz = options_.reservoir_sz;
+ reservoir->reservoir_insertions = 0;
memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
if (argc - optind == 0) {
fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
}
+ if (options_.verbosity > 1) {
+ fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions);
+ }
+
if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
exit(EX_SOFTWARE);
}