37#ifdef JOIN_RESULT_MATERIALIZE
43#define BARRIER_ARRIVE(B,RV) \
44 RV = pthread_barrier_wait(B); \
45 if(RV !=0 && RV != PTHREAD_BARRIER_SERIAL_THREAD){ \
46 printf("Couldn't wait on barrier\n"); \
57#define NEXT_POW_2(V) \
/* Bucket index for key X: mask off the hash bits, then drop the radix bits
 * already consumed by an earlier partitioning pass (SKIP).
 * All three arguments are fully parenthesized so that operand expressions
 * with lower-precedence operators (e.g. `a | b` as MASK) expand correctly. */
#define HASH(X, MASK, SKIP) (((X) & (MASK)) >> (SKIP))
/* Conditionally print a debug message to stdout, prefixed with "[DEBUG] ".
 * COND is an arbitrary runtime condition; MSG is a printf-style format
 * string (string-literal concatenated with the prefix, so it must be a
 * literal); ##__VA_ARGS__ (GNU extension) swallows the trailing comma when
 * no varargs are given.
 * Wrapped in do { } while(0) so the macro behaves as a single statement:
 * a bare `if(COND){...}` body would capture a following `else` at the
 * call site (dangling-else) and break `if (x) DEBUGMSG(...); else ...;`. */
#define DEBUGMSG(COND, MSG, ...)                                  \
    do {                                                          \
        if (COND) {                                               \
            fprintf(stdout, "[DEBUG] " MSG, ## __VA_ARGS__);      \
        }                                                         \
    } while (0)
/* No-op variant: compiles debug messages out entirely — presumably the
 * #else branch of a DEBUG guard (the enclosing conditional is not visible
 * here; confirm against the full file). */
#define DEBUGMSG(COND, MSG, ...)
95 pthread_barrier_t * barrier;
103 uint64_t timer1, timer2, timer3;
104 struct timeval start, end;
126 overflowbuf->count = 0;
127 overflowbuf->next = NULL;
129 *ppbuf = overflowbuf;
144 *result = (*buf)->buf + (*buf)->count;
152 new_buf->next = *buf;
154 *result = new_buf->buf;
188 ht->num_buckets = nbuckets;
193 ht->num_buckets *
sizeof(
bucket_t))){
194 perror(
"Aligned allocation failed!\n");
206 memset(ht->buckets, 0, ht->num_buckets *
sizeof(
bucket_t));
208 ht->hash_mask = (ht->num_buckets - 1) << ht->skip_bits;
234 const uint32_t hashmask = ht->hash_mask;
235 const uint32_t skipbits = ht->skip_bits;
237 for(i=0; i < rel->num_tuples; i++){
240 int32_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
244 curr = ht->buckets + idx;
257 dest = nxt->tuples + nxt->count;
262 dest = curr->tuples + curr->count;
265 *dest = rel->tuples[i];
285 const uint32_t hashmask = ht->hash_mask;
286 const uint32_t skipbits = ht->skip_bits;
288 size_t prefetch_index = PREFETCH_DISTANCE;
293#ifdef JOIN_RESULT_MATERIALIZE
297 for (i = 0; i < rel->num_tuples; i++)
300 if (prefetch_index < rel->num_tuples) {
301 intkey_t idx_prefetch = HASH(rel->tuples[prefetch_index++].key,
303 __builtin_prefetch(ht->buckets + idx_prefetch, 0, 1);
307 intkey_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
311 for(j = 0; j < b->count; j++) {
312 if(rel->tuples[i].key == b->tuples[j].key){
315 #ifdef JOIN_RESULT_MATERIALIZE
317 tuple_t * joinres = cb_next_writepos(chainedbuf);
318 joinres->key = b->tuples[j].payload;
319 joinres->payload = rel->tuples[i].payload;
334print_timing(uint64_t total, uint64_t build, uint64_t part,
335 uint64_t numtuples, int64_t result,
336 struct timeval * start,
struct timeval * end)
338 double diff_usec = (((*end).tv_sec*1000000L + (*end).tv_usec)
339 - ((*start).tv_sec*1000000L+(*start).tv_usec));
340 double cyclestuple = total;
341 cyclestuple /= numtuples;
342 fprintf(stdout,
"RUNTIME TOTAL, BUILD, PART (cycles): \n");
343 fprintf(stderr,
"%llu \t %llu \t %llu ",
345 fprintf(stdout,
"\n");
346 fprintf(stdout,
"TOTAL-TIME-USECS, TOTAL-TUPLES, CYCLES-PER-TUPLE: \n");
347 fprintf(stdout,
"%.4lf \t %llu \t ", diff_usec, result);
349 fprintf(stderr,
"%.4lf ", cyclestuple);
351 fprintf(stdout,
"\n");
364 struct timeval start, end;
365 uint64_t timer1, timer2, timer3;
367 uint32_t nbuckets = (relR->num_tuples /
BUCKET_SIZE);
371#ifdef JOIN_RESULT_MATERIALIZE
376 gettimeofday(&start, NULL);
388#ifdef JOIN_RESULT_MATERIALIZE
391 void * chainedbuf = NULL;
396#ifdef JOIN_RESULT_MATERIALIZE
398 thrres->nresults = result;
399 thrres->threadid = 0;
400 thrres->results = (
void *) chainedbuf;
405 gettimeofday(&end, NULL);
407 print_timing(timer1, timer2, timer3, relS->num_tuples, result, &start, &end);
412 joinresult->totalresults = result;
413 joinresult->nthreads = 1;
431 const uint32_t hashmask = ht->hash_mask;
432 const uint32_t skipbits = ht->skip_bits;
435 size_t prefetch_index = PREFETCH_DISTANCE;
438 for(i=0; i < rel->num_tuples; i++){
443 if (prefetch_index < rel->num_tuples) {
444 intkey_t idx_prefetch = HASH(rel->tuples[prefetch_index++].key,
446 __builtin_prefetch(ht->buckets + idx_prefetch, 1, 1);
450 int32_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
453 curr = ht->buckets+idx;
462 get_new_bucket(&b, overflowbuf);
469 dest = nxt->tuples + nxt->count;
474 dest = curr->tuples + curr->count;
478 *dest = rel->tuples[i];
479 unlock(&curr->latch);
514 gettimeofday(&args->start, NULL);
515 startTimer(&args->timer1);
516 startTimer(&args->timer2);
530 PCM_log(
"========== Build phase profiling results ==========\n");
542 stopTimer(&args->timer2);
546#ifdef JOIN_RESULT_MATERIALIZE
549 void * chainedbuf = NULL;
553 args->num_results =
probe_hashtable(args->ht, &args->relS, chainedbuf);
555#ifdef JOIN_RESULT_MATERIALIZE
556 args->threadresult->nresults = args->num_results;
557 args->threadresult->threadid = args->tid;
558 args->threadresult->results = (
void *) chainedbuf;
567 stopTimer(&args->timer1);
568 gettimeofday(&args->end, NULL);
575 PCM_log(
"========== Probe phase profiling results ==========\n");
577 PCM_log(
"===================================================\n");
596 int32_t numR, numS, numRthr, numSthr;
599 arg_t args[nthreads];
600 pthread_t tid[nthreads];
602 pthread_barrier_t barrier;
607#ifdef JOIN_RESULT_MATERIALIZE
612 uint32_t nbuckets = (relR->num_tuples /
BUCKET_SIZE);
615 numR = relR->num_tuples;
616 numS = relS->num_tuples;
617 numRthr = numR / nthreads;
618 numSthr = numS / nthreads;
620 rv = pthread_barrier_init(&barrier, NULL, nthreads);
622 printf(
"Couldn't create the barrier\n");
626 pthread_attr_init(&attr);
627 for(i = 0; i < nthreads; i++){
630 DEBUGMSG(1,
"Assigning thread-%d to CPU-%d\n", i, cpu_idx);
633 CPU_SET(cpu_idx, &set);
634 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &set);
638 args[i].barrier = &barrier;
641 args[i].relR.num_tuples = (i == (nthreads-1)) ? numR : numRthr;
642 args[i].relR.tuples = relR->tuples + numRthr * i;
646 args[i].relS.num_tuples = (i == (nthreads-1)) ? numS : numSthr;
647 args[i].relS.tuples = relS->tuples + numSthr * i;
650 args[i].threadresult = &(joinresult->resultlist[i]);
652 rv = pthread_create(&tid[i], &attr,
npo_thread, (
void*)&args[i]);
654 printf(
"ERROR; return code from pthread_create() is %d\n", rv);
660 for(i = 0; i < nthreads; i++){
661 pthread_join(tid[i], NULL);
663 result += args[i].num_results;
665 joinresult->totalresults = result;
666 joinresult->nthreads = nthreads;
671 print_timing(args[0].timer1, args[0].timer2, args[0].timer3,
672 relS->num_tuples, result,
673 &args[0].start, &args[0].end);
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control thread placement; explicit thread-to-processor binding is not supported.
Barrier implementation, defaults to Pthreads. On Mac OS X a custom implementation is used, since pthread barriers are not included in its Pthreads library.
#define BARRIER_ARRIVE(B, RV)
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
int64_t probe_hashtable(hashtable_t *ht, relation_t *rel, void *output)
void build_hashtable_st(hashtable_t *ht, relation_t *rel)
void destroy_hashtable(hashtable_t *ht)
void allocate_hashtable(hashtable_t **ppht, uint32_t nbuckets)
result_t * NPO(relation_t *relR, relation_t *relS, int nthreads)
void * npo_thread(void *param)
result_t * NPO_st(relation_t *relR, relation_t *relS, int nthreads)
void build_hashtable_mt(hashtable_t *ht, relation_t *rel, bucket_buffer_t **overflowbuf)
void init_bucket_buffer(bucket_buffer_t **ppbuf)
void free_bucket_buffer(bucket_buffer_t *buf)
int get_cpu_id(int thread_id)
#define DEBUGMSG(COND, MSG,...)
The interface of the no-partitioning optimized (NPO) join algorithm.
Constant parameters used by No Partitioning Join implementations.
#define OVERFLOW_BUF_SIZE
Provides type definitions used by No Partitioning Join implementations.
An interface to the Intel Performance Counters Monitoring.
Implements a chained-buffer storage model for tuples.