From f4a2568d3c77fb8d0f9ac33118f6be8bc1f616a6 Mon Sep 17 00:00:00 2001 From: Justin Wind Date: Mon, 18 Feb 2013 13:25:43 -0800 Subject: [PATCH] Fixed bias issue, some general cleanups. Moved num_lines increment out of the conditional block it had snuck erroneously into, which had been causing a bias of the reservoir results towards later entries. --- .gitignore | 1 + Makefile | 3 ++- reservoir_sample.c | 55 +++++++++++++++++++++++++++++++++------------- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index c3a0398..2a37d97 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +reservoir_sample *.o .depend ._* diff --git a/Makefile b/Makefile index 2762a29..8f8f4b7 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,8 @@ CC = clang endif CFLAGS += -g -Wall -Wextra -#CPPFLAGS += -DNDEBUG +CFLAGS += -O3 +CPPFLAGS += -DNDEBUG LDFLAGS += MAKEDEPEND = $(CC) -MM diff --git a/reservoir_sample.c b/reservoir_sample.c index 492dd64..167b5c9 100644 --- a/reservoir_sample.c +++ b/reservoir_sample.c @@ -38,6 +38,7 @@ static struct options_ { static int rand_fd_ = -1; +/* a simple little sliding-window buffer */ typedef struct buf_ { size_t buf_sz; size_t buf_start; @@ -47,6 +48,7 @@ typedef struct buf_ { #define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) ) typedef struct reservoir_ { + size_t reservoir_insertions; /* how many items have ever been added to reservoir */ size_t reservoir_sz; buf_t reservoir[]; } *reservoir_t; @@ -70,7 +72,10 @@ typedef struct reservoir_ { #define D_RESERVOIR(__r__) do {\ size_t i;\ for (i = 0; i < (__r__)->reservoir_sz; i++) {\ - D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0) + D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\ + }\ + NOTIFY_DEBUG(" insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\ +} while (0) #else #define D_RESERVOIR(...) #define D_BUF(...) @@ -134,6 +139,7 @@ unsigned long rand_upto_inclusive_(unsigned long limit) { NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") ); } + /* fall back to dumb randomness */ randomness = mrand48(); randomness %= limit + 1; @@ -388,8 +394,7 @@ static int test_buf_flense_(void *test_arg, void *suite_arg) { been remembered. Remembers them in random order. */ static -void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf) { - unsigned long randomness; +void reservoir_remember_(reservoir_t reservoir, buf_t buf) { buf_t old_buf; assert(reservoir != NULL); @@ -397,11 +402,18 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b D_BUF("reserving ", buf); if (reservoir->reservoir_sz > 0) { - if (num_lines < reservoir->reservoir_sz) { - randomness = rand_upto_inclusive_(num_lines); - NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness, num_lines); - reservoir->reservoir[num_lines] = reservoir->reservoir[randomness]; - old_buf = NULL; + unsigned long randomness; + + if (reservoir->reservoir_insertions < reservoir->reservoir_sz) { + /* there are still unused slots, fill them up without discarding anything */ + /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ + randomness = rand_upto_inclusive_(reservoir->reservoir_insertions); + + assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */ + + NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions); + reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness]; + old_buf = NULL; /* no old entry to discard */ } else { randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1); old_buf = reservoir->reservoir[randomness]; @@ -409,9 +421,12 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b NOTIFY_DEBUG("replacing reservoir index %zu", randomness); reservoir->reservoir[randomness] = buf; } else { + /* can't add anything to a zero-size reservoir, so just dispose of new item */ old_buf = buf; } + reservoir->reservoir_insertions += 1; + if (old_buf != NULL) { D_BUF("FREEING ", old_buf); memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); @@ -428,7 +443,7 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b read, occasionally remember the preceeding characters. */ static -int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) { +int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) { buf_t read_buf, new_buf = NULL; size_t line_scanned; /* how much of the buffer has already been searched for delimiter */ ssize_t len; @@ -523,11 +538,12 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin if (new_buf != NULL) { D_BUF("parsed complete line: ", new_buf); - reservoir_remember_(reservoir, *num_lines, new_buf); + reservoir_remember_(reservoir, new_buf); new_buf = NULL; - *num_lines += 1; } + *num_lines += 1; + } } /* leftovers */ @@ -537,7 +553,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin if (new_buf != NULL || *num_lines < reservoir->reservoir_sz) { - reservoir_remember_(reservoir, *num_lines, read_buf); + reservoir_remember_(reservoir, read_buf); read_buf = NULL; } @@ -555,7 +571,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots. */ static -int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) { +int reservoir_write_(int fd, reservoir_t reservoir, char delim) { ssize_t len; size_t i; struct iovec iov[2]; @@ -588,12 +604,12 @@ int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) { #ifndef TEST int main(int argc, char *argv[]) { - struct reservoir_ *reservoir; + reservoir_t reservoir; size_t num_lines = 0; int fd; int c; - while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) { + while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) { switch (c) { case 'v': options_.verbosity++; @@ -612,6 +628,10 @@ int main(int argc, char *argv[]) { options_.reservoir_sz = atoi(optarg); break; + case 'r': + options_.rand_file = optarg; + break; + case 'h': usage_(argv[0], 1); exit(EX_OK); @@ -638,6 +658,7 @@ int main(int argc, char *argv[]) { exit(EX_OSERR); } reservoir->reservoir_sz = options_.reservoir_sz; + reservoir->reservoir_insertions = 0; memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); if (argc - optind == 0) { @@ -674,6 +695,10 @@ int main(int argc, char *argv[]) { fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines); } + if (options_.verbosity > 1) { + fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions); + } + if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) { exit(EX_SOFTWARE); } -- 2.43.2