Fixed bias issue, some general cleanups.
author Justin Wind <justin.wind@gmail.com>
Mon, 18 Feb 2013 21:25:43 +0000 (13:25 -0800)
committer Justin Wind <justin.wind@gmail.com>
Tue, 19 Feb 2013 06:24:17 +0000 (22:24 -0800)
Moved the num_lines increment out of the conditional block it had erroneously snuck into; the misplaced increment had been biasing the reservoir results towards later entries.

.gitignore
Makefile
reservoir_sample.c

index c3a0398b18bd0083b7be088587a3fdd6d10b0743..2a37d976dd5bd0fb83e3dc3f53b97767618c64a0 100644 (file)
@@ -1,3 +1,4 @@
+reservoir_sample
 *.o
 .depend
 ._*
index 2762a295d5cbbc2d71624205d182f0ce39134b17..8f8f4b7daf8ae73eb0f7055a7145336894334491 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,8 @@ CC = clang
 endif
 
 CFLAGS += -g -Wall -Wextra
-#CPPFLAGS += -DNDEBUG
+CFLAGS += -O3
+CPPFLAGS += -DNDEBUG
 LDFLAGS +=
 
 MAKEDEPEND = $(CC) -MM
index 492dd64975a24307c83ffd0530310594e8059490..167b5c9531aab1960d0d7fdc35a7daecc3bc52aa 100644 (file)
@@ -38,6 +38,7 @@ static struct options_ {
 
 static int rand_fd_ = -1;
 
+/* a simple little sliding-window buffer */
 typedef struct buf_ {
        size_t buf_sz;
        size_t buf_start;
@@ -47,6 +48,7 @@ typedef struct buf_ {
 #define BUF_ROOM(__b__) ( (__b__)->buf_sz - ( (__b__)->buf_start + (__b__)->buf_used ) )
 
 typedef struct reservoir_ {
+       size_t reservoir_insertions; /* how many items have ever been added to reservoir */
        size_t reservoir_sz;
        buf_t reservoir[];
 } *reservoir_t;
@@ -70,7 +72,10 @@ typedef struct reservoir_ {
 #define D_RESERVOIR(__r__) do {\
        size_t i;\
        for (i = 0; i < (__r__)->reservoir_sz; i++) {\
-               D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i); } } while (0)
+               D_BUF("reservoir[%zu] ", (__r__)->reservoir[i], i);\
+       }\
+       NOTIFY_DEBUG("  insertions:%zu sz:%zu", (__r__)->reservoir_insertions, (__r__)->reservoir_sz);\
+} while (0)
 #else
 #define D_RESERVOIR(...)
 #define D_BUF(...)
@@ -134,6 +139,7 @@ unsigned long rand_upto_inclusive_(unsigned long limit) {
                NOTIFY_ERROR("%s(%d, %zu):%zd:%s", "read", rand_fd_, sizeof randomness, len, (len < 0) ? strerror(errno) : ( (len == 0) ? "EOF" : "not enough read consecutively") );
        }
 
+       /* fall back to dumb randomness */
        randomness = mrand48();
        randomness %= limit + 1;
 
@@ -388,8 +394,7 @@ static int test_buf_flense_(void *test_arg, void *suite_arg) {
     been remembered.  Remembers them in random order.
 */
 static
-void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t buf) {
-       unsigned long randomness;
+void reservoir_remember_(reservoir_t reservoir, buf_t buf) {
        buf_t old_buf;
 
        assert(reservoir != NULL);
@@ -397,11 +402,18 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b
        D_BUF("reserving ", buf);
 
        if (reservoir->reservoir_sz > 0) {
-               if (num_lines < reservoir->reservoir_sz) {
-                       randomness = rand_upto_inclusive_(num_lines);
-                       NOTIFY_DEBUG("moving index %zu to end (%zu)", randomness, num_lines);
-                       reservoir->reservoir[num_lines] = reservoir->reservoir[randomness];
-                       old_buf = NULL;
+               unsigned long randomness;
+
+               if (reservoir->reservoir_insertions < reservoir->reservoir_sz) {
+                       /* there are still unused slots, fill them up without discarding anything */
+                       /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
+                       randomness = rand_upto_inclusive_(reservoir->reservoir_insertions);
+
+                       assert(reservoir->reservoir[reservoir->reservoir_insertions] == NULL); /* yet-unused slots will be null-initialized */
+
+                       NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_insertions);
+                       reservoir->reservoir[reservoir->reservoir_insertions] = reservoir->reservoir[randomness];
+                       old_buf = NULL; /* no old entry to discard */
                } else {
                        randomness = rand_upto_inclusive_(reservoir->reservoir_sz - 1);
                        old_buf = reservoir->reservoir[randomness];
@@ -409,9 +421,12 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b
                NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
                reservoir->reservoir[randomness] = buf;
        } else {
+               /* can't add anything to a zero-size reservoir, so just dispose of new item */
                old_buf = buf;
        }
 
+       reservoir->reservoir_insertions += 1;
+
        if (old_buf != NULL) {
                D_BUF("FREEING ", old_buf);
                memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
@@ -428,7 +443,7 @@ void reservoir_remember_(struct reservoir_ *reservoir, size_t num_lines, buf_t b
     read, occasionally remember the preceeding characters.
 */
 static
-int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, struct reservoir_ *reservoir) {
+int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lines, reservoir_t reservoir) {
        buf_t read_buf, new_buf = NULL;
        size_t line_scanned; /* how much of the buffer has already been searched for delimiter */
        ssize_t len;
@@ -523,11 +538,12 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
 
                        if (new_buf != NULL) {
                                D_BUF("parsed complete line: ", new_buf);
-                               reservoir_remember_(reservoir, *num_lines, new_buf);
+                               reservoir_remember_(reservoir, new_buf);
                                new_buf = NULL;
-                               *num_lines += 1;
                        }
 
+                       *num_lines += 1;
+
                }
        }
        /* leftovers */
@@ -537,7 +553,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
 
                if (new_buf != NULL
                ||  *num_lines < reservoir->reservoir_sz) {
-                       reservoir_remember_(reservoir, *num_lines, read_buf);
+                       reservoir_remember_(reservoir, read_buf);
                        read_buf = NULL;
                }
 
@@ -555,7 +571,7 @@ int reservoir_read_(int fd, size_t read_block_sz, int delimiter, size_t *num_lin
     Room for improvement: unroll reservoir into (up to IOV_MAX) more iov slots.
 */
 static
-int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) {
+int reservoir_write_(int fd, reservoir_t reservoir, char delim) {
        ssize_t len;
        size_t i;
        struct iovec iov[2];
@@ -588,12 +604,12 @@ int reservoir_write_(int fd, struct reservoir_ *reservoir, char delim) {
 #ifndef TEST
 
 int main(int argc, char *argv[]) {
-       struct reservoir_ *reservoir;
+       reservoir_t reservoir;
        size_t num_lines = 0;
        int fd;
        int c;
 
-       while ( (c = getopt(argc, argv, "hvb:n:d:")) != EOF ) {
+       while ( (c = getopt(argc, argv, "hvb:n:d:r:")) != EOF ) {
                switch (c) {
                        case 'v':
                                options_.verbosity++;
@@ -612,6 +628,10 @@ int main(int argc, char *argv[]) {
                                options_.reservoir_sz = atoi(optarg);
                                break;
 
+                       case 'r':
+                               options_.rand_file = optarg;
+                               break;
+
                        case 'h':
                                usage_(argv[0], 1);
                                exit(EX_OK);
@@ -638,6 +658,7 @@ int main(int argc, char *argv[]) {
                exit(EX_OSERR);
        }
        reservoir->reservoir_sz = options_.reservoir_sz;
+       reservoir->reservoir_insertions = 0;
        memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
 
        if (argc - optind == 0) {
@@ -674,6 +695,10 @@ int main(int argc, char *argv[]) {
                fprintf(stderr, "%zu sample%s, out of %zu total choices\n", options_.reservoir_sz, options_.reservoir_sz > 1 ? "s" : "", num_lines);
        }
 
+       if (options_.verbosity > 1) {
+               fprintf(stderr, "%zu selection events\n", reservoir->reservoir_insertions);
+       }
+
        if (reservoir_write_(STDOUT_FILENO, reservoir, options_.delim_out)) {
                exit(EX_SOFTWARE);
        }