Merge branch 'release/1.3'
[reservoir_sample] / reservoir.c
1 #include <stdlib.h>
2 #include <unistd.h>
3 #include <sys/uio.h>
4 #include <stdio.h>
5 #include <string.h>
6 #include <errno.h>
7 #include <assert.h>
8
9 #include "reservoir.h"
10 #include "notify.h"
11 #include "buf.h"
12 #include "randomness.h"
13 #include "test_suite.h"
14
/*
 * Allocation hooks used by this module for the reservoir object itself.
 * They default to the C library allocator and may be re-pointed if a
 * different allocator ever needs to be substituted.
 */
void *(*reservoir_malloc_)(size_t size) = &malloc;
void *(*reservoir_realloc_)(void *ptr, size_t size) = &realloc;
void (*reservoir_free_)(void *ptr) = &free;
19
20 reservoir_t reservoir_new(size_t sz) {
21 reservoir_t reservoir;
22
23 reservoir = reservoir_malloc_((sz * sizeof *reservoir->reservoir) + sizeof *reservoir);
24 if (reservoir == NULL) {
25 NOTIFY_ERROR("%s:%s", "malloc", strerror(errno));
26 return NULL;
27 }
28 reservoir->reservoir_sz = sz;
29 reservoir->reservoir_used = 0;
30 reservoir->reservoir_tally = 0;
31 memset(reservoir->reservoir, 0, reservoir->reservoir_sz * sizeof *reservoir->reservoir);
32
33 return reservoir;
34 }
35
36 void reservoir_del(reservoir_t *preservoir) {
37 if (preservoir) {
38 if (*preservoir) {
39 while ((*preservoir)->reservoir_used) {
40 (*preservoir)->reservoir_used -= 1;
41 buf_del(&((*preservoir)->reservoir[(*preservoir)->reservoir_used]));
42 }
43 reservoir_free_(*preservoir);
44 *preservoir = NULL;
45 }
46 }
47 }
48
49 int reservoir_grow(reservoir_t *preservoir, size_t growby) {
50 assert(preservoir != NULL);
51
52 if (growby) {
53 void *tmp_ptr = reservoir_realloc_(*preservoir, (((*preservoir)->reservoir_sz + growby) * sizeof *(*preservoir)->reservoir) + sizeof **preservoir);
54 if (tmp_ptr == NULL) {
55 NOTIFY_ERROR("%s:%s", "realloc", strerror(errno));
56 return -1;
57 }
58 *preservoir = tmp_ptr;
59 (*preservoir)->reservoir_sz += growby;
60 memset((*preservoir)->reservoir + (*preservoir)->reservoir_used, 0, (*preservoir)->reservoir_sz - (*preservoir)->reservoir_used);
61 }
62
63 D_RESERVOIR(*preservoir);
64
65 return 0;
66 }
67
68 void reservoir_remember(reservoir_t reservoir, buf_t buf) {
69 buf_t old_buf;
70
71 assert(reservoir != NULL);
72
73 D_BUF("reserving ", buf);
74
75 if (reservoir->reservoir_sz > 0) {
76 unsigned long randomness;
77
78 if (reservoir->reservoir_used < reservoir->reservoir_sz) {
79 /* there are still unused slots, fill them up without discarding anything */
80 /* do this by moving our random victim slot contents to the end of the list, before inserting the new item in its old place */
81 randomness = randomness_upto_inclusive(reservoir->reservoir_used);
82
83 assert(reservoir->reservoir[reservoir->reservoir_used] == NULL); /* yet-unused slots will be null-initialized */
84
85 NOTIFY_DEBUG("preserving existing index %zu to end index (%zu)", randomness, reservoir->reservoir_used);
86 reservoir->reservoir[reservoir->reservoir_used] = reservoir->reservoir[randomness];
87 old_buf = NULL; /* no old entry to discard */
88 reservoir->reservoir_used += 1;
89 } else {
90 randomness = randomness_upto_inclusive(reservoir->reservoir_sz - 1);
91 old_buf = reservoir->reservoir[randomness];
92 }
93 NOTIFY_DEBUG("replacing reservoir index %zu", randomness);
94 reservoir->reservoir[randomness] = buf;
95 } else {
96 /* can't add anything to a zero-size reservoir, so just dispose of new item */
97 old_buf = buf;
98 }
99
100 reservoir->reservoir_tally += 1;
101
102 if (old_buf != NULL) {
103 D_BUF("FREEING ", old_buf);
104 memset(old_buf, 0, old_buf->buf_sz + sizeof *old_buf);
105 buf_del(&old_buf);
106 assert(old_buf == NULL);
107 }
108
109 D_RESERVOIR(reservoir);
110 }
111
112 int reservoir_write(int fd, reservoir_t reservoir, char delim) {
113 ssize_t len;
114 size_t i;
115 struct iovec iov[2];
116
117 iov[1].iov_base = &delim;
118 iov[1].iov_len = sizeof delim;
119
120 assert(reservoir != NULL);
121 D_RESERVOIR(reservoir);
122
123 for (i = 0; i < reservoir->reservoir_sz; i++) {
124 if (reservoir->reservoir[i]) {
125 iov[0].iov_base = reservoir->reservoir[i]->buf + reservoir->reservoir[i]->buf_start;
126 iov[0].iov_len = reservoir->reservoir[i]->buf_used;
127 } else {
128 iov[0].iov_base = NULL;
129 iov[0].iov_len = 0;
130 }
131
132 do {
133 len = writev(fd, iov, sizeof iov / sizeof *iov);
134 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
135 if (len < 0) {
136 NOTIFY_ERROR("%s:%s", "writev", strerror(errno));
137 return -1;
138 }
139 }
140
141 return 0;
142 }
143
144 #define META_BUF_SZ 128
145 int reservoir_write_meta(int fd, reservoir_t reservoir, unsigned long samples, char delim) {
146 char buf[META_BUF_SZ];
147 int metalen;
148 ssize_t len;
149
150 metalen = snprintf(buf, sizeof buf, "sz:%zu%cused:%zu%crecorded:%lu%csamples:%lu%c",
151 reservoir->reservoir_sz, delim,
152 reservoir->reservoir_used, delim,
153 reservoir->reservoir_tally, delim,
154 samples, delim);
155 if ((size_t)metalen >= sizeof buf) {
156 NOTIFY_ERROR("out of buffer");
157 return -1;
158 }
159 do {
160 len = write(fd, buf, metalen);
161 } while (len == -1 && (errno == EINTR || errno == EAGAIN));
162 if (len < metalen) {
163 if (len < 0) {
164 NOTIFY_ERROR("%s:%s", "write", strerror(errno));
165 }
166 return -1;
167 }
168
169 return 0;
170 }
171
172 #ifdef TEST
173
/*
 * Suite-wide data pointer.  It has no initializer and is not assigned
 * anywhere in the visible code, so it starts out as a null pointer.
 * NOTE(review): presumably consumed by the harness declared in
 * test_suite.h -- confirm.
 */
void *test_suite_data;
175
/*
 * Suite setup hook: initializes the randomness source by calling
 * randomness_init(NULL).  The suite_data argument is not used.
 * Returns 0 on success, -1 if randomness_init() reports a nonzero result.
 */
int test_suite_pre(void *suite_data) {
	(void)suite_data;

	return randomness_init(NULL) ? -1 : 0;
}
183
/*
 * Suite teardown hook.  There is nothing to release, so the argument is
 * ignored and success (0) is always returned.
 */
int test_suite_post(void *suite_data) {
	const int status = 0;

	(void)suite_data;

	return status;
}
188
/*
 * Test table for this module.  It currently contains a single all-NULL
 * entry and nothing else, i.e. no test cases are registered here.
 * NOTE(review): the all-NULL entry is presumably the end-of-table sentinel
 * expected by the harness declared in test_suite.h -- confirm.
 */
test_t test_suite[] = {
	{ NULL, NULL, NULL, NULL, NULL },
};
192
193 #endif /* TEST */