#include #include #include #include #include #include #include #include "reservoir.h" #include "notify.h" #include "buf.h" #include "randomness.h" #include "test_suite.h" /* If these ever need to be overridden.. */ void *(*reservoir_malloc_)(size_t) = malloc; void *(*reservoir_realloc_)(void *, size_t) = realloc; void (*reservoir_free_)(void *) = free; reservoir_t reservoir_new(size_t sz) { reservoir_t reservoir; reservoir = reservoir_malloc_((sz * sizeof *reservoir->reservoir) + sizeof *reservoir); if (reservoir == NULL) { NOTIFY_ERROR("%s:%s", "malloc", strerror(errno)); return NULL; } reservoir->reservoir_sz = sz; reservoir->reservoir_used = 0; reservoir->reservoir_tally = 0; memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir); return reservoir; } void reservoir_del(reservoir_t *preservoir) { if (preservoir) { if (*preservoir) { while ((*preservoir)->reservoir_used) { (*preservoir)->reservoir_used -= 1; buf_del(&((*preservoir)->reservoir[(*preservoir)->reservoir_used])); } reservoir_free_(*preservoir); *preservoir = NULL; } } } int reservoir_grow(reservoir_t *preservoir, size_t growby) { assert(preservoir != NULL); if (growby) { void *tmp_ptr = reservoir_realloc_(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir); if (tmp_ptr == NULL) { NOTIFY_ERROR("%s:%s", "realloc", strerror(errno)); return -1; } *preservoir = tmp_ptr; (*preservoir)->reservoir_sz += growby; memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used); } D_RESERVOIR(*preservoir); return 0; } void reservoir_remember(reservoir_t reservoir, buf_t buf) { buf_t old_buf; assert(reservoir != NULL); D_BUF("reserving ", buf); if (reservoir->reservoir_sz > 0) { unsigned long randomness; if (reservoir->reservoir_used < reservoir->reservoir_sz) { /* there are still unused slots, fill them up without discarding anything */ /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */ randomness = randomness_upto_inclusive(reservoir->reservoir_used); assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */ NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used); reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness]; old_buf = NULL; /* no old entry to discard */ reservoir->reservoir_used += 1; } else { randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1); old_buf = reservoir->reservoir[randomness]; } NOTIFY_DEBUG("replacing reservoir index %zu", randomness); reservoir->reservoir[randomness] = buf; } else { /* can't add anything to a zero-size reservoir, so just dispose of new item */ old_buf = buf; } reservoir->reservoir_tally += 1; if (old_buf != NULL) { D_BUF("FREEING ", old_buf); memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf); buf_del(&old_buf); assert(old_buf == NULL); } D_RESERVOIR(reservoir); } int reservoir_write(int fd, reservoir_t reservoir, char delim) { ssize_t len; size_t i; struct iovec iov[2]; iov[1].iov_base = &delim; iov[1].iov_len = sizeof delim; assert(reservoir != NULL); D_RESERVOIR(reservoir); for (i = 0; i < reservoir->reservoir_sz; i++) { if (reservoir->reservoir[i]) { iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start; iov[0].iov_len = reservoir->reservoir[i]->buf_used; } else { iov[0].iov_base = NULL; iov[0].iov_len = 0; } do { len = writev(fd, iov, sizeof iov / sizeof *iov); } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < 0) { NOTIFY_ERROR("%s:%s", "writev", strerror(errno)); return -1; } } return 0; } #define META_BUF_SZ 128 int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) { char buf[META_BUF_SZ]; int metalen; ssize_t len; metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c", reservoir->reservoir_sz, delim, reservoir->reservoir_used, delim, reservoir->reservoir_tally, delim, samples, delim); if ((size_t)metalen >= sizeof buf) { NOTIFY_ERROR("out of buffer"); return -1; } do { len = write(fd, buf, metalen); } while (len == -1 && (errno == EINTR || errno == EAGAIN)); if (len < metalen) { if (len < 0) { NOTIFY_ERROR("%s:%s", "write", strerror(errno)); } return -1; } return 0; } #ifdef TEST void *test_suite_data; int test_suite_pre(void *suite_data) { (void)suite_data; if (randomness_init(NULL)) { return -1; } return 0; } int test_suite_post(void *suite_data) { (void)suite_data; return 0; } test_t test_suite[] = { { NULL, NULL, NULL, NULL, NULL }, }; #endif /* TEST */