Multi-core Hash Joins
Main-memory hash join implementations for multi-core CPUs
tuple_buffer.h
Go to the documentation of this file.
1
11#ifndef TUPLE_BUFFER_H
12#define TUPLE_BUFFER_H
13
14#include <stdlib.h>
15#include <stdio.h>
16
17#include "types.h"
18
19
/** Number of tuples held by each buffer in a chain. */
#ifndef CHAINEDBUFF_NUMTUPLESPERBUF
#define CHAINEDBUFF_NUMTUPLESPERBUF (1024*1024)
#endif

/**
 * Selects the code path of write_result_relation(): non-zero writes the
 * per-thread result lists ordered by the key each list yields first, zero
 * writes them in thread-index order.
 */
#ifndef SORTED_MATERIALIZE_TO_FILE
#define SORTED_MATERIALIZE_TO_FILE 0
#endif

/** Alignment, in bytes, requested for tuple arrays added to a chain. */
#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64
#endif
29
30
32typedef struct tuplebuffer_t tuplebuffer_t;
33
35 tuple_t * tuples;
36 tuplebuffer_t * next;
37};
38
40 tuplebuffer_t * buf;
41 tuplebuffer_t * readcursor;
42 tuplebuffer_t * writecursor;
43 uint32_t writepos;
44 uint32_t readpos;
45 uint32_t readlen;
46 uint32_t numbufs;
47};
48
49static inline void
50cb_begin(chainedtuplebuffer_t * cb)
51{
52 cb->readpos = 0;
53 cb->readlen = cb->writepos;
54 cb->readcursor = cb->buf;
55}
56
57static inline void
58cb_begin_backwards(chainedtuplebuffer_t * cb)
59{
60 cb->readpos = cb->writepos - 1;
61 cb->readcursor = cb->buf;
62}
63
64static inline tuple_t *
65cb_read_next(chainedtuplebuffer_t * cb)
66{
67 if(cb->readpos == cb->readlen) {
68 cb->readpos = 0;
69 cb->readlen = CHAINEDBUFF_NUMTUPLESPERBUF;
70 cb->readcursor = cb->readcursor->next;
71 }
72
73 return (cb->readcursor->tuples + cb->readpos ++);
74}
75
76static inline tuple_t *
77cb_read_backwards(chainedtuplebuffer_t * cb)
78{
79 tuple_t * res = (cb->readcursor->tuples + cb->readpos);
80
81 if(cb->readpos == 0) {
82 cb->readpos = CHAINEDBUFF_NUMTUPLESPERBUF - 1;
83 cb->readcursor = cb->readcursor->next;
84 }
85 else {
86 cb->readpos --;
87 }
88
89 return res;
90}
91
92static inline tuple_t *
93cb_next_writepos(chainedtuplebuffer_t * cb)
94{
95 if(cb->writepos == CHAINEDBUFF_NUMTUPLESPERBUF) {
96 tuplebuffer_t * newbuf = (tuplebuffer_t*) malloc(sizeof(tuplebuffer_t));
97 posix_memalign ((void **)&newbuf->tuples,
98 CACHE_LINE_SIZE, sizeof(tuple_t)
99 * CHAINEDBUFF_NUMTUPLESPERBUF);
100
101 newbuf->next = cb->buf;
102 cb->buf = newbuf;
103 cb->writepos = 0;
104 cb->numbufs ++;
105 }
106
107 return (cb->buf->tuples + cb->writepos ++);
108}
109
111chainedtuplebuffer_init(void)
112{
114 malloc(sizeof(chainedtuplebuffer_t));
115 tuplebuffer_t * newbuf = (tuplebuffer_t *) malloc(sizeof(tuplebuffer_t));
116
117 newbuf->tuples = (tuple_t *) malloc(sizeof(tuple_t)
118 * CHAINEDBUFF_NUMTUPLESPERBUF);
119 newbuf->next = NULL;
120
121 newcb->buf = newcb->readcursor = newcb->writecursor = newbuf;
122 newcb->writepos = newcb->readpos = 0;
123 newcb->numbufs = 1;
124
125 return newcb;
126}
127
128static void
129chainedtuplebuffer_free(chainedtuplebuffer_t * cb)
130{
131 tuplebuffer_t * tmp = cb->buf;
132
133 while(tmp) {
134 tuplebuffer_t * tmp2 = tmp->next;
135 free(tmp->tuples);
136 free(tmp);
137 tmp = tmp2;
138 }
139
140 free(cb);
141}
142
144static inline int __attribute__((always_inline))
145thrkeycmp(const void * k1, const void * k2)
146{
147 int val = ((tuple_t *)k2)->key - ((tuple_t *)k1)->key;
148 return val;
149}
150
155static void
156write_result_relation(result_t * res, char * filename)
157{
158#if SORTED_MATERIALIZE_TO_FILE
159 FILE * fp = fopen(filename, "w");
160 int i;
161 int64_t j;
162
163 //fprintf(fp, "#KEY, VAL\n");
164
165 tuple_t threadorder[res->nthreads];
166
167 for(i = 0; i < res->nthreads; i++) {
168 int64_t nresults = res->resultlist[i].nresults;
169
170 if(nresults > 0) {
172 res->resultlist[i].results;
173
174 cb_begin_backwards(cb);
175 tuple_t * tup = cb_read_backwards(cb);
176 threadorder[i].key = tup->key;
177 threadorder[i].payload = i; /* thread index */
178 }
179 else {
180 threadorder[i].key = 0;
181 threadorder[i].payload = -1;
182 }
183 }
184
185 /* just to output thread results sorted */
186 qsort(threadorder, res->nthreads, sizeof(tuple_t), thrkeycmp);
187
188 for(i = 0; i < res->nthreads; i++) {
189 int tid = threadorder[i].payload;
190
191 if(tid != -1) {
193 res->resultlist[tid].results;
194 int64_t nresults = res->resultlist[tid].nresults;
195 cb_begin_backwards(cb);
196
197 for (j = 0; j < nresults; j++) {
198 tuple_t * tup = cb_read_backwards(cb);
199 fprintf(fp, "%d %d\n", tup->key, tup->payload);
200 }
201 }
202 }
203
204 fclose(fp);
205
206#else
207
208 FILE * fp = fopen(filename, "w");
209 int i;
210 int64_t j;
211
212 for(i = 0; i < res->nthreads; i++) {
213
214 // char fname[256];
215 // sprintf(fname, "Thread-%d.tbl", i);
216 // fp = fopen(fname, "w");
217
219 res->resultlist[i].results;
220 int64_t nresults = res->resultlist[i].nresults;
221
222 cb_begin_backwards(cb);
223
224 for (j = 0; j < nresults; j++) {
225 tuple_t * tup = cb_read_backwards(cb);
226 fprintf(fp, "%d %d\n", tup->key, tup->payload);
227 }
228
229 // fclose(fp);
230 }
231
232 fclose(fp);
233
234#endif
235
236}
237
238#endif /* TUPLE_BUFFER_H */
Definition: types.h:74
Definition: types.h:45
Provides general type definitions used by all join algorithms.