parallel_radix_join.c — main-memory, multi-core parallel radix hash join implementations.
13#ifndef _GNU_SOURCE
14#define _GNU_SOURCE
15#endif
16#include <sched.h> /* CPU_ZERO, CPU_SET */
17#include <pthread.h> /* pthread_* */
18#include <stdlib.h> /* malloc, posix_memalign */
19#include <sys/time.h> /* gettimeofday */
20#include <stdio.h> /* printf */
21#include <smmintrin.h> /* simd only for 32-bit keys – SSE4.1 */
22#include <immintrin.h> /* simd only for 32-bit keys – SSE4.1 */
23
24#include "parallel_radix_join.h"
25#include "prj_params.h" /* constant parameters */
26#include "task_queue.h" /* task_queue_* */
27#include "cpu_mapping.h" /* get_cpu_id */
28#include "rdtsc.h" /* startTimer, stopTimer */
29#ifdef PERF_COUNTERS
30#include "perf_counters.h" /* PCM_x */
31#endif
32
33#include "barrier.h" /* pthread_barrier_* */
34#include "affinity.h" /* pthread_attr_setaffinity_np */
35#include "generator.h" /* numa_localize() */
36
37#ifdef JOIN_RESULT_MATERIALIZE
38#include "tuple_buffer.h" /* for materialization */
39#endif
40
#ifndef BARRIER_ARRIVE
/** Wait on barrier B, storing the result of pthread_barrier_wait() in RV;
 *  terminates the process on any error other than the expected
 *  PTHREAD_BARRIER_SERIAL_THREAD return of the serial thread.
 *  Wrapped in do/while(0) so it expands safely inside unbraced if/else. */
#define BARRIER_ARRIVE(B,RV)                                    \
    do {                                                        \
        RV = pthread_barrier_wait(B);                           \
        if(RV != 0 && RV != PTHREAD_BARRIER_SERIAL_THREAD){     \
            printf("Couldn't wait on barrier\n");               \
            exit(EXIT_FAILURE);                                 \
        }                                                       \
    } while(0)
#endif
52
#ifndef MALLOC_CHECK
/** Abort the process with a diagnostic if allocation result M is NULL.
 *  M is parenthesized (the original `!M` mis-bound for compound argument
 *  expressions) and the body is do/while(0)-wrapped for if/else safety. */
#define MALLOC_CHECK(M)                                                    \
    do {                                                                   \
        if(!(M)){                                                          \
            printf("[ERROR] MALLOC_CHECK: %s : %d\n", __FILE__, __LINE__); \
            perror(": malloc() failed!\n");                                \
            exit(EXIT_FAILURE);                                            \
        }                                                                  \
    } while(0)
#endif
62
/* #define RADIX_HASH(V) ((V>>7)^(V>>13)^(V>>21)^V) */
/** Extract the radix of key K: mask out bits with MASK, then shift the
 *  selected bit-field down by NBITS so it can index a histogram/bucket. */
#define HASH_BIT_MODULO(K, MASK, NBITS) (((K) & MASK) >> NBITS)
65
#ifndef NEXT_POW_2
/** Round the (at most 32-bit) lvalue V up, in place, to the next power
 *  of two.  V that is already a power of two is unchanged; V == 0 maps
 *  to 0 because of the initial decrement — callers must avoid 0. */
#define NEXT_POW_2(V)                           \
    do {                                        \
        V--;                                    \
        V |= V >> 1;                            \
        V |= V >> 2;                            \
        V |= V >> 4;                            \
        V |= V >> 8;                            \
        V |= V >> 16;                           \
        V++;                                    \
    } while(0)
#endif
83
84#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
85
#ifdef SYNCSTATS
/* Initialize every local (and, on thread 0, global) sync timestamp to
   'now' so that the stopTimer() deltas recorded later are well-defined. */
#define SYNC_TIMERS_START(A, TID)                \
    do {                                         \
        uint64_t tnow;                           \
        startTimer(&tnow);                       \
        A->localtimer.sync1[0]     = tnow;       \
        A->localtimer.sync1[1]     = tnow;       \
        A->localtimer.sync3        = tnow;       \
        A->localtimer.sync4        = tnow;       \
        A->localtimer.finish_time  = tnow;       \
        if(TID == 0) {                           \
            A->globaltimer->sync1[0]    = tnow;  \
            A->globaltimer->sync1[1]    = tnow;  \
            A->globaltimer->sync3       = tnow;  \
            A->globaltimer->sync4       = tnow;  \
            A->globaltimer->finish_time = tnow;  \
        }                                        \
    } while(0)

/* Record a per-thread synchronization timestamp. */
#define SYNC_TIMER_STOP(T) stopTimer(T)
/* Record a global synchronization timestamp; only thread 0 writes it. */
#define SYNC_GLOBAL_STOP(T, TID) if(TID==0){ stopTimer(T); }
#else
/* SYNCSTATS disabled: all instrumentation hooks compile away. */
#define SYNC_TIMERS_START(A, TID)
#define SYNC_TIMER_STOP(T)
#define SYNC_GLOBAL_STOP(T, TID)
#endif
112
#ifdef DEBUG
/* Print MSG (printf-style) to stdout when COND is non-zero; compiles
   away entirely unless DEBUG is defined.  do/while(0)-wrapped so the
   embedded `if` cannot capture a following `else`. */
#define DEBUGMSG(COND, MSG, ...)                                      \
    do {                                                              \
        if(COND) { fprintf(stdout, "[DEBUG] "MSG, ## __VA_ARGS__); }  \
    } while(0)
#else
#define DEBUGMSG(COND, MSG, ...)
#endif
120
121/* just to enable compilation with g++ */
122#if defined(__cplusplus)
123#define restrict __restrict__
124#endif
125
127extern int numalocalize; /* defined in generator.c */
128
129typedef struct arg_t arg_t;
130typedef struct part_t part_t;
131typedef struct synctimer_t synctimer_t;
132typedef int64_t (*JoinFunction)(const relation_t * const,
133 const relation_t * const,
134 relation_t * const,
135 void * output);
136
#ifdef SYNCSTATS
/** Timestamps taken at the barrier synchronization points of one thread
 *  (collected only when SYNCSTATS is defined). */
struct synctimer_t {
    /* sync1[0]/sync1[1]: after partitioning rel R / rel S; the third
       slot accommodates relidx==2 used by the skew-handling pass. */
    uint64_t sync1[3];
    /* after the 2nd-pass partitioning tasks have been created */
    uint64_t sync3;
    uint64_t sync4;
    /* time when the thread finished its work */
    uint64_t finish_time;
};
#endif
150
152struct arg_t {
153 int32_t ** histR;
154 tuple_t * relR;
155 tuple_t * tmpR;
156 int32_t ** histS;
157 tuple_t * relS;
158 tuple_t * tmpS;
159
160 int32_t numR;
161 int32_t numS;
162 int64_t totalR;
163 int64_t totalS;
164
165 task_queue_t ** join_queue;
166 task_queue_t ** part_queue;
167#ifdef SKEW_HANDLING
168 task_queue_t * skew_queue;
169 task_t ** skewtask;
170#endif
171 pthread_barrier_t * barrier;
172 JoinFunction join_function;
173 int64_t result;
174 int32_t my_tid;
175 int nthreads;
176
177 /* results of the thread */
178 threadresult_t * threadresult;
179
180 /* stats about the thread */
181 int32_t parts_processed;
182 uint64_t timer1, timer2, timer3;
183 struct timeval start, end;
184#ifdef SYNCSTATS
186 synctimer_t localtimer;
188 synctimer_t * globaltimer;
189#endif
190} __attribute__((aligned(CACHE_LINE_SIZE)));
191
193struct part_t {
194 tuple_t * rel;
195 tuple_t * tmp;
196 int32_t ** hist;
197 int64_t * output;
198 arg_t * thrargs;
199 uint64_t total_tuples;
200 uint32_t num_tuples;
201 int32_t R;
202 uint32_t D;
203 int relidx; /* 0: R, 1: S */
204 uint32_t padding;
205} __attribute__((aligned(CACHE_LINE_SIZE)));
206
207static void *
208alloc_aligned(size_t size)
209{
210 void * ret;
211 int rv;
212 rv = posix_memalign((void**)&ret, CACHE_LINE_SIZE, size);
213
214 if (rv) {
215 perror("alloc_aligned() failed: out of memory");
216 return 0;
217 }
218
219 return ret;
220}
221
242int64_t
244 const relation_t * const S,
245 relation_t * const tmpR,
246 void * output)
247{
248 int * next, * bucket;
249 const uint32_t numR = R->num_tuples;
250 uint32_t N = numR;
251 int64_t matches = 0;
252
253 NEXT_POW_2(N);
254 /* N <<= 1; */
255 const uint32_t MASK = (N-1) << (NUM_RADIX_BITS);
256
257 next = (int*) malloc(sizeof(int) * numR);
258 /* posix_memalign((void**)&next, CACHE_LINE_SIZE, numR * sizeof(int)); */
259 bucket = (int*) calloc(N, sizeof(int));
260
261 const tuple_t * const Rtuples = R->tuples;
262 for(uint32_t i=0; i < numR; ){
263 uint32_t idx = HASH_BIT_MODULO(R->tuples[i].key, MASK, NUM_RADIX_BITS);
264 next[i] = bucket[idx];
265 bucket[idx] = ++i; /* we start pos's from 1 instead of 0 */
266
267 /* Enable the following tO avoid the code elimination
268 when running probe only for the time break-down experiment */
269 /* matches += idx; */
270 }
271
272 const tuple_t * const Stuples = S->tuples;
273 const uint32_t numS = S->num_tuples;
274
275#ifdef JOIN_RESULT_MATERIALIZE
276 chainedtuplebuffer_t * chainedbuf = (chainedtuplebuffer_t *) output;
277#endif
278
279 /* Disable the following loop for no-probe for the break-down experiments */
280 /* PROBE- LOOP */
281 for(uint32_t i=0; i < numS; i++ ){
282
283 uint32_t idx = HASH_BIT_MODULO(Stuples[i].key, MASK, NUM_RADIX_BITS);
284
285 for(int hit = bucket[idx]; hit > 0; hit = next[hit-1]){
286
287 if(Stuples[i].key == Rtuples[hit-1].key){
288
289#ifdef JOIN_RESULT_MATERIALIZE
290 /* copy to the result buffer, we skip it */
291 tuple_t * joinres = cb_next_writepos(chainedbuf);
292 joinres->key = Rtuples[hit-1].payload; /* R-rid */
293 joinres->payload = Stuples[i].payload; /* S-rid */
294#endif
295
296 matches ++;
297 }
298 }
299 }
300 /* PROBE-LOOP END */
301
302 /* clean up temp */
303 free(bucket);
304 free(next);
305
306 return matches;
307}
308
/** Histogram (partition) count used by the histogram-based joins:
 *  roughly a quarter of the next power of two of relSize, floored at 4. */
inline
uint32_t
get_hist_size(uint32_t relSize) __attribute__((always_inline));

inline
uint32_t
get_hist_size(uint32_t relSize)
{
    /* round up to the next power of two (open-coded NEXT_POW_2, so this
       helper does not depend on the file-local macro) */
    relSize--;
    relSize |= relSize >> 1;
    relSize |= relSize >> 2;
    relSize |= relSize >> 4;
    relSize |= relSize >> 8;
    relSize |= relSize >> 16;
    relSize++;

    relSize >>= 2;
    if(relSize < 4) relSize = 4;
    return relSize;
}
323
330int64_t
331histogram_join(const relation_t * const R,
332 const relation_t * const S,
333 relation_t * const tmpR,
334 void * output)
335{
336 int32_t * restrict hist;
337 const tuple_t * restrict const Rtuples = R->tuples;
338 const uint32_t numR = R->num_tuples;
339 uint32_t Nhist = get_hist_size(numR);
340 const uint32_t MASK = (Nhist-1) << NUM_RADIX_BITS;
341
342 hist = (int32_t*) calloc(Nhist+2, sizeof(int32_t));
343
344 for( uint32_t i = 0; i < numR; i++ ) {
345
346 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, NUM_RADIX_BITS);
347
348 hist[idx+2] ++;
349 }
350
351 /* prefix sum on histogram */
352 for( uint32_t i = 2, sum = 0; i <= Nhist+1; i++ ) {
353 sum += hist[i];
354 hist[i] = sum;
355 }
356
357 tuple_t * const tmpRtuples = tmpR->tuples;
358 /* reorder tuples according to the prefix sum */
359 for( uint32_t i = 0; i < numR; i++ ) {
360
361 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK, NUM_RADIX_BITS) + 1;
362
363 tmpRtuples[hist[idx]] = Rtuples[i];
364
365 hist[idx] ++;
366 }
367
368 int64_t match = 0;
369 const uint32_t numS = S->num_tuples;
370 const tuple_t * const Stuples = S->tuples;
371 /* now comes the probe phase, TODO: implement prefetching */
372 for( uint32_t i = 0; i < numS; i++ ) {
373
374 uint32_t idx = HASH_BIT_MODULO(Stuples[i].key, MASK, NUM_RADIX_BITS);
375
376 int j = hist[idx], end = hist[idx+1];
377
378 /* Scalar comparisons */
379 for(; j < end; j++) {
380
381 if(Stuples[i].key == tmpRtuples[j].key) {
382
383 ++ match;
384 /* TODO: we do not output results */
385 }
386
387 }
388 }
389
390 /* clean up */
391 free(hist);
392
393 return match;
394}
395
/** Issue a prefetcht0 hint for the cache line containing addr
 *  (x86 inline assembly; a no-op-equivalent hint, never faults on
 *  addresses the caller may legally read). */
inline
void
prefetch(void * addr) __attribute__((always_inline));

inline
void
prefetch(void * addr)
{
    __asm__ __volatile__ ("prefetcht0 %0" :: "m" (*(uint32_t*)addr));
}
410
418int64_t
420 const relation_t * const S,
421 relation_t * const tmpR,
422 void * output)
423{
424#ifdef KEY_8B
425#warning SIMD comparison for 64-bit keys are not implemented!
426 return 0;
427#else
428 int32_t * restrict hist;
429 const tuple_t * restrict const Rtuples = R->tuples;
430 const uint32_t numR = R->num_tuples;
431 uint32_t Nhist = get_hist_size(numR);
432 const uint32_t mask = (Nhist-1) << NUM_RADIX_BITS;
433
434 hist = (int32_t*) calloc(Nhist+2, sizeof(int32_t));
435
436 /* compute histogram */
437 for( uint32_t i = 0; i < numR; i++ ) {
438 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, mask, NUM_RADIX_BITS);
439 hist[idx+2] ++;
440 }
441
442 /* prefix sum on histogram */
443 for( uint32_t i = 2, sum = 0; i <= Nhist+1; i++ ) {
444 sum += hist[i];
445 hist[i] = sum;
446 }
447
448 tuple_t * restrict const tmpRtuples = tmpR->tuples;
449 /* reorder tuples according to the prefix sum */
450 for( uint32_t i = 0; i < numR; i++ ) {
451 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, mask, NUM_RADIX_BITS) + 1;
452 tmpRtuples[hist[idx]] = Rtuples[i];
453 hist[idx] ++;
454 }
455
456#ifdef SMALL_PADDING_TUPLES
457 /* if there is a padding between sub-relations,clear last 3 keys for SIMD */
458 for( uint32_t i = numR+3; i >= numR; i-- ) {
459 tmpRtuples[numR].key = 0;
460 }
461#endif
462
463 intkey_t key_buffer[PROBE_BUFFER_SIZE];
464 uint32_t hash_buffer[PROBE_BUFFER_SIZE];
465
466 int64_t match = 0;
467 const uint32_t numS = S->num_tuples;
468 const tuple_t * restrict const Stuples = S->tuples;
469 __m128i counter = _mm_setzero_si128();
470
471 for( uint32_t i = 0; i < numS/PROBE_BUFFER_SIZE; i ++ ) {
472
473 for( int k = 0; k < PROBE_BUFFER_SIZE; k++ ) {
474 const intkey_t skey = Stuples[i * PROBE_BUFFER_SIZE + k].key;
475 const uint32_t idx = HASH_BIT_MODULO(skey, mask, NUM_RADIX_BITS);
476 /* now we issue a prefetch for element at S[idx] */
477 prefetch(tmpR->tuples + hist[idx]);
478 key_buffer[k] = skey;
479 hash_buffer[k] = idx;
480 }
481
482 for( int k = 0; k < PROBE_BUFFER_SIZE; k++ ) {
483
484 /* SIMD comparisons in groups of 2 (8B x 2 = 128 bits) */
485 int j = hist[hash_buffer[k]];
486 int end = hist[hash_buffer[k]+1];
487 __m128i search_key = _mm_set1_epi32(key_buffer[k]);
488
489 for( ; j < end; j += 2) {
490
491 __m128i keyvals = _mm_loadu_si128((__m128i const *)(tmpRtuples + j));
492 keyvals = _mm_cmpeq_epi32(keyvals, search_key);
493 counter = _mm_add_epi32(keyvals, counter);
494 /* TODO: we're just counting, not materializing results */
495 }
496 }
497 }
498
499 for(uint32_t i = numS - (numS % PROBE_BUFFER_SIZE); i < numS; i++ ){
500 const intkey_t skey = Stuples[i].key;
501 const uint32_t idx = HASH_BIT_MODULO(skey, mask, NUM_RADIX_BITS);
502
503 /* SIMD comparisons in groups of 2 (8B x 2 = 128 bits) */
504 int j = hist[idx];
505 int end = hist[idx+1];
506 __m128i search_key = _mm_set1_epi32(skey);
507
508 for( ; j < end; j += 2) {
509
510 __m128i keyvals = _mm_loadu_si128((__m128i const *)(tmpRtuples + j));
511 keyvals = _mm_cmpeq_epi32(keyvals, search_key);
512 counter = _mm_add_epi32(keyvals, counter);
513 /* TODO: we're just counting, not outputting anything */
514 }
515
516 }
517
518 match += -( _mm_extract_epi32(counter, 0) +
519 _mm_extract_epi32(counter, 2) );
520
521 /* clean up */
522 free(hist);
523
524 return match;
525#endif
526}
527
544void
545radix_cluster(relation_t * restrict outRel,
546 relation_t * restrict inRel,
547 int32_t * restrict hist,
548 int R,
549 int D)
550{
551 uint32_t i;
552 uint32_t M = ((1 << D) - 1) << R;
553 uint32_t offset;
554 uint32_t fanOut = 1 << D;
555
556 /* the following are fixed size when D is same for all the passes,
557 and can be re-used from call to call. Allocating in this function
558 just in case D differs from call to call. */
559 uint32_t dst[fanOut];
560
561 /* count tuples per cluster */
562 for( i=0; i < inRel->num_tuples; i++ ){
563 uint32_t idx = HASH_BIT_MODULO(inRel->tuples[i].key, M, R);
564 hist[idx]++;
565 }
566 offset = 0;
567 /* determine the start and end of each cluster depending on the counts. */
568 for ( i=0; i < fanOut; i++ ) {
569 /* dst[i] = outRel->tuples + offset; */
570 /* determine the beginning of each partitioning by adding some
571 padding to avoid L1 conflict misses during scatter. */
572 dst[i] = offset + i * SMALL_PADDING_TUPLES;
573 offset += hist[i];
574 }
575
576 /* copy tuples to their corresponding clusters at appropriate offsets */
577 for( i=0; i < inRel->num_tuples; i++ ){
578 uint32_t idx = HASH_BIT_MODULO(inRel->tuples[i].key, M, R);
579 outRel->tuples[ dst[idx] ] = inRel->tuples[i];
580 ++dst[idx];
581 }
582}
583
594void
595radix_cluster_nopadding(relation_t * outRel, relation_t * inRel, int R, int D)
596{
597 tuple_t ** dst;
598 tuple_t * input;
599 /* tuple_t ** dst_end; */
600 uint32_t * tuples_per_cluster;
601 uint32_t i;
602 uint32_t offset;
603 const uint32_t M = ((1 << D) - 1) << R;
604 const uint32_t fanOut = 1 << D;
605 const uint32_t ntuples = inRel->num_tuples;
606
607 tuples_per_cluster = (uint32_t*)calloc(fanOut, sizeof(uint32_t));
608 /* the following are fixed size when D is same for all the passes,
609 and can be re-used from call to call. Allocating in this function
610 just in case D differs from call to call. */
611 dst = (tuple_t**)malloc(sizeof(tuple_t*)*fanOut);
612 /* dst_end = (tuple_t**)malloc(sizeof(tuple_t*)*fanOut); */
613
614 input = inRel->tuples;
615 /* count tuples per cluster */
616 for( i=0; i < ntuples; i++ ){
617 uint32_t idx = (uint32_t)(HASH_BIT_MODULO(input->key, M, R));
618 tuples_per_cluster[idx]++;
619 input++;
620 }
621
622 offset = 0;
623 /* determine the start and end of each cluster depending on the counts. */
624 for ( i=0; i < fanOut; i++ ) {
625 dst[i] = outRel->tuples + offset;
626 offset += tuples_per_cluster[i];
627 /* dst_end[i] = outRel->tuples + offset; */
628 }
629
630 input = inRel->tuples;
631 /* copy tuples to their corresponding clusters at appropriate offsets */
632 for( i=0; i < ntuples; i++ ){
633 uint32_t idx = (uint32_t)(HASH_BIT_MODULO(input->key, M, R));
634 *dst[idx] = *input;
635 ++dst[idx];
636 input++;
637 /* we pre-compute the start and end of each cluster, so the following
638 check is unnecessary */
639 /* if(++dst[idx] >= dst_end[idx]) */
640 /* REALLOCATE(dst[idx], dst_end[idx]); */
641 }
642
643 /* clean up temp */
644 /* free(dst_end); */
645 free(dst);
646 free(tuples_per_cluster);
647}
648
649
659 task_queue_t * join_queue,
660 const int R, const int D)
661{
662 int i;
663 uint32_t offsetR = 0, offsetS = 0;
664 const int fanOut = 1 << D; /*(NUM_RADIX_BITS / NUM_PASSES);*/
665 int32_t * outputR, * outputS;
666
667 outputR = (int32_t*)calloc(fanOut+1, sizeof(int32_t));
668 outputS = (int32_t*)calloc(fanOut+1, sizeof(int32_t));
669 /* TODO: measure the effect of memset() */
670 /* memset(outputR, 0, fanOut * sizeof(int32_t)); */
671 radix_cluster(&task->tmpR, &task->relR, outputR, R, D);
672
673 /* memset(outputS, 0, fanOut * sizeof(int32_t)); */
674 radix_cluster(&task->tmpS, &task->relS, outputS, R, D);
675
676 /* task_t t; */
677 for(i = 0; i < fanOut; i++) {
678 if(outputR[i] > 0 && outputS[i] > 0) {
679 task_t * t = task_queue_get_slot_atomic(join_queue);
680 t->relR.num_tuples = outputR[i];
681 t->relR.tuples = task->tmpR.tuples + offsetR
683 t->tmpR.tuples = task->relR.tuples + offsetR
685 offsetR += outputR[i];
686
687 t->relS.num_tuples = outputS[i];
688 t->relS.tuples = task->tmpS.tuples + offsetS
690 t->tmpS.tuples = task->relS.tuples + offsetS
692 offsetS += outputS[i];
693
694 /* task_queue_copy_atomic(join_queue, &t); */
695 task_queue_add_atomic(join_queue, t);
696 }
697 else {
698 offsetR += outputR[i];
699 offsetS += outputS[i];
700 }
701 }
702 free(outputR);
703 free(outputS);
704}
705
714void
716{
717 const tuple_t * restrict rel = part->rel;
718 int32_t ** hist = part->hist;
719 int64_t * restrict output = part->output;
720
721 const uint32_t my_tid = part->thrargs->my_tid;
722 const uint32_t nthreads = part->thrargs->nthreads;
723 const uint32_t num_tuples = part->num_tuples;
724
725 const int32_t R = part->R;
726 const int32_t D = part->D;
727 const uint32_t fanOut = 1 << D;
728 const uint32_t MASK = (fanOut - 1) << R;
729 const uint32_t padding = part->padding;
730
731 int64_t sum = 0;
732 uint32_t i, j;
733 int rv;
734
735 int64_t dst[fanOut+1];
736
737 /* compute local histogram for the assigned region of rel */
738 /* compute histogram */
739 int32_t * my_hist = hist[my_tid];
740
741 for(i = 0; i < num_tuples; i++) {
742 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
743 my_hist[idx] ++;
744 }
745
746 /* compute local prefix sum on hist */
747 for(i = 0; i < fanOut; i++){
748 sum += my_hist[i];
749 my_hist[i] = sum;
750 }
751
752 SYNC_TIMER_STOP(&part->thrargs->localtimer.sync1[part->relidx]);
753 /* wait at a barrier until each thread complete histograms */
754 BARRIER_ARRIVE(part->thrargs->barrier, rv);
755 /* barrier global sync point-1 */
756 SYNC_GLOBAL_STOP(&part->thrargs->globaltimer->sync1[part->relidx], my_tid);
757
758 /* determine the start and end of each cluster */
759 for(i = 0; i < my_tid; i++) {
760 for(j = 0; j < fanOut; j++)
761 output[j] += hist[i][j];
762 }
763 for(i = my_tid; i < nthreads; i++) {
764 for(j = 1; j < fanOut; j++)
765 output[j] += hist[i][j-1];
766 }
767
768 for(i = 0; i < fanOut; i++ ) {
769 output[i] += i * padding; //PADDING_TUPLES;
770 dst[i] = output[i];
771 }
772 output[fanOut] = part->total_tuples + fanOut * padding; //PADDING_TUPLES;
773
774 tuple_t * restrict tmp = part->tmp;
775
776 /* Copy tuples to their corresponding clusters */
777 for(i = 0; i < num_tuples; i++ ){
778 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
779 tmp[dst[idx]] = rel[i];
780 ++dst[idx];
781 }
782}
783
788typedef union {
789 struct {
790 tuple_t tuples[CACHE_LINE_SIZE/sizeof(tuple_t)];
791 } tuples;
792 struct {
793 tuple_t tuples[CACHE_LINE_SIZE/sizeof(tuple_t) - 1];
794 int64_t slot;
795 } data;
797
798#define TUPLESPERCACHELINE (CACHE_LINE_SIZE/sizeof(tuple_t))
799
/** Copy one 64-byte cache line from src to dst using non-temporal
 *  (streaming) stores where available, bypassing the cache on the write
 *  side.  Both pointers must be suitably aligned for the streaming
 *  intrinsics (dst at least 16B for SSE2, 32B for AVX). */
static inline void
store_nontemp_64B(void * dst, void * src)
{
#ifdef __AVX__
    register __m256i * d1 = (__m256i*) dst;
    register __m256i   s1 = *((__m256i*) src);
    register __m256i * d2 = d1+1;
    register __m256i   s2 = *(((__m256i*) src)+1);

    _mm256_stream_si256(d1, s1);
    _mm256_stream_si256(d2, s2);

#elif defined(__SSE2__)

    register __m128i * d1 = (__m128i*) dst;
    register __m128i * d2 = d1+1;
    register __m128i * d3 = d1+2;
    register __m128i * d4 = d1+3;
    register __m128i   s1 = *(__m128i*) src;
    register __m128i   s2 = *((__m128i*)src + 1);
    register __m128i   s3 = *((__m128i*)src + 2);
    register __m128i   s4 = *((__m128i*)src + 3);

    _mm_stream_si128 (d1, s1);
    _mm_stream_si128 (d2, s2);
    _mm_stream_si128 (d3, s3);
    _mm_stream_si128 (d4, s4);

#else
    /* fallback: plain cache-line assignment */
    *(cacheline_t *)dst = *(cacheline_t *)src;

#endif

}
856void
858{
859 const tuple_t * restrict rel = part->rel;
860 int32_t ** hist = part->hist;
861 int64_t * restrict output = part->output;
862
863 const uint32_t my_tid = part->thrargs->my_tid;
864 const uint32_t nthreads = part->thrargs->nthreads;
865 const uint32_t num_tuples = part->num_tuples;
866
867 const int32_t R = part->R;
868 const int32_t D = part->D;
869 const uint32_t fanOut = 1 << D;
870 const uint32_t MASK = (fanOut - 1) << R;
871 const uint32_t padding = part->padding;
872
873 int64_t sum = 0;
874 uint32_t i, j;
875 int rv;
876
877 /* compute local histogram for the assigned region of rel */
878 /* compute histogram */
879 int32_t * my_hist = hist[my_tid];
880
881 for(i = 0; i < num_tuples; i++) {
882 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
883 my_hist[idx] ++;
884 }
885
886 /* compute local prefix sum on hist */
887 for(i = 0; i < fanOut; i++){
888 sum += my_hist[i];
889 my_hist[i] = sum;
890 }
891
892 SYNC_TIMER_STOP(&part->thrargs->localtimer.sync1[part->relidx]);
893 /* wait at a barrier until each thread complete histograms */
894 BARRIER_ARRIVE(part->thrargs->barrier, rv);
895 /* barrier global sync point-1 */
896 SYNC_GLOBAL_STOP(&part->thrargs->globaltimer->sync1[part->relidx], my_tid);
897
898 /* determine the start and end of each cluster */
899 for(i = 0; i < my_tid; i++) {
900 for(j = 0; j < fanOut; j++)
901 output[j] += hist[i][j];
902 }
903 for(i = my_tid; i < nthreads; i++) {
904 for(j = 1; j < fanOut; j++)
905 output[j] += hist[i][j-1];
906 }
907
908 /* uint32_t pre; /\* nr of tuples to cache-alignment *\/ */
909 tuple_t * restrict tmp = part->tmp;
910 /* software write-combining buffer */
911 cacheline_t buffer[fanOut] __attribute__((aligned(CACHE_LINE_SIZE)));
912
913 for(i = 0; i < fanOut; i++ ) {
914 uint64_t off = output[i] + i * padding;
915 /* pre = (off + TUPLESPERCACHELINE) & ~(TUPLESPERCACHELINE-1); */
916 /* pre -= off; */
917 output[i] = off;
918 buffer[i].data.slot = off;
919 }
920 output[fanOut] = part->total_tuples + fanOut * padding;
921
922 /* Copy tuples to their corresponding clusters */
923 for(i = 0; i < num_tuples; i++ ){
924 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
925 uint64_t slot = buffer[idx].data.slot;
926 tuple_t * tup = (tuple_t *)(buffer + idx);
927 uint32_t slotMod = (slot) & (TUPLESPERCACHELINE - 1);
928 tup[slotMod] = rel[i];
929
930 if(slotMod == (TUPLESPERCACHELINE-1)){
931 /* write out 64-Bytes with non-temporal store */
932 store_nontemp_64B((tmp+slot-(TUPLESPERCACHELINE-1)), (buffer+idx));
933 /* writes += TUPLESPERCACHELINE; */
934 }
935
936 buffer[idx].data.slot = slot+1;
937 }
938 /* _mm_sfence (); */
939
940 /* write out the remainders in the buffer */
941 for(i = 0; i < fanOut; i++ ) {
942 uint64_t slot = buffer[i].data.slot;
943 uint32_t sz = (slot) & (TUPLESPERCACHELINE - 1);
944 slot -= sz;
945 for(uint32_t j = 0; j < sz; j++) {
946 tmp[slot] = buffer[i].data.tuples[j];
947 slot ++;
948 }
949 }
950}
951
963void *
964prj_thread(void * param)
965{
966 arg_t * args = (arg_t*) param;
967 int32_t my_tid = args->my_tid;
968
969 const int fanOut = 1 << (NUM_RADIX_BITS / NUM_PASSES);//PASS1RADIXBITS;
970 const int R = (NUM_RADIX_BITS / NUM_PASSES);//PASS1RADIXBITS;
971 const int D = (NUM_RADIX_BITS - (NUM_RADIX_BITS / NUM_PASSES));//PASS2RADIXBITS;
972
973 uint64_t results = 0;
974 int i;
975 int rv;
976
977 part_t part;
978 task_t * task;
979 task_queue_t * part_queue;
980 task_queue_t * join_queue;
981#ifdef SKEW_HANDLING
982 task_queue_t * skew_queue;
983#endif
984
985 int64_t * outputR = (int64_t *) calloc((fanOut+1), sizeof(int64_t));
986 int64_t * outputS = (int64_t *) calloc((fanOut+1), sizeof(int64_t));
987 MALLOC_CHECK((outputR && outputS));
988
989 int numaid = get_numa_id(my_tid);
990 part_queue = args->part_queue[numaid];
991 join_queue = args->join_queue[numaid];
992
993#ifdef SKEW_HANDLING
994 skew_queue = args->skew_queue;
995#endif
996
997 args->histR[my_tid] = (int32_t *) calloc(fanOut, sizeof(int32_t));
998 args->histS[my_tid] = (int32_t *) calloc(fanOut, sizeof(int32_t));
999
1000 /* in the first pass, partitioning is done together by all threads */
1001
1002 args->parts_processed = 0;
1003
1004#ifdef PERF_COUNTERS
1005 if(my_tid == 0){
1006 PCM_initPerformanceMonitor(NULL, NULL);
1007 PCM_start();
1008 }
1009#endif
1010
1011 /* wait at a barrier until each thread starts and then start the timer */
1012 BARRIER_ARRIVE(args->barrier, rv);
1013
1014 /* if monitoring synchronization stats */
1015 SYNC_TIMERS_START(args, my_tid);
1016
1017#ifndef NO_TIMING
1018 if(my_tid == 0){
1019 /* thread-0 checkpoints the time */
1020 gettimeofday(&args->start, NULL);
1021 startTimer(&args->timer1);
1022 startTimer(&args->timer2);
1023 startTimer(&args->timer3);
1024 }
1025#endif
1026
1027 /********** 1st pass of multi-pass partitioning ************/
1028 part.R = 0;
1029 part.D = NUM_RADIX_BITS / NUM_PASSES; //PASS1RADIXBITS
1030 part.thrargs = args;
1031 part.padding = PADDING_TUPLES;
1032
1033 /* 1. partitioning for relation R */
1034 part.rel = args->relR;
1035 part.tmp = args->tmpR;
1036 part.hist = args->histR;
1037 part.output = outputR;
1038 part.num_tuples = args->numR;
1039 part.total_tuples = args->totalR;
1040 part.relidx = 0;
1041
1042#ifdef USE_SWWC_OPTIMIZED_PART
1044#else
1046#endif
1047
1048 /* 2. partitioning for relation S */
1049 part.rel = args->relS;
1050 part.tmp = args->tmpS;
1051 part.hist = args->histS;
1052 part.output = outputS;
1053 part.num_tuples = args->numS;
1054 part.total_tuples = args->totalS;
1055 part.relidx = 1;
1056
1057#ifdef USE_SWWC_OPTIMIZED_PART
1059#else
1061#endif
1062
1063
1064 /* wait at a barrier until each thread copies out */
1065 BARRIER_ARRIVE(args->barrier, rv);
1066
1067 /********** end of 1st partitioning phase ******************/
1068
1069#ifdef SKEW_HANDLING
1070 /* experimental skew threshold */
1071 /* const int thresh1 = MAX((1<<D), (1<<R)) * THRESHOLD1(args->nthreads); */
1072 /* const int thresh1 = MAX(args->totalR, arg->totalS)/MAX((1<<D),(1<<R)); */
1073 const int thresh1 = 64*THRESHOLD1(args->nthreads);
1074#endif
1075
1076 /* 3. first thread creates partitioning tasks for 2nd pass */
1077 if(my_tid == 0) {
1078 /* For Debugging: */
1079 /* int numnuma = get_num_numa_regions(); */
1080 /* int correct_numa_mapping = 0, wrong_numa_mapping = 0; */
1081 /* int counts[4] = {0, 0, 0, 0}; */
1082 for(i = 0; i < fanOut; i++) {
1083 int32_t ntupR = outputR[i+1] - outputR[i] - PADDING_TUPLES;
1084 int32_t ntupS = outputS[i+1] - outputS[i] - PADDING_TUPLES;
1085
1086#ifdef SKEW_HANDLING
1087
1088 if(ntupR > thresh1 || ntupS > thresh1){
1089 DEBUGMSG(1, "Adding to skew_queue= R:%d, S:%d\n", ntupR, ntupS);
1090
1091 task_t * t = task_queue_get_slot(skew_queue);
1092
1093 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1094 t->relR.tuples = args->tmpR + outputR[i];
1095 t->tmpR.tuples = args->relR + outputR[i];
1096
1097 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1098 t->relS.tuples = args->tmpS + outputS[i];
1099 t->tmpS.tuples = args->relS + outputS[i];
1100
1101 task_queue_add(skew_queue, t);
1102 }
1103 else
1104#endif
1105 if(ntupR > 0 && ntupS > 0) {
1106 /* Determine the NUMA node of each partition: */
1107 void * ptr = (void*)&((args->tmpR + outputR[i])[0]);
1108 int pq_idx = get_numa_node_of_address(ptr);
1109
1110 /* For Debugging: */
1111 /* void * ptr2 = (void*)&((args->tmpS + outputS[i])[0]); */
1112 /* int pq_idx2 = get_numa_node_of_address(ptr2); */
1113 /* if(pq_idx != pq_idx2) */
1114 /* wrong_numa_mapping ++; */
1115 /* else */
1116 /* correct_numa_mapping ++; */
1117 /* int numanode_of_mem = get_numa_node_of_address(ptr); */
1118 /* int pq_idx = i / (fanOut / numnuma); */
1119 /* if(numanode_of_mem == pq_idx) */
1120 /* correct_numa_mapping ++; */
1121 /* else */
1122 /* wrong_numa_mapping ++; */
1123 /* pq_idx = numanode_of_mem; */
1124 /* counts[numanode_of_mem] ++; */
1125 /* counts[pq_idx] ++; */
1126
1127 task_queue_t * numalocal_part_queue = args->part_queue[pq_idx];
1128
1129 task_t * t = task_queue_get_slot(numalocal_part_queue);
1130
1131 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1132 t->relR.tuples = args->tmpR + outputR[i];
1133 t->tmpR.tuples = args->relR + outputR[i];
1134
1135 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1136 t->relS.tuples = args->tmpS + outputS[i];
1137 t->tmpS.tuples = args->relS + outputS[i];
1138
1139
1140 task_queue_add(numalocal_part_queue, t);
1141
1142 }
1143 }
1144
1145 /* debug partitioning task queue */
1146 DEBUGMSG(1, "Pass-2: # partitioning tasks = %d\n", part_queue->count);
1147
1148 /* DEBUG NUMA MAPPINGS */
1149 /* printf("Correct NUMA-mappings = %d, Wrong = %d\n", */
1150 /* correct_numa_mapping, wrong_numa_mapping); */
1151 /* printf("Counts -- 0=%d, 1=%d, 2=%d, 3=%d\n", */
1152 /* counts[0], counts[1], counts[2], counts[3]); */
1153 }
1154
1155 SYNC_TIMER_STOP(&args->localtimer.sync3);
1156 /* wait at a barrier until first thread adds all partitioning tasks */
1157 BARRIER_ARRIVE(args->barrier, rv);
1158 /* global barrier sync point-3 */
1159 SYNC_GLOBAL_STOP(&args->globaltimer->sync3, my_tid);
1160
1161 /************ 2nd pass of multi-pass partitioning ********************/
1162 /* 4. now each thread further partitions and add to join task queue **/
1163
1164#if NUM_PASSES==1
1165 /* If the partitioning is single pass we directly add tasks from pass-1 */
1166 task_queue_t * swap = join_queue;
1167 join_queue = part_queue;
1168 /* part_queue is used as a temporary queue for handling skewed parts */
1169 part_queue = swap;
1170
1171#elif NUM_PASSES==2
1172
1173 while((task = task_queue_get_atomic(part_queue))){
1174
1175 serial_radix_partition(task, join_queue, R, D);
1176
1177 }
1178
1179#else
1180#warning Only 2-pass partitioning is implemented, set NUM_PASSES to 2!
1181#endif
1182
1183#ifdef SKEW_HANDLING
1184 /* Partitioning pass-2 for skewed relations */
1185 part.R = R;
1186 part.D = D;
1187 part.thrargs = args;
1188 part.padding = SMALL_PADDING_TUPLES;
1189
1190 while(1) {
1191 if(my_tid == 0) {
1192 *args->skewtask = task_queue_get_atomic(skew_queue);
1193 }
1194 BARRIER_ARRIVE(args->barrier, rv);
1195 if( *args->skewtask == NULL)
1196 break;
1197
1198 DEBUGMSG((my_tid==0), "Got skew task = R: %d, S: %d\n",
1199 (*args->skewtask)->relR.num_tuples,
1200 (*args->skewtask)->relS.num_tuples);
1201
1202 int32_t numperthr = (*args->skewtask)->relR.num_tuples / args->nthreads;
1203 const int fanOut2 = (1 << D);
1204
1205 free(outputR);
1206 free(outputS);
1207
1208 outputR = (int64_t*) calloc(fanOut2 + 1, sizeof(int64_t));
1209 outputS = (int64_t*) calloc(fanOut2 + 1, sizeof(int64_t));
1210
1211 free(args->histR[my_tid]);
1212 free(args->histS[my_tid]);
1213
1214 args->histR[my_tid] = (int32_t*) calloc(fanOut2, sizeof(int32_t));
1215 args->histS[my_tid] = (int32_t*) calloc(fanOut2, sizeof(int32_t));
1216
1217 /* wait until each thread allocates memory */
1218 BARRIER_ARRIVE(args->barrier, rv);
1219
1220 /* 1. partitioning for relation R */
1221 part.rel = (*args->skewtask)->relR.tuples + my_tid * numperthr;
1222 part.tmp = (*args->skewtask)->tmpR.tuples;
1223 part.hist = args->histR;
1224 part.output = outputR;
1225 part.num_tuples = (my_tid == (args->nthreads-1)) ?
1226 ((*args->skewtask)->relR.num_tuples - my_tid * numperthr)
1227 : numperthr;
1228 part.total_tuples = (*args->skewtask)->relR.num_tuples;
1229 part.relidx = 2; /* meaning this is pass-2, no syncstats */
1231
1232 numperthr = (*args->skewtask)->relS.num_tuples / args->nthreads;
1233 /* 2. partitioning for relation S */
1234 part.rel = (*args->skewtask)->relS.tuples + my_tid * numperthr;
1235 part.tmp = (*args->skewtask)->tmpS.tuples;
1236 part.hist = args->histS;
1237 part.output = outputS;
1238 part.num_tuples = (my_tid == (args->nthreads-1)) ?
1239 ((*args->skewtask)->relS.num_tuples - my_tid * numperthr)
1240 : numperthr;
1241 part.total_tuples = (*args->skewtask)->relS.num_tuples;
1242 part.relidx = 2; /* meaning this is pass-2, no syncstats */
1244
1245 /* wait at a barrier until each thread copies out */
1246 BARRIER_ARRIVE(args->barrier, rv);
1247
1248 /* first thread adds join tasks */
1249 if(my_tid == 0) {
1250 const int THR1 = THRESHOLD1(args->nthreads);
1251
1252 for(i = 0; i < fanOut2; i++) {
1253 int32_t ntupR = outputR[i+1] - outputR[i] - SMALL_PADDING_TUPLES;
1254 int32_t ntupS = outputS[i+1] - outputS[i] - SMALL_PADDING_TUPLES;
1255 if(ntupR > THR1 || ntupS > THR1){
1256
1257 DEBUGMSG(1, "Large join task = R: %d, S: %d\n", ntupR, ntupS);
1258
1259 /* use part_queue temporarily */
1260 for(int k=0; k < args->nthreads; k++) {
1261 int ns = (k == args->nthreads-1)
1262 ? (ntupS - k*(ntupS/args->nthreads))
1263 : (ntupS/args->nthreads);
1264 task_t * t = task_queue_get_slot(part_queue);
1265
1266 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1267 t->relR.tuples = (*args->skewtask)->tmpR.tuples + outputR[i];
1268 t->tmpR.tuples = (*args->skewtask)->relR.tuples + outputR[i];
1269
1270 t->relS.num_tuples = t->tmpS.num_tuples = ns; //ntupS;
1271 t->relS.tuples = (*args->skewtask)->tmpS.tuples + outputS[i] //;
1272 + k*(ntupS/args->nthreads);
1273 t->tmpS.tuples = (*args->skewtask)->relS.tuples + outputS[i] //;
1274 + k*(ntupS/args->nthreads);
1275
1276 task_queue_add(part_queue, t);
1277 }
1278 }
1279 else
1280 if(ntupR > 0 && ntupS > 0) {
1281 task_t * t = task_queue_get_slot(join_queue);
1282
1283 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1284 t->relR.tuples = (*args->skewtask)->tmpR.tuples + outputR[i];
1285 t->tmpR.tuples = (*args->skewtask)->relR.tuples + outputR[i];
1286
1287 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1288 t->relS.tuples = (*args->skewtask)->tmpS.tuples + outputS[i];
1289 t->tmpS.tuples = (*args->skewtask)->relS.tuples + outputS[i];
1290
1291 task_queue_add(join_queue, t);
1292
1293 DEBUGMSG(1, "Join added = R: %d, S: %d\n",
1294 t->relR.num_tuples, t->relS.num_tuples);
1295 }
1296 }
1297
1298 }
1299 }
1300
1301 /* add large join tasks in part_queue to the front of the join queue */
1302 if(my_tid == 0) {
1303 while((task = task_queue_get_atomic(part_queue)))
1304 task_queue_add(join_queue, task);
1305 }
1306
1307#endif
1308
1309 free(outputR);
1310 free(outputS);
1311
1312 SYNC_TIMER_STOP(&args->localtimer.sync4);
1313 /* wait at a barrier until all threads add all join tasks */
1314 BARRIER_ARRIVE(args->barrier, rv);
1315 /* global barrier sync point-4 */
1316 SYNC_GLOBAL_STOP(&args->globaltimer->sync4, my_tid);
1317
1318#ifndef NO_TIMING
1319 if(my_tid == 0) stopTimer(&args->timer3);/* partitioning finished */
1320#endif
1321
1322 DEBUGMSG((my_tid == 0), "Number of join tasks = %d\n", join_queue->count);
1323
1324#ifdef PERF_COUNTERS
1325 if(my_tid == 0){
1326 PCM_stop();
1327 PCM_log("======= Partitioning phase profiling results ======\n");
1329 PCM_start();
1330 }
1331 /* Just to make sure we get consistent performance numbers */
1332 BARRIER_ARRIVE(args->barrier, rv);
1333#endif
1334
1335#ifdef JOIN_RESULT_MATERIALIZE
1336 chainedtuplebuffer_t * chainedbuf = chainedtuplebuffer_init();
1337#else
1338 void * chainedbuf = NULL;
1339#endif
1340
1341 while((task = task_queue_get_atomic(join_queue))){
1342 /* do the actual join. join method differs for different algorithms,
1343 i.e. bucket chaining, histogram-based, histogram-based with simd &
1344 prefetching */
1345 results += args->join_function(&task->relR, &task->relS, &task->tmpR, chainedbuf);
1346
1347 args->parts_processed ++;
1348 }
1349
1350 args->result = results;
1351
1352#ifdef JOIN_RESULT_MATERIALIZE
1353 args->threadresult->nresults = results;
1354 args->threadresult->threadid = my_tid;
1355 args->threadresult->results = (void *) chainedbuf;
1356#endif
1357
1358 /* this thread is finished */
1359 SYNC_TIMER_STOP(&args->localtimer.finish_time);
1360
1361#ifndef NO_TIMING
1362 /* this is for just reliable timing of finish time */
1363 BARRIER_ARRIVE(args->barrier, rv);
1364 if(my_tid == 0) {
1365 /* Actually with this setup we're not timing build */
1366 stopTimer(&args->timer2);/* build finished */
1367 stopTimer(&args->timer1);/* probe finished */
1368 gettimeofday(&args->end, NULL);
1369 }
1370#endif
1371
1372 /* global finish time */
1373 SYNC_GLOBAL_STOP(&args->globaltimer->finish_time, my_tid);
1374
1375#ifdef PERF_COUNTERS
1376 if(my_tid == 0) {
1377 PCM_stop();
1378 PCM_log("=========== Build+Probe profiling results =========\n");
1380 PCM_log("===================================================\n");
1381 PCM_cleanup();
1382 }
1383 /* Just to make sure we get consistent performance numbers */
1384 BARRIER_ARRIVE(args->barrier, rv);
1385#endif
1386
1387 return 0;
1388}
1389
1391static void
1392print_timing(uint64_t total, uint64_t build, uint64_t part,
1393 uint64_t numtuples, int64_t result,
1394 struct timeval * start, struct timeval * end)
1395{
1396 double diff_usec = (((*end).tv_sec*1000000L + (*end).tv_usec)
1397 - ((*start).tv_sec*1000000L+(*start).tv_usec));
1398 double cyclestuple = total;
1399 cyclestuple /= numtuples;
1400 // stdout: passes, radix bits, runtime total, build, part, total time usecs, total tuples, cycles/tuple
1401 fprintf(stderr, "PASSES, RADIX BITS, RUNTIME TOTAL, BUILD, PART (cycles),");
1402 fprintf(stderr, "TOTAL-TIME-USECS, TOTAL-TUPLES, CYCLES-PER-TUPLE: \n");
1403 fprintf(stdout, "%d, %d, %llu, %llu, %llu, ",
1404 NUM_PASSES, NUM_RADIX_BITS, total, build, part);
1405 fprintf(stdout, "%.4lf, ", diff_usec);
1406 fprintf(stdout, "%llu, %.4lf\n", result, cyclestuple);
1407 fflush(stdout);
1408 fflush(stderr);
1409}
1410
1411
1423result_t *
1424join_init_run(relation_t * relR, relation_t * relS, JoinFunction jf, int nthreads)
1425{
1426 int i, rv;
1427 pthread_t tid[nthreads];
1428 pthread_attr_t attr;
1429 pthread_barrier_t barrier;
1430 cpu_set_t set;
1431 arg_t args[nthreads];
1432
1433 int32_t ** histR, ** histS;
1434 tuple_t * tmpRelR, * tmpRelS;
1435 int32_t numperthr[2];
1436 int64_t result = 0;
1437
1438 /* task_queue_t * part_queue, * join_queue; */
1439 int numnuma = get_num_numa_regions();
1440 task_queue_t * part_queue[numnuma];
1441 task_queue_t * join_queue[numnuma];
1442
1443#ifdef SKEW_HANDLING
1444 task_queue_t * skew_queue;
1445 task_t * skewtask = NULL;
1446 skew_queue = task_queue_init(FANOUT_PASS1);
1447#endif
1448
1449 for(i = 0; i < numnuma; i++){
1450 part_queue[i] = task_queue_init(FANOUT_PASS1);
1451 join_queue[i] = task_queue_init((1<<NUM_RADIX_BITS));
1452 }
1453
1454 result_t * joinresult = 0;
1455 joinresult = (result_t *) malloc(sizeof(result_t));
1456
1457#ifdef JOIN_RESULT_MATERIALIZE
1458 joinresult->resultlist = (threadresult_t *) malloc(sizeof(threadresult_t)
1459 * nthreads);
1460#endif
1461
1462 /* allocate temporary space for partitioning */
1463 tmpRelR = (tuple_t*) alloc_aligned(relR->num_tuples * sizeof(tuple_t) +
1465 tmpRelS = (tuple_t*) alloc_aligned(relS->num_tuples * sizeof(tuple_t) +
1467 MALLOC_CHECK((tmpRelR && tmpRelS));
1470 if(numalocalize) {
1471 uint64_t numwithpad;
1472
1473 numwithpad = (relR->num_tuples * sizeof(tuple_t) +
1474 RELATION_PADDING)/sizeof(tuple_t);
1475 numa_localize(tmpRelR, numwithpad, nthreads);
1476
1477 numwithpad = (relS->num_tuples * sizeof(tuple_t) +
1478 RELATION_PADDING)/sizeof(tuple_t);
1479 numa_localize(tmpRelS, numwithpad, nthreads);
1480 }
1481
1482
1483 /* allocate histograms arrays, actual allocation is local to threads */
1484 histR = (int32_t**) alloc_aligned(nthreads * sizeof(int32_t*));
1485 histS = (int32_t**) alloc_aligned(nthreads * sizeof(int32_t*));
1486 MALLOC_CHECK((histR && histS));
1487
1488 rv = pthread_barrier_init(&barrier, NULL, nthreads);
1489 if(rv != 0){
1490 printf("[ERROR] Couldn't create the barrier\n");
1491 exit(EXIT_FAILURE);
1492 }
1493
1494 pthread_attr_init(&attr);
1495
1496#ifdef SYNCSTATS
1497 /* thread-0 keeps track of synchronization stats */
1498 args[0].globaltimer = (synctimer_t*) malloc(sizeof(synctimer_t));
1499#endif
1500
1501 /* first assign chunks of relR & relS for each thread */
1502 numperthr[0] = relR->num_tuples / nthreads;
1503 numperthr[1] = relS->num_tuples / nthreads;
1504 for(i = 0; i < nthreads; i++){
1505 int cpu_idx = get_cpu_id(i);
1506
1507 DEBUGMSG(1, "Assigning thread-%d to CPU-%d\n", i, cpu_idx);
1508
1509 CPU_ZERO(&set);
1510 CPU_SET(cpu_idx, &set);
1511 pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &set);
1512
1513
1514 args[i].relR = relR->tuples + i * numperthr[0];
1515 args[i].tmpR = tmpRelR;
1516 args[i].histR = histR;
1517
1518 args[i].relS = relS->tuples + i * numperthr[1];
1519 args[i].tmpS = tmpRelS;
1520 args[i].histS = histS;
1521
1522 args[i].numR = (i == (nthreads-1)) ?
1523 (relR->num_tuples - i * numperthr[0]) : numperthr[0];
1524 args[i].numS = (i == (nthreads-1)) ?
1525 (relS->num_tuples - i * numperthr[1]) : numperthr[1];
1526 args[i].totalR = relR->num_tuples;
1527 args[i].totalS = relS->num_tuples;
1528
1529 args[i].my_tid = i;
1530 args[i].part_queue = part_queue;
1531 args[i].join_queue = join_queue;
1532#ifdef SKEW_HANDLING
1533 args[i].skew_queue = skew_queue;
1534 args[i].skewtask = &skewtask;
1535#endif
1536 args[i].barrier = &barrier;
1537 args[i].join_function = jf;
1538 args[i].nthreads = nthreads;
1539 args[i].threadresult = &(joinresult->resultlist[i]);
1540
1541 rv = pthread_create(&tid[i], &attr, prj_thread, (void*)&args[i]);
1542 if (rv){
1543 printf("[ERROR] return code from pthread_create() is %d\n", rv);
1544 exit(-1);
1545 }
1546 }
1547
1548 /* wait for threads to finish */
1549 for(i = 0; i < nthreads; i++){
1550 pthread_join(tid[i], NULL);
1551 result += args[i].result;
1552 }
1553 joinresult->totalresults = result;
1554 joinresult->nthreads = nthreads;
1555
1556#ifdef SYNCSTATS
1557/* #define ABSDIFF(X,Y) (((X) > (Y)) ? ((X)-(Y)) : ((Y)-(X))) */
1558 fprintf(stdout, "TID JTASKS T1.1 T1.1-IDLE T1.2 T1.2-IDLE "\
1559 "T3 T3-IDLE T4 T4-IDLE T5 T5-IDLE\n");
1560 for(i = 0; i < nthreads; i++){
1561 synctimer_t * glob = args[0].globaltimer;
1562 synctimer_t * local = & args[i].localtimer;
1563 fprintf(stdout,
1564 "%d %d %llu %llu %llu %llu %llu %llu %llu %llu "\
1565 "%llu %llu\n",
1566 (i+1), args[i].parts_processed, local->sync1[0],
1567 glob->sync1[0] - local->sync1[0],
1568 local->sync1[1] - glob->sync1[0],
1569 glob->sync1[1] - local->sync1[1],
1570 local->sync3 - glob->sync1[1],
1571 glob->sync3 - local->sync3,
1572 local->sync4 - glob->sync3,
1573 glob->sync4 - local->sync4,
1574 local->finish_time - glob->sync4,
1575 glob->finish_time - local->finish_time);
1576 }
1577#endif
1578
1579#ifndef NO_TIMING
1580 /* now print the timing results: */
1581 print_timing(args[0].timer1, args[0].timer2, args[0].timer3,
1582 relS->num_tuples, result,
1583 &args[0].start, &args[0].end);
1584#endif
1585
1586 /* clean up */
1587 for(i = 0; i < nthreads; i++) {
1588 free(histR[i]);
1589 free(histS[i]);
1590 }
1591 free(histR);
1592 free(histS);
1593
1594 for(i = 0; i < numnuma; i++){
1595 task_queue_free(part_queue[i]);
1596 task_queue_free(join_queue[i]);
1597 }
1598
1599#ifdef SKEW_HANDLING
1600 task_queue_free(skew_queue);
1601#endif
1602 free(tmpRelR);
1603 free(tmpRelS);
1604#ifdef SYNCSTATS
1605 free(args[0].globaltimer);
1606#endif
1607
1608 return joinresult;
1609}
1610
1612result_t *
1613PRO(relation_t * relR, relation_t * relS, int nthreads)
1614{
1615 return join_init_run(relR, relS, bucket_chaining_join, nthreads);
1616}
1617
1619result_t *
1620PRH(relation_t * relR, relation_t * relS, int nthreads)
1621{
1622 return join_init_run(relR, relS, histogram_join, nthreads);
1623}
1624
1626result_t *
1627PRHO(relation_t * relR, relation_t * relS, int nthreads)
1628{
1629 return join_init_run(relR, relS, histogram_optimized_join, nthreads);
1630}
1631
1633result_t *
1634RJ(relation_t * relR, relation_t * relS, int nthreads)
1635{
1636 int64_t result = 0;
1637 result_t * joinresult;
1638 uint32_t i;
1639
1640#ifndef NO_TIMING
1641 struct timeval start, end;
1642 uint64_t timer1, timer2, timer3;
1643#endif
1644
1645 relation_t *outRelR, *outRelS;
1646
1647 outRelR = (relation_t*) malloc(sizeof(relation_t));
1648 outRelS = (relation_t*) malloc(sizeof(relation_t));
1649
1650 joinresult = (result_t *) malloc(sizeof(result_t));
1651#ifdef JOIN_RESULT_MATERIALIZE
1652 joinresult->resultlist = (threadresult_t *) malloc(sizeof(threadresult_t));
1653#endif
1654
1655 /* allocate temporary space for partitioning */
1656 /* TODO: padding problem */
1657 size_t sz = relR->num_tuples * sizeof(tuple_t) + RELATION_PADDING;
1658 outRelR->tuples = (tuple_t*) malloc(sz);
1659 outRelR->num_tuples = relR->num_tuples;
1660
1661 sz = relS->num_tuples * sizeof(tuple_t) + RELATION_PADDING;
1662 outRelS->tuples = (tuple_t*) malloc(sz);
1663 outRelS->num_tuples = relS->num_tuples;
1664
1665#ifndef NO_TIMING
1666 gettimeofday(&start, NULL);
1667 startTimer(&timer1);
1668 startTimer(&timer2);
1669 startTimer(&timer3);
1670#endif
1671
1672 /***** do the multi-pass partitioning *****/
1673#if NUM_PASSES==1
1674 /* apply radix-clustering on relation R for pass-1 */
1675 radix_cluster_nopadding(outRelR, relR, 0, NUM_RADIX_BITS);
1676 relR = outRelR;
1677
1678 /* apply radix-clustering on relation S for pass-1 */
1679 radix_cluster_nopadding(outRelS, relS, 0, NUM_RADIX_BITS);
1680 relS = outRelS;
1681
1682#elif NUM_PASSES==2
1683 /* apply radix-clustering on relation R for pass-1 */
1685
1686 /* apply radix-clustering on relation S for pass-1 */
1688
1689 /* apply radix-clustering on relation R for pass-2 */
1690 radix_cluster_nopadding(relR, outRelR,
1693
1694 /* apply radix-clustering on relation S for pass-2 */
1695 radix_cluster_nopadding(relS, outRelS,
1698
1699 /* clean up temporary relations */
1700 free(outRelR->tuples);
1701 free(outRelS->tuples);
1702 free(outRelR);
1703 free(outRelS);
1704
1705#else
1706#error Only 1 or 2 pass partitioning is implemented, change NUM_PASSES!
1707#endif
1708
1709
1710#ifndef NO_TIMING
1711 stopTimer(&timer3);
1712#endif
1713
1714 int * R_count_per_cluster = (int*)calloc((1<<NUM_RADIX_BITS), sizeof(int));
1715 int * S_count_per_cluster = (int*)calloc((1<<NUM_RADIX_BITS), sizeof(int));
1716
1717 /* compute number of tuples per cluster */
1718 for( i=0; i < relR->num_tuples; i++ ){
1719 uint32_t idx = (relR->tuples[i].key) & ((1<<NUM_RADIX_BITS)-1);
1720 R_count_per_cluster[idx] ++;
1721 }
1722 for( i=0; i < relS->num_tuples; i++ ){
1723 uint32_t idx = (relS->tuples[i].key) & ((1<<NUM_RADIX_BITS)-1);
1724 S_count_per_cluster[idx] ++;
1725 }
1726
1727#ifdef JOIN_RESULT_MATERIALIZE
1728 chainedtuplebuffer_t * chainedbuf = chainedtuplebuffer_init();
1729#else
1730 void * chainedbuf = NULL;
1731#endif
1732
1733 /* build hashtable on inner */
1734 int r, s; /* start index of next clusters */
1735 r = s = 0;
1736 for( i=0; i < (1<<NUM_RADIX_BITS); i++ ){
1737 relation_t tmpR, tmpS;
1738
1739 if(R_count_per_cluster[i] > 0 && S_count_per_cluster[i] > 0){
1740
1741 tmpR.num_tuples = R_count_per_cluster[i];
1742 tmpR.tuples = relR->tuples + r;
1743 r += R_count_per_cluster[i];
1744
1745 tmpS.num_tuples = S_count_per_cluster[i];
1746 tmpS.tuples = relS->tuples + s;
1747 s += S_count_per_cluster[i];
1748
1749 result += bucket_chaining_join(&tmpR, &tmpS, NULL, chainedbuf);
1750 }
1751 else {
1752 r += R_count_per_cluster[i];
1753 s += S_count_per_cluster[i];
1754 }
1755 }
1756
1757#ifdef JOIN_RESULT_MATERIALIZE
1758 threadresult_t * thrres = &(joinresult->resultlist[0]);/* single-thread */
1759 thrres->nresults = result;
1760 thrres->threadid = 0;
1761 thrres->results = (void *) chainedbuf;
1762#endif
1763
1764#ifndef NO_TIMING
1765 /* TODO: actually we're not timing build */
1766 stopTimer(&timer2);/* build finished */
1767 stopTimer(&timer1);/* probe finished */
1768 gettimeofday(&end, NULL);
1769 /* now print the timing results: */
1770 print_timing(timer1, timer2, timer3, relS->num_tuples, result, &start, &end);
1771#endif
1772
1773 /* clean-up temporary buffers */
1774 free(S_count_per_cluster);
1775 free(R_count_per_cluster);
1776
1777#if NUM_PASSES == 1
1778 /* clean up temporary relations */
1779 free(outRelR->tuples);
1780 free(outRelS->tuples);
1781 free(outRelR);
1782 free(outRelS);
1783#endif
1784
1785 joinresult->totalresults = result;
1786 joinresult->nthreads = 1;
1787
1788 return joinresult;
1789}
1790
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
Barrier implementation, defaults to Pthreads. On Mac custom implementation since barriers are not inc...
#define BARRIER_ARRIVE(B, RV)
Definition: barrier.h:29
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
Definition: generator.c:413
void PCM_initPerformanceMonitor(const char *pcmcfg, const char *pcmout)
void PCM_start()
void PCM_stop()
void PCM_cleanup()
void PCM_printResults()
void PCM_log(char *msg)
int64_t histogram_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
result_t * PRH(relation_t *relR, relation_t *relS, int nthreads)
void parallel_radix_partition(part_t *const part)
void serial_radix_partition(task_t *const task, task_queue_t *join_queue, const int R, const int D)
uint32_t get_hist_size(uint32_t relSize) __attribute__((always_inline))
result_t * join_init_run(relation_t *relR, relation_t *relS, JoinFunction jf, int nthreads)
int64_t bucket_chaining_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
result_t * PRO(relation_t *relR, relation_t *relS, int nthreads)
void * prj_thread(void *param)
result_t * RJ(relation_t *relR, relation_t *relS, int nthreads)
int64_t histogram_optimized_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
void radix_cluster(relation_t *restrict outRel, relation_t *restrict inRel, int32_t *restrict hist, int R, int D)
void prefetch(void *addr) __attribute__((always_inline))
void radix_cluster_nopadding(relation_t *outRel, relation_t *inRel, int R, int D)
result_t * PRHO(relation_t *relR, relation_t *relS, int nthreads)
void parallel_radix_partition_optimized(part_t *const part)
#define RELATION_PADDING
Definition: prj_params.h:92
#define THRESHOLD1(NTHR)
Definition: prj_params.h:63
#define SMALL_PADDING_TUPLES
Definition: prj_params.h:88
int get_num_numa_regions(void)
Definition: cpu_mapping.c:124
int get_numa_node_of_address(void *ptr)
Definition: cpu_mapping.c:135
int get_cpu_id(int thread_id)
Definition: cpu_mapping.c:78
int get_numa_id(int mytid)
Definition: cpu_mapping.c:105
#define CACHE_LINE_SIZE
Definition: npj_params.h:24
#define DEBUGMSG(COND, MSG,...)
#define MALLOC_CHECK(M)
#define NEXT_POW_2(V)
int numalocalize
Definition: generator.c:47
Provides interfaces for several variants of Radix Hash Join.
An interface to the Intel Performance Counters Monitoring.
Constant parameters used by Parallel Radix Join implementations.
#define NUM_PASSES
Definition: prj_params.h:21
#define PROBE_BUFFER_SIZE
Definition: prj_params.h:26
#define NUM_RADIX_BITS
Definition: prj_params.h:16
Definition: types.h:74
Definition: types.h:45
Implements task queue facility for the join processing.
Implements a chained-buffer storage model for tuples.