20#define CHAINEDBUFF_NUMTUPLESPERBUF (1024*1024)
24#define SORTED_MATERIALIZE_TO_FILE 0
26#ifndef CACHE_LINE_SIZE
27#define CACHE_LINE_SIZE 64
53 cb->readlen = cb->writepos;
54 cb->readcursor = cb->buf;
60 cb->readpos = cb->writepos - 1;
61 cb->readcursor = cb->buf;
67 if(cb->readpos == cb->readlen) {
69 cb->readlen = CHAINEDBUFF_NUMTUPLESPERBUF;
70 cb->readcursor = cb->readcursor->next;
73 return (cb->readcursor->tuples + cb->readpos ++);
79 tuple_t * res = (cb->readcursor->tuples + cb->readpos);
81 if(cb->readpos == 0) {
82 cb->readpos = CHAINEDBUFF_NUMTUPLESPERBUF - 1;
83 cb->readcursor = cb->readcursor->next;
95 if(cb->writepos == CHAINEDBUFF_NUMTUPLESPERBUF) {
97 posix_memalign ((
void **)&newbuf->tuples,
98 CACHE_LINE_SIZE,
sizeof(
tuple_t)
99 * CHAINEDBUFF_NUMTUPLESPERBUF);
101 newbuf->next = cb->buf;
107 return (cb->buf->tuples + cb->writepos ++);
111chainedtuplebuffer_init(
void)
118 * CHAINEDBUFF_NUMTUPLESPERBUF);
121 newcb->buf = newcb->readcursor = newcb->writecursor = newbuf;
122 newcb->writepos = newcb->readpos = 0;
144static inline int __attribute__((always_inline))
145thrkeycmp(
const void * k1,
const void * k2)
156write_result_relation(
result_t * res,
char * filename)
158#if SORTED_MATERIALIZE_TO_FILE
159 FILE * fp = fopen(filename,
"w");
165 tuple_t threadorder[res->nthreads];
167 for(i = 0; i < res->nthreads; i++) {
168 int64_t nresults = res->resultlist[i].nresults;
172 res->resultlist[i].results;
174 cb_begin_backwards(cb);
175 tuple_t * tup = cb_read_backwards(cb);
176 threadorder[i].key = tup->key;
177 threadorder[i].payload = i;
180 threadorder[i].key = 0;
181 threadorder[i].payload = -1;
186 qsort(threadorder, res->nthreads,
sizeof(
tuple_t), thrkeycmp);
188 for(i = 0; i < res->nthreads; i++) {
189 int tid = threadorder[i].payload;
193 res->resultlist[tid].results;
194 int64_t nresults = res->resultlist[tid].nresults;
195 cb_begin_backwards(cb);
197 for (j = 0; j < nresults; j++) {
198 tuple_t * tup = cb_read_backwards(cb);
199 fprintf(fp,
"%d %d\n", tup->key, tup->payload);
208 FILE * fp = fopen(filename,
"w");
212 for(i = 0; i < res->nthreads; i++) {
219 res->resultlist[i].results;
220 int64_t nresults = res->resultlist[i].nresults;
222 cb_begin_backwards(cb);
224 for (j = 0; j < nresults; j++) {
225 tuple_t * tup = cb_read_backwards(cb);
226 fprintf(fp,
"%d %d\n", tup->key, tup->payload);
Provides general type definitions used by all join algorithms.