X-Git-Url: http://git.squeep.com/?p=reservoir_sample;a=blobdiff_plain;f=reservoir.c;fp=reservoir.c;h=1878203037f0bc95ac16bd5b991b61fded1ec35a;hp=0000000000000000000000000000000000000000;hb=9b5d13ce510e4668d165c0b5ede7fd7f74adcbfc;hpb=c294f0883b05016744fcbfc83241bbb5133a2cb9 diff --git a/reservoir.c b/reservoir.c new file mode 100644 index 0000000..1878203 --- /dev/null +++ b/reservoir.c @@ -0,0 +1,174 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "reservoir.h" +#include "notify.h" +#include "buf.h" +#include "randomness.h" +#include "test_suite.h" + +reservoir_t reservoir_new(size_t sz) { + reservoir_t reservoir; + + reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir); + if (reservoir == NULL) { + NOTIFY_ERROR("%s:%s", "malloc", strerror(errno)); + return NULL; + } + reservoir->reservoir_sz = sz; + reservoir->reservoir_used = 0; + reservoir->reservoir_tally = 0; + memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); + + return reservoir; +} + +int reservoir_grow(reservoir_t *preservoir, size_t growby) { + assert(preservoir != NULL); + + if (growby) { + void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir); + if (tmp_ptr == NULL) { + NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); + return -1; + } + *preservoir = tmp_ptr; + (*preservoir)->reservoir_sz += growby; + memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used); + } + + D_RESERVOIR(*preservoir); + + return 0; +} + +void reservoir_remember(reservoir_t reservoir, buf_t buf) { + buf_t old_buf; + + assert(reservoir != NULL); + + D_BUF("reserving ", buf); + + if (reservoir->reservoir_sz > 0) { + unsigned long randomness; + + if (reservoir->reservoir_used < reservoir->reservoir_sz) { + /* there are still unused slots, fill them up without discarding anything */ + /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ + randomness = randomness_upto_inclusive(reservoir->reservoir_used); + + assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */ + + NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used); + reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness]; + old_buf = NULL; /* no old entry to discard */ + reservoir->reservoir_used += 1; + } else { + randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1); + old_buf = reservoir->reservoir[randomness]; + } + NOTIFY_DEBUG("replacing reservoir index %zu", randomness); + reservoir->reservoir[randomness] = buf; + } else { + /* can't add anything to a zero-size reservoir, so just dispose of new item */ + old_buf = buf; + } + + reservoir->reservoir_tally += 1; + + if (old_buf != NULL) { + D_BUF("FREEING ", old_buf); + memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); + free(old_buf); + } + + D_RESERVOIR(reservoir); +} + +int reservoir_write(int fd, reservoir_t reservoir, char delim) { + ssize_t len; + size_t i; + struct iovec iov[2]; + + iov[1].iov_base = &delim; + iov[1].iov_len = sizeof delim; + + assert(reservoir != NULL); + D_RESERVOIR(reservoir); + + for (i = 0; i < reservoir->reservoir_sz; i++) { + if (reservoir->reservoir[i]) { + iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; + iov[0].iov_len = reservoir->reservoir[i]->buf_used; + } else { + iov[0].iov_base = NULL; + iov[0].iov_len = 0; + } + + do { + len = writev(fd, iov, sizeof iov / sizeof *iov); + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); + if (len < 0) { + NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); + return -1; + } + } + + return 0; +} + +#define META_BUF_SZ 128 +int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) { + char buf[META_BUF_SZ]; + int metalen; + ssize_t len; + + metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c", + reservoir->reservoir_sz, delim, + reservoir->reservoir_used, delim, + reservoir->reservoir_tally, delim, + samples, delim); + if ((size_t)metalen >= sizeof buf) { + NOTIFY_ERROR("out of buffer"); + return -1; + } + do { + len = write(fd, buf, metalen); + } while (len == -1 && (errno == EINTR || errno == EAGAIN)); + if (len < metalen) { + if (len < 0) { + NOTIFY_ERROR("%s:%s", "write", strerror(errno)); + } + return -1; + } + + return 0; +} + +#ifdef TEST + +void *test_suite_data; + +int test_suite_pre(void *suite_data) { + (void)suite_data; + if (randomness_init(NULL)) { + return -1; + } + return 0; +} + +int test_suite_post(void *suite_data) { + (void)suite_data; + return 0; +} + +test_t test_suite[] = { + { NULL, NULL, NULL, NULL, NULL }, +}; + +#endif /* TEST */