/**
 * @file    no_partitioning_join.c
 * @brief   Main-memory "no partitioning" hash join implementation for
 *          multi-core CPUs (single- and multi-threaded variants).
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>              /* CPU_ZERO, CPU_SET */
#include <pthread.h>            /* pthread_* */
#include <inttypes.h>           /* PRIu64, PRId64 */
#include <string.h>             /* memset */
#include <stdio.h>              /* printf */
#include <stdlib.h>             /* malloc, exit */
#include <sys/time.h>           /* gettimeofday */
22
24#include "npj_params.h" /* constant parameters */
25#include "npj_types.h" /* bucket_t, hashtable_t, bucket_buffer_t */
26#include "rdtsc.h" /* startTimer, stopTimer */
27#include "lock.h" /* lock, unlock */
28#include "cpu_mapping.h" /* get_cpu_id */
29#ifdef PERF_COUNTERS
30#include "perf_counters.h" /* PCM_x */
31#endif
32
33#include "barrier.h" /* pthread_barrier_* */
34#include "affinity.h" /* pthread_attr_setaffinity_np */
35#include "generator.h" /* numa_localize() */
36
37#ifdef JOIN_RESULT_MATERIALIZE
38#include "tuple_buffer.h" /* for materialization */
39#endif
40
/* Wait at the given pthread barrier; aborts the whole process if the wait
 * fails with anything other than PTHREAD_BARRIER_SERIAL_THREAD (which is a
 * success code returned to exactly one waiter). RV receives the result. */
#ifndef BARRIER_ARRIVE
#define BARRIER_ARRIVE(B,RV) \
    RV = pthread_barrier_wait(B); \
    if(RV !=0 && RV != PTHREAD_BARRIER_SERIAL_THREAD){ \
        printf("Couldn't wait on barrier\n"); \
        exit(EXIT_FAILURE); \
    }
#endif

/* Round V up to the next power of two in place (bit-smearing trick;
 * works for 32-bit values: V must be an unsigned integer lvalue). */
#ifndef NEXT_POW_2
#define NEXT_POW_2(V) \
    do { \
        V--; \
        V |= V >> 1; \
        V |= V >> 2; \
        V |= V >> 4; \
        V |= V >> 8; \
        V |= V >> 16; \
        V++; \
    } while(0)
#endif

/* Modulo-style hash: mask out the relevant bits of key X, then drop the
 * SKIP low bits. With skip_bits == 0 this is simply X mod num_buckets. */
#ifndef HASH
#define HASH(X, MASK, SKIP) (((X) & MASK) >> SKIP)
#endif

/* Debug message printing: enabled only when compiled with -DDEBUG. */
#ifdef DEBUG
#define DEBUGMSG(COND, MSG, ...) \
    if(COND) { fprintf(stdout, "[DEBUG] "MSG, ## __VA_ARGS__); }
#else
#define DEBUGMSG(COND, MSG, ...)
#endif
80
82extern int numalocalize; /* defined in generator.c */
83extern int nthreads; /* defined in generator.c */
84
/* Per-thread argument/result record handed to npo_thread(). */
typedef struct arg_t arg_t;

struct arg_t {
    int32_t tid;                 /* logical thread id (0 is the timekeeper) */
    hashtable_t * ht;            /* shared hash table built from relR */
    relation_t relR;             /* this thread's chunk of the build relation */
    relation_t relS;             /* this thread's chunk of the probe relation */
    pthread_barrier_t * barrier; /* shared barrier for phase synchronization */
    int64_t num_results;         /* number of join matches found by this thread */

    /* results of the thread */
    threadresult_t * threadresult; /* materialized output slot (if enabled) */

#ifndef NO_TIMING
    /* stats about the thread; only thread-0 fills these in */
    uint64_t timer1, timer2, timer3; /* overall, build-phase, partition(=0) */
    struct timeval start, end;       /* wall-clock start/end of the join */
#endif
} ;
107
121void
123{
124 bucket_buffer_t * overflowbuf;
125 overflowbuf = (bucket_buffer_t*) malloc(sizeof(bucket_buffer_t));
126 overflowbuf->count = 0;
127 overflowbuf->next = NULL;
128
129 *ppbuf = overflowbuf;
130}
131
140static inline void
141get_new_bucket(bucket_t ** result, bucket_buffer_t ** buf)
142{
143 if((*buf)->count < OVERFLOW_BUF_SIZE) {
144 *result = (*buf)->buf + (*buf)->count;
145 (*buf)->count ++;
146 }
147 else {
148 /* need to allocate new buffer */
149 bucket_buffer_t * new_buf = (bucket_buffer_t*)
150 malloc(sizeof(bucket_buffer_t));
151 new_buf->count = 1;
152 new_buf->next = *buf;
153 *buf = new_buf;
154 *result = new_buf->buf;
155 }
156}
157
159void
161{
162 do {
163 bucket_buffer_t * tmp = buf->next;
164 free(buf);
165 buf = tmp;
166 } while(buf);
167}
168
182void
183allocate_hashtable(hashtable_t ** ppht, uint32_t nbuckets)
184{
185 hashtable_t * ht;
186
187 ht = (hashtable_t*)malloc(sizeof(hashtable_t));
188 ht->num_buckets = nbuckets;
189 NEXT_POW_2((ht->num_buckets));
190
191 /* allocate hashtable buckets cache line aligned */
192 if (posix_memalign((void**)&ht->buckets, CACHE_LINE_SIZE,
193 ht->num_buckets * sizeof(bucket_t))){
194 perror("Aligned allocation failed!\n");
195 exit(EXIT_FAILURE);
196 }
197
200 if(numalocalize) {
201 tuple_t * mem = (tuple_t *) ht->buckets;
202 uint32_t ntuples = (ht->num_buckets*sizeof(bucket_t))/sizeof(tuple_t);
203 numa_localize(mem, ntuples, nthreads);
204 }
205
206 memset(ht->buckets, 0, ht->num_buckets * sizeof(bucket_t));
207 ht->skip_bits = 0; /* the default for modulo hash */
208 ht->hash_mask = (ht->num_buckets - 1) << ht->skip_bits;
209 *ppht = ht;
210}
211
217void
219{
220 free(ht->buckets);
221 free(ht);
222}
223
230void
232{
233 uint32_t i;
234 const uint32_t hashmask = ht->hash_mask;
235 const uint32_t skipbits = ht->skip_bits;
236
237 for(i=0; i < rel->num_tuples; i++){
238 tuple_t * dest;
239 bucket_t * curr, * nxt;
240 int32_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
241
242 /* copy the tuple to appropriate hash bucket */
243 /* if full, follow nxt pointer to find correct place */
244 curr = ht->buckets + idx;
245 nxt = curr->next;
246
247 if(curr->count == BUCKET_SIZE) {
248 if(!nxt || nxt->count == BUCKET_SIZE) {
249 bucket_t * b;
250 b = (bucket_t*) calloc(1, sizeof(bucket_t));
251 curr->next = b;
252 b->next = nxt;
253 b->count = 1;
254 dest = b->tuples;
255 }
256 else {
257 dest = nxt->tuples + nxt->count;
258 nxt->count ++;
259 }
260 }
261 else {
262 dest = curr->tuples + curr->count;
263 curr->count ++;
264 }
265 *dest = rel->tuples[i];
266 }
267}
268
279int64_t
280probe_hashtable(hashtable_t *ht, relation_t *rel, void * output)
281{
282 uint32_t i, j;
283 int64_t matches;
284
285 const uint32_t hashmask = ht->hash_mask;
286 const uint32_t skipbits = ht->skip_bits;
287#ifdef PREFETCH_NPJ
288 size_t prefetch_index = PREFETCH_DISTANCE;
289#endif
290
291 matches = 0;
292
293#ifdef JOIN_RESULT_MATERIALIZE
294 chainedtuplebuffer_t * chainedbuf = (chainedtuplebuffer_t *) output;
295#endif
296
297 for (i = 0; i < rel->num_tuples; i++)
298 {
299#ifdef PREFETCH_NPJ
300 if (prefetch_index < rel->num_tuples) {
301 intkey_t idx_prefetch = HASH(rel->tuples[prefetch_index++].key,
302 hashmask, skipbits);
303 __builtin_prefetch(ht->buckets + idx_prefetch, 0, 1);
304 }
305#endif
306
307 intkey_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
308 bucket_t * b = ht->buckets+idx;
309
310 do {
311 for(j = 0; j < b->count; j++) {
312 if(rel->tuples[i].key == b->tuples[j].key){
313 matches ++;
314
315 #ifdef JOIN_RESULT_MATERIALIZE
316 /* copy to the result buffer */
317 tuple_t * joinres = cb_next_writepos(chainedbuf);
318 joinres->key = b->tuples[j].payload; /* R-rid */
319 joinres->payload = rel->tuples[i].payload; /* S-rid */
320#endif
321
322 }
323 }
324
325 b = b->next;/* follow overflow pointer */
326 } while(b);
327 }
328
329 return matches;
330}
331
/**
 * Prints the join timing results. Cycle counts and the tuple/result
 * numbers go to stdout/stderr in the original interleaved layout (the
 * benchmark harness parses both streams).
 *
 * Fixes over the original: uint64_t/int64_t were printed with %llu,
 * which is undefined behavior on ABIs where they are not (unsigned)
 * long long -- use the <inttypes.h> PRI macros; guard the
 * cycles-per-tuple division against numtuples == 0.
 *
 * @param total     overall join cycles
 * @param build     build-phase cycles
 * @param part      partitioning cycles (always 0 for NPO)
 * @param numtuples probe relation cardinality
 * @param result    number of join matches
 * @param start,end wall-clock timestamps of the join
 */
static void
print_timing(uint64_t total, uint64_t build, uint64_t part,
             uint64_t numtuples, int64_t result,
             struct timeval * start, struct timeval * end)
{
    double diff_usec = (((*end).tv_sec*1000000L + (*end).tv_usec)
                        - ((*start).tv_sec*1000000L+(*start).tv_usec));
    /* avoid division by zero for an empty probe relation */
    double cyclestuple = (numtuples > 0)
                         ? ((double) total / (double) numtuples) : 0.0;
    fprintf(stdout, "RUNTIME TOTAL, BUILD, PART (cycles): \n");
    fprintf(stderr, "%" PRIu64 " \t %" PRIu64 " \t %" PRIu64 " ",
            total, build, part);
    fprintf(stdout, "\n");
    fprintf(stdout, "TOTAL-TIME-USECS, TOTAL-TUPLES, CYCLES-PER-TUPLE: \n");
    /* NOTE(review): the TOTAL-TUPLES column prints the match count
       (result), not numtuples; kept as-is for output compatibility,
       but with the correct signed specifier. */
    fprintf(stdout, "%.4lf \t %" PRId64 " \t ", diff_usec, result);
    fflush(stdout);
    fprintf(stderr, "%.4lf ", cyclestuple);
    fflush(stderr);
    fprintf(stdout, "\n");

}
354
356result_t *
357NPO_st(relation_t *relR, relation_t *relS, int nthreads)
358{
359 hashtable_t * ht;
360 int64_t result = 0;
361 result_t * joinresult;
362
363#ifndef NO_TIMING
364 struct timeval start, end;
365 uint64_t timer1, timer2, timer3;
366#endif
367 uint32_t nbuckets = (relR->num_tuples / BUCKET_SIZE);
368 allocate_hashtable(&ht, nbuckets);
369
370 joinresult = (result_t *) malloc(sizeof(result_t));
371#ifdef JOIN_RESULT_MATERIALIZE
372 joinresult->resultlist = (threadresult_t *) malloc(sizeof(threadresult_t));
373#endif
374
375#ifndef NO_TIMING
376 gettimeofday(&start, NULL);
377 startTimer(&timer1);
378 startTimer(&timer2);
379 timer3 = 0; /* no partitioning */
380#endif
381
382 build_hashtable_st(ht, relR);
383
384#ifndef NO_TIMING
385 stopTimer(&timer2); /* for build */
386#endif
387
388#ifdef JOIN_RESULT_MATERIALIZE
389 chainedtuplebuffer_t * chainedbuf = chainedtuplebuffer_init();
390#else
391 void * chainedbuf = NULL;
392#endif
393
394 result = probe_hashtable(ht, relS, chainedbuf);
395
396#ifdef JOIN_RESULT_MATERIALIZE
397 threadresult_t * thrres = &(joinresult->resultlist[0]);/* single-thread */
398 thrres->nresults = result;
399 thrres->threadid = 0;
400 thrres->results = (void *) chainedbuf;
401#endif
402
403#ifndef NO_TIMING
404 stopTimer(&timer1); /* over all */
405 gettimeofday(&end, NULL);
406 /* now print the timing results: */
407 print_timing(timer1, timer2, timer3, relS->num_tuples, result, &start, &end);
408#endif
409
411
412 joinresult->totalresults = result;
413 joinresult->nthreads = 1;
414
415 return joinresult;
416}
417
426void
428 bucket_buffer_t ** overflowbuf)
429{
430 uint32_t i;
431 const uint32_t hashmask = ht->hash_mask;
432 const uint32_t skipbits = ht->skip_bits;
433
434#ifdef PREFETCH_NPJ
435 size_t prefetch_index = PREFETCH_DISTANCE;
436#endif
437
438 for(i=0; i < rel->num_tuples; i++){
439 tuple_t * dest;
440 bucket_t * curr, * nxt;
441
442#ifdef PREFETCH_NPJ
443 if (prefetch_index < rel->num_tuples) {
444 intkey_t idx_prefetch = HASH(rel->tuples[prefetch_index++].key,
445 hashmask, skipbits);
446 __builtin_prefetch(ht->buckets + idx_prefetch, 1, 1);
447 }
448#endif
449
450 int32_t idx = HASH(rel->tuples[i].key, hashmask, skipbits);
451 /* copy the tuple to appropriate hash bucket */
452 /* if full, follow nxt pointer to find correct place */
453 curr = ht->buckets+idx;
454 lock(&curr->latch);
455 nxt = curr->next;
456
457 if(curr->count == BUCKET_SIZE) {
458 if(!nxt || nxt->count == BUCKET_SIZE) {
459 bucket_t * b;
460 /* b = (bucket_t*) calloc(1, sizeof(bucket_t)); */
461 /* instead of calloc() everytime, we pre-allocate */
462 get_new_bucket(&b, overflowbuf);
463 curr->next = b;
464 b->next = nxt;
465 b->count = 1;
466 dest = b->tuples;
467 }
468 else {
469 dest = nxt->tuples + nxt->count;
470 nxt->count ++;
471 }
472 }
473 else {
474 dest = curr->tuples + curr->count;
475 curr->count ++;
476 }
477
478 *dest = rel->tuples[i];
479 unlock(&curr->latch);
480 }
481
482}
483
491void *
492npo_thread(void * param)
493{
494 int rv;
495 arg_t * args = (arg_t*) param;
496
497 /* allocate overflow buffer for each thread */
498 bucket_buffer_t * overflowbuf;
499 init_bucket_buffer(&overflowbuf);
500
501#ifdef PERF_COUNTERS
502 if(args->tid == 0){
503 PCM_initPerformanceMonitor(NULL, NULL);
504 PCM_start();
505 }
506#endif
507
508 /* wait at a barrier until each thread starts and start timer */
509 BARRIER_ARRIVE(args->barrier, rv);
510
511#ifndef NO_TIMING
512 /* the first thread checkpoints the start time */
513 if(args->tid == 0){
514 gettimeofday(&args->start, NULL);
515 startTimer(&args->timer1);
516 startTimer(&args->timer2);
517 args->timer3 = 0; /* no partitionig phase */
518 }
519#endif
520
521 /* insert tuples from the assigned part of relR to the ht */
522 build_hashtable_mt(args->ht, &args->relR, &overflowbuf);
523
524 /* wait at a barrier until each thread completes build phase */
525 BARRIER_ARRIVE(args->barrier, rv);
526
527#ifdef PERF_COUNTERS
528 if(args->tid == 0){
529 PCM_stop();
530 PCM_log("========== Build phase profiling results ==========\n");
532 PCM_start();
533 }
534 /* Just to make sure we get consistent performance numbers */
535 BARRIER_ARRIVE(args->barrier, rv);
536#endif
537
538
539#ifndef NO_TIMING
540 /* build phase finished, thread-0 checkpoints the time */
541 if(args->tid == 0){
542 stopTimer(&args->timer2);
543 }
544#endif
545
546#ifdef JOIN_RESULT_MATERIALIZE
547 chainedtuplebuffer_t * chainedbuf = chainedtuplebuffer_init();
548#else
549 void * chainedbuf = NULL;
550#endif
551
552 /* probe for matching tuples from the assigned part of relS */
553 args->num_results = probe_hashtable(args->ht, &args->relS, chainedbuf);
554
555#ifdef JOIN_RESULT_MATERIALIZE
556 args->threadresult->nresults = args->num_results;
557 args->threadresult->threadid = args->tid;
558 args->threadresult->results = (void *) chainedbuf;
559#endif
560
561#ifndef NO_TIMING
562 /* for a reliable timing we have to wait until all finishes */
563 BARRIER_ARRIVE(args->barrier, rv);
564
565 /* probe phase finished, thread-0 checkpoints the time */
566 if(args->tid == 0){
567 stopTimer(&args->timer1);
568 gettimeofday(&args->end, NULL);
569 }
570#endif
571
572#ifdef PERF_COUNTERS
573 if(args->tid == 0) {
574 PCM_stop();
575 PCM_log("========== Probe phase profiling results ==========\n");
577 PCM_log("===================================================\n");
578 PCM_cleanup();
579 }
580 /* Just to make sure we get consistent performance numbers */
581 BARRIER_ARRIVE(args->barrier, rv);
582#endif
583
584 /* clean-up the overflow buffers */
585 free_bucket_buffer(overflowbuf);
586
587 return 0;
588}
589
591result_t *
592NPO(relation_t *relR, relation_t *relS, int nthreads)
593{
594 hashtable_t * ht;
595 int64_t result = 0;
596 int32_t numR, numS, numRthr, numSthr; /* total and per thread num */
597 int i, rv;
598 cpu_set_t set;
599 arg_t args[nthreads];
600 pthread_t tid[nthreads];
601 pthread_attr_t attr;
602 pthread_barrier_t barrier;
603
604 result_t * joinresult = 0;
605 joinresult = (result_t *) malloc(sizeof(result_t));
606
607#ifdef JOIN_RESULT_MATERIALIZE
608 joinresult->resultlist = (threadresult_t *) malloc(sizeof(threadresult_t)
609 * nthreads);
610#endif
611
612 uint32_t nbuckets = (relR->num_tuples / BUCKET_SIZE);
613 allocate_hashtable(&ht, nbuckets);
614
615 numR = relR->num_tuples;
616 numS = relS->num_tuples;
617 numRthr = numR / nthreads;
618 numSthr = numS / nthreads;
619
620 rv = pthread_barrier_init(&barrier, NULL, nthreads);
621 if(rv != 0){
622 printf("Couldn't create the barrier\n");
623 exit(EXIT_FAILURE);
624 }
625
626 pthread_attr_init(&attr);
627 for(i = 0; i < nthreads; i++){
628 int cpu_idx = get_cpu_id(i);
629
630 DEBUGMSG(1, "Assigning thread-%d to CPU-%d\n", i, cpu_idx);
631
632 CPU_ZERO(&set);
633 CPU_SET(cpu_idx, &set);
634 pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &set);
635
636 args[i].tid = i;
637 args[i].ht = ht;
638 args[i].barrier = &barrier;
639
640 /* assing part of the relR for next thread */
641 args[i].relR.num_tuples = (i == (nthreads-1)) ? numR : numRthr;
642 args[i].relR.tuples = relR->tuples + numRthr * i;
643 numR -= numRthr;
644
645 /* assing part of the relS for next thread */
646 args[i].relS.num_tuples = (i == (nthreads-1)) ? numS : numSthr;
647 args[i].relS.tuples = relS->tuples + numSthr * i;
648 numS -= numSthr;
649
650 args[i].threadresult = &(joinresult->resultlist[i]);
651
652 rv = pthread_create(&tid[i], &attr, npo_thread, (void*)&args[i]);
653 if (rv){
654 printf("ERROR; return code from pthread_create() is %d\n", rv);
655 exit(-1);
656 }
657
658 }
659
660 for(i = 0; i < nthreads; i++){
661 pthread_join(tid[i], NULL);
662 /* sum up results */
663 result += args[i].num_results;
664 }
665 joinresult->totalresults = result;
666 joinresult->nthreads = nthreads;
667
668
669#ifndef NO_TIMING
670 /* now print the timing results: */
671 print_timing(args[0].timer1, args[0].timer2, args[0].timer3,
672 relS->num_tuples, result,
673 &args[0].start, &args[0].end);
674#endif
675
677
678 return joinresult;
679}
680
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
Barrier implementation, defaults to Pthreads. On Mac custom implementation since barriers are not inc...
#define BARRIER_ARRIVE(B, RV)
Definition: barrier.h:29
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
Definition: generator.c:413
int64_t probe_hashtable(hashtable_t *ht, relation_t *rel, void *output)
void build_hashtable_st(hashtable_t *ht, relation_t *rel)
void destroy_hashtable(hashtable_t *ht)
void allocate_hashtable(hashtable_t **ppht, uint32_t nbuckets)
result_t * NPO(relation_t *relR, relation_t *relS, int nthreads)
void * npo_thread(void *param)
result_t * NPO_st(relation_t *relR, relation_t *relS, int nthreads)
void build_hashtable_mt(hashtable_t *ht, relation_t *rel, bucket_buffer_t **overflowbuf)
void init_bucket_buffer(bucket_buffer_t **ppbuf)
void free_bucket_buffer(bucket_buffer_t *buf)
void PCM_initPerformanceMonitor(const char *pcmcfg, const char *pcmout)
void PCM_start()
void PCM_stop()
void PCM_cleanup()
void PCM_printResults()
void PCM_log(char *msg)
int get_cpu_id(int thread_id)
Definition: cpu_mapping.c:78
#define DEBUGMSG(COND, MSG,...)
#define NEXT_POW_2(V)
int numalocalize
Definition: generator.c:47
The interface of No partitioning optimized (NPO) join algorithm.
Constant parameters used by No Partitioning Join implementations.
#define BUCKET_SIZE
Definition: npj_params.h:19
#define OVERFLOW_BUF_SIZE
Definition: npj_params.h:29
#define CACHE_LINE_SIZE
Definition: npj_params.h:24
Provides type definitions used by No Partitioning Join implementations.
An interface to the Intel Performance Counters Monitoring.
Definition: types.h:74
Definition: types.h:45
Implements a chained-buffer storage model for tuples.