Merge branch 'hotfix/open-fix'
[reservoir_sample] / reservoir.c
1 #include <stdlib.h>
2 #include <unistd.h>
3 #include <sys/uio.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <assert.h>
8
9 #include "reservoir.h"
10 #include "notify.h"
11 #include "buf.h"
12 #include "randomness.h"
13 #include "test_suite.h"
14
15 reservoir_t reservoir_new(size_t sz) {
16 reservoir_t reservoir;
17
18 reservoir = malloc((sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
19 if (reservoir == NULL) {
20 NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
21 return NULL;
22 }
23 reservoir->reservoir_sz = sz;
24 reservoir->reservoir_used = 0;
25 reservoir->reservoir_tally = 0;
26 memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
27
28 return reservoir;
29 }
30
31 int reservoir_grow(reservoir_t *preservoir, size_t growby) {
32 assert(preservoir != NULL);
33
34 if (growby) {
35 void *tmp_ptr = realloc(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir);
36 if (tmp_ptr == NULL) {
37 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
38 return -1;
39 }
40 *preservoir = tmp_ptr;
41 (*preservoir)->reservoir_sz += growby;
42 memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used);
43 }
44
45 D_RESERVOIR(*preservoir);
46
47 return 0;
48 }
49
50 void reservoir_remember(reservoir_t reservoir, buf_t buf) {
51 buf_t old_buf;
52
53 assert(reservoir != NULL);
54
55 D_BUF("reserving ", buf);
56
57 if (reservoir->reservoir_sz > 0) {
58 unsigned long randomness;
59
60 if (reservoir->reservoir_used < reservoir->reservoir_sz) {
61 /* there are still unused slots, fill them up without discarding anything */
62 /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
63 randomness = randomness_upto_inclusive(reservoir->reservoir_used);
64
65 assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */
66
67 NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used);
68 reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness];
69 old_buf = NULL; /* no old entry to discard */
70 reservoir->reservoir_used += 1;
71 } else {
72 randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1);
73 old_buf = reservoir->reservoir[randomness];
74 }
75 NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
76 reservoir->reservoir[randomness] = buf;
77 } else {
78 /* can't add anything to a zero-size reservoir, so just dispose of new item */
79 old_buf = buf;
80 }
81
82 reservoir->reservoir_tally += 1;
83
84 if (old_buf != NULL) {
85 D_BUF("FREEING ", old_buf);
86 memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
87 free(old_buf);
88 }
89
90 D_RESERVOIR(reservoir);
91 }
92
93 int reservoir_write(int fd, reservoir_t reservoir, char delim) {
94 ssize_t len;
95 size_t i;
96 struct iovec iov[2];
97
98 iov[1].iov_base = &delim;
99 iov[1].iov_len = sizeof delim;
100
101 assert(reservoir != NULL);
102 D_RESERVOIR(reservoir);
103
104 for (i = 0; i < reservoir->reservoir_sz; i++) {
105 if (reservoir->reservoir[i]) {
106 iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
107 iov[0].iov_len = reservoir->reservoir[i]->buf_used;
108 } else {
109 iov[0].iov_base = NULL;
110 iov[0].iov_len = 0;
111 }
112
113 do {
114 len = writev(fd, iov, sizeof iov / sizeof *iov);
115 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
116 if (len < 0) {
117 NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
118 return -1;
119 }
120 }
121
122 return 0;
123 }
124
125 #define META_BUF_SZ 128
126 int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) {
127 char buf[META_BUF_SZ];
128 int metalen;
129 ssize_t len;
130
131 metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c",
132 reservoir->reservoir_sz, delim,
133 reservoir->reservoir_used, delim,
134 reservoir->reservoir_tally, delim,
135 samples, delim);
136 if ((size_t)metalen >= sizeof buf) {
137 NOTIFY_ERROR("out of buffer");
138 return -1;
139 }
140 do {
141 len = write(fd, buf, metalen);
142 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
143 if (len < metalen) {
144 if (len < 0) {
145 NOTIFY_ERROR("%s:%s", "write", strerror(errno));
146 }
147 return -1;
148 }
149
150 return 0;
151 }
152
153 #ifdef TEST
154
155 void *test_suite_data;
156
157 int test_suite_pre(void *suite_data) {
158 (void)suite_data;
159 if (randomness_init(NULL)) {
160 return -1;
161 }
162 return 0;
163 }
164
165 int test_suite_post(void *suite_data) {
166 (void)suite_data;
167 return 0;
168 }
169
170 test_t test_suite[] = {
171 { NULL, NULL, NULL, NULL, NULL },
172 };
173
174 #endif /* TEST */