37#ifdef JOIN_RESULT_MATERIALIZE
45#define BARRIER_ARRIVE(B,RV) \
46 RV = pthread_barrier_wait(B); \
47 if(RV !=0 && RV != PTHREAD_BARRIER_SERIAL_THREAD){ \
48 printf("Couldn't wait on barrier\n"); \
55#define MALLOC_CHECK(M) \
57 printf("[ERROR] MALLOC_CHECK: %s : %d\n", __FILE__, __LINE__); \
58 perror(": malloc() failed!\n"); \
/* Extract the radix bits of key K: mask with MASK, then shift right by NBITS.
 * Every parameter is parenthesized so that expression arguments (e.g. a
 * mask written as `a|b`, which has lower precedence than `&`) group
 * correctly inside the expansion. */
#define HASH_BIT_MODULO(K, MASK, NBITS) (((K) & (MASK)) >> (NBITS))
72#define NEXT_POW_2(V) \
/* Larger of the two values. NOTE: a macro, so each argument may be
 * evaluated twice — do not pass side-effecting expressions. */
#define MAX(X, Y) (((Y) < (X)) ? (X) : (Y))
87#define SYNC_TIMERS_START(A, TID) \
91 A->localtimer.sync1[0] = tnow; \
92 A->localtimer.sync1[1] = tnow; \
93 A->localtimer.sync3 = tnow; \
94 A->localtimer.sync4 = tnow; \
95 A->localtimer.finish_time = tnow; \
97 A->globaltimer->sync1[0] = tnow; \
98 A->globaltimer->sync1[1] = tnow; \
99 A->globaltimer->sync3 = tnow; \
100 A->globaltimer->sync4 = tnow; \
101 A->globaltimer->finish_time = tnow; \
/* Stop a per-thread sync timer: delegates to stopTimer() on the given
 * timer slot. A no-op variant is defined for builds without sync timing. */
105#define SYNC_TIMER_STOP(T) stopTimer(T)
/* Stop a global (shared) sync timer; only thread 0 performs the stop so
 * the shared slot is written exactly once.
 * Wrapped in do { } while(0) so the macro expands to a single statement
 * and composes safely with if/else (the original bare `if(TID==0){...}`
 * form is a dangling-else hazard); TID is parenthesized as well. */
#define SYNC_GLOBAL_STOP(T, TID) do { if((TID)==0){ stopTimer(T); } } while(0)
/* No-op stand-ins for the sync-timing macros: compiled in when the
 * sync-timer instrumentation is disabled, so call sites need no #ifdefs. */
108#define SYNC_TIMERS_START(A, TID)
109#define SYNC_TIMER_STOP(T)
110#define SYNC_GLOBAL_STOP(T, TID)
/* Conditionally print a debug message to stdout, prefixed with "[DEBUG] ".
 * MSG must be a string literal (it is pasted onto the prefix); the GNU
 * `## __VA_ARGS__` extension drops the trailing comma when no varargs are
 * given. Wrapped in do { } while(0) so the macro is a single statement and
 * composes safely with if/else (the bare `if(COND){...}` form made
 * `DEBUGMSG(...); else ...` a syntax error and created a dangling-else
 * hazard). */
#define DEBUGMSG(COND, MSG, ...) \
    do { if(COND) { fprintf(stdout, "[DEBUG] "MSG, ## __VA_ARGS__); } } while(0)
/* No-op variant: debug messages are compiled out entirely. */
118#define DEBUGMSG(COND, MSG, ...)
122#if defined(__cplusplus)
123#define restrict __restrict__
131typedef struct synctimer_t synctimer_t;
132typedef int64_t (*JoinFunction)(
const relation_t *
const,
147 uint64_t finish_time;
171 pthread_barrier_t * barrier;
172 JoinFunction join_function;
181 int32_t parts_processed;
182 uint64_t timer1, timer2, timer3;
183 struct timeval start, end;
186 synctimer_t localtimer;
188 synctimer_t * globaltimer;
199 uint64_t total_tuples;
208alloc_aligned(
size_t size)
215 perror(
"alloc_aligned() failed: out of memory");
248 int * next, * bucket;
249 const uint32_t numR = R->num_tuples;
257 next = (
int*) malloc(
sizeof(
int) * numR);
259 bucket = (
int*) calloc(N,
sizeof(
int));
261 const tuple_t *
const Rtuples = R->tuples;
262 for(uint32_t i=0; i < numR; ){
263 uint32_t idx = HASH_BIT_MODULO(R->tuples[i].key, MASK,
NUM_RADIX_BITS);
264 next[i] = bucket[idx];
272 const tuple_t *
const Stuples = S->tuples;
273 const uint32_t numS = S->num_tuples;
275#ifdef JOIN_RESULT_MATERIALIZE
281 for(uint32_t i=0; i < numS; i++ ){
283 uint32_t idx = HASH_BIT_MODULO(Stuples[i].key, MASK,
NUM_RADIX_BITS);
285 for(
int hit = bucket[idx]; hit > 0; hit = next[hit-1]){
287 if(Stuples[i].key == Rtuples[hit-1].key){
289#ifdef JOIN_RESULT_MATERIALIZE
291 tuple_t * joinres = cb_next_writepos(chainedbuf);
292 joinres->key = Rtuples[hit-1].payload;
293 joinres->payload = Stuples[i].payload;
312get_hist_size(uint32_t relSize) __attribute__((always_inline));
320 if(relSize < 4) relSize = 4;
336 int32_t * restrict hist;
337 const tuple_t * restrict
const Rtuples = R->tuples;
338 const uint32_t numR = R->num_tuples;
342 hist = (int32_t*) calloc(Nhist+2,
sizeof(int32_t));
344 for( uint32_t i = 0; i < numR; i++ ) {
346 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK,
NUM_RADIX_BITS);
352 for( uint32_t i = 2, sum = 0; i <= Nhist+1; i++ ) {
357 tuple_t *
const tmpRtuples = tmpR->tuples;
359 for( uint32_t i = 0; i < numR; i++ ) {
361 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, MASK,
NUM_RADIX_BITS) + 1;
363 tmpRtuples[hist[idx]] = Rtuples[i];
369 const uint32_t numS = S->num_tuples;
370 const tuple_t *
const Stuples = S->tuples;
372 for( uint32_t i = 0; i < numS; i++ ) {
374 uint32_t idx = HASH_BIT_MODULO(Stuples[i].key, MASK,
NUM_RADIX_BITS);
376 int j = hist[idx], end = hist[idx+1];
379 for(; j < end; j++) {
381 if(Stuples[i].key == tmpRtuples[j].key) {
399prefetch(
void * addr) __attribute__((always_inline));
406 __asm__ __volatile__ (
"prefetcht0 %0" ::
"m" (*(uint32_t*)addr));
425#warning SIMD comparison for 64-bit keys are not implemented!
428 int32_t * restrict hist;
429 const tuple_t * restrict
const Rtuples = R->tuples;
430 const uint32_t numR = R->num_tuples;
434 hist = (int32_t*) calloc(Nhist+2,
sizeof(int32_t));
437 for( uint32_t i = 0; i < numR; i++ ) {
438 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, mask,
NUM_RADIX_BITS);
443 for( uint32_t i = 2, sum = 0; i <= Nhist+1; i++ ) {
448 tuple_t * restrict
const tmpRtuples = tmpR->tuples;
450 for( uint32_t i = 0; i < numR; i++ ) {
451 uint32_t idx = HASH_BIT_MODULO(Rtuples[i].key, mask,
NUM_RADIX_BITS) + 1;
452 tmpRtuples[hist[idx]] = Rtuples[i];
456#ifdef SMALL_PADDING_TUPLES
458 for( uint32_t i = numR+3; i >= numR; i-- ) {
459 tmpRtuples[numR].key = 0;
467 const uint32_t numS = S->num_tuples;
468 const tuple_t * restrict
const Stuples = S->tuples;
469 __m128i counter = _mm_setzero_si128();
478 key_buffer[k] = skey;
479 hash_buffer[k] = idx;
485 int j = hist[hash_buffer[k]];
486 int end = hist[hash_buffer[k]+1];
487 __m128i search_key = _mm_set1_epi32(key_buffer[k]);
489 for( ; j < end; j += 2) {
491 __m128i keyvals = _mm_loadu_si128((__m128i
const *)(tmpRtuples + j));
492 keyvals = _mm_cmpeq_epi32(keyvals, search_key);
493 counter = _mm_add_epi32(keyvals, counter);
500 const intkey_t skey = Stuples[i].key;
505 int end = hist[idx+1];
506 __m128i search_key = _mm_set1_epi32(skey);
508 for( ; j < end; j += 2) {
510 __m128i keyvals = _mm_loadu_si128((__m128i
const *)(tmpRtuples + j));
511 keyvals = _mm_cmpeq_epi32(keyvals, search_key);
512 counter = _mm_add_epi32(keyvals, counter);
518 match += -( _mm_extract_epi32(counter, 0) +
519 _mm_extract_epi32(counter, 2) );
547 int32_t * restrict hist,
552 uint32_t M = ((1 << D) - 1) << R;
554 uint32_t fanOut = 1 << D;
559 uint32_t dst[fanOut];
562 for( i=0; i < inRel->num_tuples; i++ ){
563 uint32_t idx = HASH_BIT_MODULO(inRel->tuples[i].key, M, R);
568 for ( i=0; i < fanOut; i++ ) {
577 for( i=0; i < inRel->num_tuples; i++ ){
578 uint32_t idx = HASH_BIT_MODULO(inRel->tuples[i].key, M, R);
579 outRel->tuples[ dst[idx] ] = inRel->tuples[i];
600 uint32_t * tuples_per_cluster;
603 const uint32_t M = ((1 << D) - 1) << R;
604 const uint32_t fanOut = 1 << D;
605 const uint32_t ntuples = inRel->num_tuples;
607 tuples_per_cluster = (uint32_t*)calloc(fanOut,
sizeof(uint32_t));
614 input = inRel->tuples;
616 for( i=0; i < ntuples; i++ ){
617 uint32_t idx = (uint32_t)(HASH_BIT_MODULO(input->key, M, R));
618 tuples_per_cluster[idx]++;
624 for ( i=0; i < fanOut; i++ ) {
625 dst[i] = outRel->tuples + offset;
626 offset += tuples_per_cluster[i];
630 input = inRel->tuples;
632 for( i=0; i < ntuples; i++ ){
633 uint32_t idx = (uint32_t)(HASH_BIT_MODULO(input->key, M, R));
646 free(tuples_per_cluster);
660 const int R,
const int D)
663 uint32_t offsetR = 0, offsetS = 0;
664 const int fanOut = 1 << D;
665 int32_t * outputR, * outputS;
667 outputR = (int32_t*)calloc(fanOut+1,
sizeof(int32_t));
668 outputS = (int32_t*)calloc(fanOut+1,
sizeof(int32_t));
677 for(i = 0; i < fanOut; i++) {
678 if(outputR[i] > 0 && outputS[i] > 0) {
679 task_t * t = task_queue_get_slot_atomic(join_queue);
680 t->relR.num_tuples = outputR[i];
681 t->relR.tuples = task->tmpR.tuples + offsetR
683 t->tmpR.tuples = task->relR.tuples + offsetR
685 offsetR += outputR[i];
687 t->relS.num_tuples = outputS[i];
688 t->relS.tuples = task->tmpS.tuples + offsetS
690 t->tmpS.tuples = task->relS.tuples + offsetS
692 offsetS += outputS[i];
695 task_queue_add_atomic(join_queue, t);
698 offsetR += outputR[i];
699 offsetS += outputS[i];
717 const tuple_t * restrict rel = part->rel;
718 int32_t ** hist = part->hist;
719 int64_t * restrict output = part->output;
721 const uint32_t my_tid = part->thrargs->my_tid;
722 const uint32_t nthreads = part->thrargs->nthreads;
723 const uint32_t num_tuples = part->num_tuples;
725 const int32_t R = part->R;
726 const int32_t D = part->D;
727 const uint32_t fanOut = 1 << D;
728 const uint32_t MASK = (fanOut - 1) << R;
729 const uint32_t padding = part->padding;
735 int64_t dst[fanOut+1];
739 int32_t * my_hist = hist[my_tid];
741 for(i = 0; i < num_tuples; i++) {
742 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
747 for(i = 0; i < fanOut; i++){
752 SYNC_TIMER_STOP(&part->thrargs->localtimer.sync1[part->relidx]);
756 SYNC_GLOBAL_STOP(&part->thrargs->globaltimer->sync1[part->relidx], my_tid);
759 for(i = 0; i < my_tid; i++) {
760 for(j = 0; j < fanOut; j++)
761 output[j] += hist[i][j];
763 for(i = my_tid; i < nthreads; i++) {
764 for(j = 1; j < fanOut; j++)
765 output[j] += hist[i][j-1];
768 for(i = 0; i < fanOut; i++ ) {
769 output[i] += i * padding;
772 output[fanOut] = part->total_tuples + fanOut * padding;
774 tuple_t * restrict tmp = part->tmp;
777 for(i = 0; i < num_tuples; i++ ){
778 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
779 tmp[dst[idx]] = rel[i];
/* Number of tuple_t entries that fit in one cache line; used by the
 * software write-combining partitioner to flush full-line buffers.
 * NOTE(review): assumes CACHE_LINE_SIZE (defined elsewhere) is an exact
 * multiple of sizeof(tuple_t) — confirm, otherwise the division truncates. */
798#define TUPLESPERCACHELINE (CACHE_LINE_SIZE/sizeof(tuple_t))
811store_nontemp_64B(
void * dst,
void * src)
814 register __m256i * d1 = (__m256i*) dst;
815 register __m256i s1 = *((__m256i*) src);
816 register __m256i * d2 = d1+1;
817 register __m256i s2 = *(((__m256i*) src)+1);
819 _mm256_stream_si256(d1, s1);
820 _mm256_stream_si256(d2, s2);
822#elif defined(__SSE2__)
824 register __m128i * d1 = (__m128i*) dst;
825 register __m128i * d2 = d1+1;
826 register __m128i * d3 = d1+2;
827 register __m128i * d4 = d1+3;
828 register __m128i s1 = *(__m128i*) src;
829 register __m128i s2 = *((__m128i*)src + 1);
830 register __m128i s3 = *((__m128i*)src + 2);
831 register __m128i s4 = *((__m128i*)src + 3);
833 _mm_stream_si128 (d1, s1);
834 _mm_stream_si128 (d2, s2);
835 _mm_stream_si128 (d3, s3);
836 _mm_stream_si128 (d4, s4);
859 const tuple_t * restrict rel = part->rel;
860 int32_t ** hist = part->hist;
861 int64_t * restrict output = part->output;
863 const uint32_t my_tid = part->thrargs->my_tid;
864 const uint32_t nthreads = part->thrargs->nthreads;
865 const uint32_t num_tuples = part->num_tuples;
867 const int32_t R = part->R;
868 const int32_t D = part->D;
869 const uint32_t fanOut = 1 << D;
870 const uint32_t MASK = (fanOut - 1) << R;
871 const uint32_t padding = part->padding;
879 int32_t * my_hist = hist[my_tid];
881 for(i = 0; i < num_tuples; i++) {
882 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
887 for(i = 0; i < fanOut; i++){
892 SYNC_TIMER_STOP(&part->thrargs->localtimer.sync1[part->relidx]);
896 SYNC_GLOBAL_STOP(&part->thrargs->globaltimer->sync1[part->relidx], my_tid);
899 for(i = 0; i < my_tid; i++) {
900 for(j = 0; j < fanOut; j++)
901 output[j] += hist[i][j];
903 for(i = my_tid; i < nthreads; i++) {
904 for(j = 1; j < fanOut; j++)
905 output[j] += hist[i][j-1];
909 tuple_t * restrict tmp = part->tmp;
913 for(i = 0; i < fanOut; i++ ) {
914 uint64_t off = output[i] + i * padding;
918 buffer[i].data.slot = off;
920 output[fanOut] = part->total_tuples + fanOut * padding;
923 for(i = 0; i < num_tuples; i++ ){
924 uint32_t idx = HASH_BIT_MODULO(rel[i].key, MASK, R);
925 uint64_t slot = buffer[idx].data.slot;
927 uint32_t slotMod = (slot) & (TUPLESPERCACHELINE - 1);
928 tup[slotMod] = rel[i];
930 if(slotMod == (TUPLESPERCACHELINE-1)){
932 store_nontemp_64B((tmp+slot-(TUPLESPERCACHELINE-1)), (buffer+idx));
936 buffer[idx].data.slot = slot+1;
941 for(i = 0; i < fanOut; i++ ) {
942 uint64_t slot = buffer[i].data.slot;
943 uint32_t sz = (slot) & (TUPLESPERCACHELINE - 1);
945 for(uint32_t j = 0; j < sz; j++) {
946 tmp[slot] = buffer[i].data.tuples[j];
967 int32_t my_tid = args->my_tid;
973 uint64_t results = 0;
985 int64_t * outputR = (int64_t *) calloc((fanOut+1),
sizeof(int64_t));
986 int64_t * outputS = (int64_t *) calloc((fanOut+1),
sizeof(int64_t));
990 part_queue = args->part_queue[numaid];
991 join_queue = args->join_queue[numaid];
994 skew_queue = args->skew_queue;
997 args->histR[my_tid] = (int32_t *) calloc(fanOut,
sizeof(int32_t));
998 args->histS[my_tid] = (int32_t *) calloc(fanOut,
sizeof(int32_t));
1002 args->parts_processed = 0;
1015 SYNC_TIMERS_START(args, my_tid);
1020 gettimeofday(&args->start, NULL);
1021 startTimer(&args->timer1);
1022 startTimer(&args->timer2);
1023 startTimer(&args->timer3);
1030 part.thrargs = args;
1031 part.padding = PADDING_TUPLES;
1034 part.rel = args->relR;
1035 part.tmp = args->tmpR;
1036 part.hist = args->histR;
1037 part.output = outputR;
1038 part.num_tuples = args->numR;
1039 part.total_tuples = args->totalR;
1042#ifdef USE_SWWC_OPTIMIZED_PART
1049 part.rel = args->relS;
1050 part.tmp = args->tmpS;
1051 part.hist = args->histS;
1052 part.output = outputS;
1053 part.num_tuples = args->numS;
1054 part.total_tuples = args->totalS;
1057#ifdef USE_SWWC_OPTIMIZED_PART
1073 const int thresh1 = 64*
THRESHOLD1(args->nthreads);
1082 for(i = 0; i < fanOut; i++) {
1083 int32_t ntupR = outputR[i+1] - outputR[i] - PADDING_TUPLES;
1084 int32_t ntupS = outputS[i+1] - outputS[i] - PADDING_TUPLES;
1088 if(ntupR > thresh1 || ntupS > thresh1){
1089 DEBUGMSG(1,
"Adding to skew_queue= R:%d, S:%d\n", ntupR, ntupS);
1091 task_t * t = task_queue_get_slot(skew_queue);
1093 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1094 t->relR.tuples = args->tmpR + outputR[i];
1095 t->tmpR.tuples = args->relR + outputR[i];
1097 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1098 t->relS.tuples = args->tmpS + outputS[i];
1099 t->tmpS.tuples = args->relS + outputS[i];
1101 task_queue_add(skew_queue, t);
1105 if(ntupR > 0 && ntupS > 0) {
1107 void * ptr = (
void*)&((args->tmpR + outputR[i])[0]);
1127 task_queue_t * numalocal_part_queue = args->part_queue[pq_idx];
1129 task_t * t = task_queue_get_slot(numalocal_part_queue);
1131 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1132 t->relR.tuples = args->tmpR + outputR[i];
1133 t->tmpR.tuples = args->relR + outputR[i];
1135 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1136 t->relS.tuples = args->tmpS + outputS[i];
1137 t->tmpS.tuples = args->relS + outputS[i];
1140 task_queue_add(numalocal_part_queue, t);
1146 DEBUGMSG(1,
"Pass-2: # partitioning tasks = %d\n", part_queue->count);
1155 SYNC_TIMER_STOP(&args->localtimer.sync3);
1159 SYNC_GLOBAL_STOP(&args->globaltimer->sync3, my_tid);
1167 join_queue = part_queue;
1173 while((task = task_queue_get_atomic(part_queue))){
1180#warning Only 2-pass partitioning is implemented, set NUM_PASSES to 2!
1187 part.thrargs = args;
1192 *args->skewtask = task_queue_get_atomic(skew_queue);
1195 if( *args->skewtask == NULL)
1198 DEBUGMSG((my_tid==0),
"Got skew task = R: %d, S: %d\n",
1199 (*args->skewtask)->relR.num_tuples,
1200 (*args->skewtask)->relS.num_tuples);
1202 int32_t numperthr = (*args->skewtask)->relR.num_tuples / args->nthreads;
1203 const int fanOut2 = (1 << D);
1208 outputR = (int64_t*) calloc(fanOut2 + 1,
sizeof(int64_t));
1209 outputS = (int64_t*) calloc(fanOut2 + 1,
sizeof(int64_t));
1211 free(args->histR[my_tid]);
1212 free(args->histS[my_tid]);
1214 args->histR[my_tid] = (int32_t*) calloc(fanOut2,
sizeof(int32_t));
1215 args->histS[my_tid] = (int32_t*) calloc(fanOut2,
sizeof(int32_t));
1221 part.rel = (*args->skewtask)->relR.tuples + my_tid * numperthr;
1222 part.tmp = (*args->skewtask)->tmpR.tuples;
1223 part.hist = args->histR;
1224 part.output = outputR;
1225 part.num_tuples = (my_tid == (args->nthreads-1)) ?
1226 ((*args->skewtask)->relR.num_tuples - my_tid * numperthr)
1228 part.total_tuples = (*args->skewtask)->relR.num_tuples;
1232 numperthr = (*args->skewtask)->relS.num_tuples / args->nthreads;
1234 part.rel = (*args->skewtask)->relS.tuples + my_tid * numperthr;
1235 part.tmp = (*args->skewtask)->tmpS.tuples;
1236 part.hist = args->histS;
1237 part.output = outputS;
1238 part.num_tuples = (my_tid == (args->nthreads-1)) ?
1239 ((*args->skewtask)->relS.num_tuples - my_tid * numperthr)
1241 part.total_tuples = (*args->skewtask)->relS.num_tuples;
1252 for(i = 0; i < fanOut2; i++) {
1255 if(ntupR > THR1 || ntupS > THR1){
1257 DEBUGMSG(1,
"Large join task = R: %d, S: %d\n", ntupR, ntupS);
1260 for(
int k=0; k < args->nthreads; k++) {
1261 int ns = (k == args->nthreads-1)
1262 ? (ntupS - k*(ntupS/args->nthreads))
1263 : (ntupS/args->nthreads);
1264 task_t * t = task_queue_get_slot(part_queue);
1266 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1267 t->relR.tuples = (*args->skewtask)->tmpR.tuples + outputR[i];
1268 t->tmpR.tuples = (*args->skewtask)->relR.tuples + outputR[i];
1270 t->relS.num_tuples = t->tmpS.num_tuples = ns;
1271 t->relS.tuples = (*args->skewtask)->tmpS.tuples + outputS[i]
1272 + k*(ntupS/args->nthreads);
1273 t->tmpS.tuples = (*args->skewtask)->relS.tuples + outputS[i]
1274 + k*(ntupS/args->nthreads);
1276 task_queue_add(part_queue, t);
1280 if(ntupR > 0 && ntupS > 0) {
1281 task_t * t = task_queue_get_slot(join_queue);
1283 t->relR.num_tuples = t->tmpR.num_tuples = ntupR;
1284 t->relR.tuples = (*args->skewtask)->tmpR.tuples + outputR[i];
1285 t->tmpR.tuples = (*args->skewtask)->relR.tuples + outputR[i];
1287 t->relS.num_tuples = t->tmpS.num_tuples = ntupS;
1288 t->relS.tuples = (*args->skewtask)->tmpS.tuples + outputS[i];
1289 t->tmpS.tuples = (*args->skewtask)->relS.tuples + outputS[i];
1291 task_queue_add(join_queue, t);
1293 DEBUGMSG(1,
"Join added = R: %d, S: %d\n",
1294 t->relR.num_tuples, t->relS.num_tuples);
1303 while((task = task_queue_get_atomic(part_queue)))
1304 task_queue_add(join_queue, task);
1312 SYNC_TIMER_STOP(&args->localtimer.sync4);
1316 SYNC_GLOBAL_STOP(&args->globaltimer->sync4, my_tid);
1319 if(my_tid == 0) stopTimer(&args->timer3);
1322 DEBUGMSG((my_tid == 0),
"Number of join tasks = %d\n", join_queue->count);
1327 PCM_log(
"======= Partitioning phase profiling results ======\n");
1335#ifdef JOIN_RESULT_MATERIALIZE
1338 void * chainedbuf = NULL;
1341 while((task = task_queue_get_atomic(join_queue))){
1345 results += args->join_function(&task->relR, &task->relS, &task->tmpR, chainedbuf);
1347 args->parts_processed ++;
1350 args->result = results;
1352#ifdef JOIN_RESULT_MATERIALIZE
1353 args->threadresult->nresults = results;
1354 args->threadresult->threadid = my_tid;
1355 args->threadresult->results = (
void *) chainedbuf;
1359 SYNC_TIMER_STOP(&args->localtimer.finish_time);
1366 stopTimer(&args->timer2);
1367 stopTimer(&args->timer1);
1368 gettimeofday(&args->end, NULL);
1373 SYNC_GLOBAL_STOP(&args->globaltimer->finish_time, my_tid);
1378 PCM_log(
"=========== Build+Probe profiling results =========\n");
1380 PCM_log(
"===================================================\n");
1392print_timing(uint64_t total, uint64_t build, uint64_t part,
1393 uint64_t numtuples, int64_t result,
1394 struct timeval * start,
struct timeval * end)
1396 double diff_usec = (((*end).tv_sec*1000000L + (*end).tv_usec)
1397 - ((*start).tv_sec*1000000L+(*start).tv_usec));
1398 double cyclestuple = total;
1399 cyclestuple /= numtuples;
1401 fprintf(stderr,
"PASSES, RADIX BITS, RUNTIME TOTAL, BUILD, PART (cycles),");
1402 fprintf(stderr,
"TOTAL-TIME-USECS, TOTAL-TUPLES, CYCLES-PER-TUPLE: \n");
1403 fprintf(stdout,
"%d, %d, %llu, %llu, %llu, ",
1405 fprintf(stdout,
"%.4lf, ", diff_usec);
1406 fprintf(stdout,
"%llu, %.4lf\n", result, cyclestuple);
1427 pthread_t tid[nthreads];
1428 pthread_attr_t attr;
1429 pthread_barrier_t barrier;
1431 arg_t args[nthreads];
1433 int32_t ** histR, ** histS;
1435 int32_t numperthr[2];
1445 task_t * skewtask = NULL;
1446 skew_queue = task_queue_init(FANOUT_PASS1);
1449 for(i = 0; i < numnuma; i++){
1450 part_queue[i] = task_queue_init(FANOUT_PASS1);
1457#ifdef JOIN_RESULT_MATERIALIZE
1463 tmpRelR = (
tuple_t*) alloc_aligned(relR->num_tuples *
sizeof(
tuple_t) +
1465 tmpRelS = (
tuple_t*) alloc_aligned(relS->num_tuples *
sizeof(
tuple_t) +
1471 uint64_t numwithpad;
1473 numwithpad = (relR->num_tuples *
sizeof(
tuple_t) +
1477 numwithpad = (relS->num_tuples *
sizeof(
tuple_t) +
1484 histR = (int32_t**) alloc_aligned(nthreads *
sizeof(int32_t*));
1485 histS = (int32_t**) alloc_aligned(nthreads *
sizeof(int32_t*));
1488 rv = pthread_barrier_init(&barrier, NULL, nthreads);
1490 printf(
"[ERROR] Couldn't create the barrier\n");
1494 pthread_attr_init(&attr);
1498 args[0].globaltimer = (synctimer_t*) malloc(
sizeof(synctimer_t));
1502 numperthr[0] = relR->num_tuples / nthreads;
1503 numperthr[1] = relS->num_tuples / nthreads;
1504 for(i = 0; i < nthreads; i++){
1507 DEBUGMSG(1,
"Assigning thread-%d to CPU-%d\n", i, cpu_idx);
1510 CPU_SET(cpu_idx, &set);
1511 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &set);
1514 args[i].relR = relR->tuples + i * numperthr[0];
1515 args[i].tmpR = tmpRelR;
1516 args[i].histR = histR;
1518 args[i].relS = relS->tuples + i * numperthr[1];
1519 args[i].tmpS = tmpRelS;
1520 args[i].histS = histS;
1522 args[i].numR = (i == (nthreads-1)) ?
1523 (relR->num_tuples - i * numperthr[0]) : numperthr[0];
1524 args[i].numS = (i == (nthreads-1)) ?
1525 (relS->num_tuples - i * numperthr[1]) : numperthr[1];
1526 args[i].totalR = relR->num_tuples;
1527 args[i].totalS = relS->num_tuples;
1530 args[i].part_queue = part_queue;
1531 args[i].join_queue = join_queue;
1533 args[i].skew_queue = skew_queue;
1534 args[i].skewtask = &skewtask;
1536 args[i].barrier = &barrier;
1537 args[i].join_function = jf;
1538 args[i].nthreads = nthreads;
1539 args[i].threadresult = &(joinresult->resultlist[i]);
1541 rv = pthread_create(&tid[i], &attr,
prj_thread, (
void*)&args[i]);
1543 printf(
"[ERROR] return code from pthread_create() is %d\n", rv);
1549 for(i = 0; i < nthreads; i++){
1550 pthread_join(tid[i], NULL);
1551 result += args[i].result;
1553 joinresult->totalresults = result;
1554 joinresult->nthreads = nthreads;
1558 fprintf(stdout,
"TID JTASKS T1.1 T1.1-IDLE T1.2 T1.2-IDLE "\
1559 "T3 T3-IDLE T4 T4-IDLE T5 T5-IDLE\n");
1560 for(i = 0; i < nthreads; i++){
1561 synctimer_t * glob = args[0].globaltimer;
1562 synctimer_t * local = & args[i].localtimer;
1564 "%d %d %llu %llu %llu %llu %llu %llu %llu %llu "\
1566 (i+1), args[i].parts_processed, local->sync1[0],
1567 glob->sync1[0] - local->sync1[0],
1568 local->sync1[1] - glob->sync1[0],
1569 glob->sync1[1] - local->sync1[1],
1570 local->sync3 - glob->sync1[1],
1571 glob->sync3 - local->sync3,
1572 local->sync4 - glob->sync3,
1573 glob->sync4 - local->sync4,
1574 local->finish_time - glob->sync4,
1575 glob->finish_time - local->finish_time);
1581 print_timing(args[0].timer1, args[0].timer2, args[0].timer3,
1582 relS->num_tuples, result,
1583 &args[0].start, &args[0].end);
1587 for(i = 0; i < nthreads; i++) {
1594 for(i = 0; i < numnuma; i++){
1595 task_queue_free(part_queue[i]);
1596 task_queue_free(join_queue[i]);
1600 task_queue_free(skew_queue);
1605 free(args[0].globaltimer);
1641 struct timeval start, end;
1642 uint64_t timer1, timer2, timer3;
1651#ifdef JOIN_RESULT_MATERIALIZE
1658 outRelR->tuples = (
tuple_t*) malloc(sz);
1659 outRelR->num_tuples = relR->num_tuples;
1662 outRelS->tuples = (
tuple_t*) malloc(sz);
1663 outRelS->num_tuples = relS->num_tuples;
1666 gettimeofday(&start, NULL);
1667 startTimer(&timer1);
1668 startTimer(&timer2);
1669 startTimer(&timer3);
1700 free(outRelR->tuples);
1701 free(outRelS->tuples);
1706#error Only 1 or 2 pass partitioning is implemented, change NUM_PASSES!
1714 int * R_count_per_cluster = (
int*)calloc((1<<
NUM_RADIX_BITS),
sizeof(int));
1715 int * S_count_per_cluster = (
int*)calloc((1<<
NUM_RADIX_BITS),
sizeof(int));
1718 for( i=0; i < relR->num_tuples; i++ ){
1720 R_count_per_cluster[idx] ++;
1722 for( i=0; i < relS->num_tuples; i++ ){
1724 S_count_per_cluster[idx] ++;
1727#ifdef JOIN_RESULT_MATERIALIZE
1730 void * chainedbuf = NULL;
1739 if(R_count_per_cluster[i] > 0 && S_count_per_cluster[i] > 0){
1741 tmpR.num_tuples = R_count_per_cluster[i];
1742 tmpR.tuples = relR->tuples + r;
1743 r += R_count_per_cluster[i];
1745 tmpS.num_tuples = S_count_per_cluster[i];
1746 tmpS.tuples = relS->tuples + s;
1747 s += S_count_per_cluster[i];
1752 r += R_count_per_cluster[i];
1753 s += S_count_per_cluster[i];
1757#ifdef JOIN_RESULT_MATERIALIZE
1759 thrres->nresults = result;
1760 thrres->threadid = 0;
1761 thrres->results = (
void *) chainedbuf;
1768 gettimeofday(&end, NULL);
1770 print_timing(timer1, timer2, timer3, relS->num_tuples, result, &start, &end);
1774 free(S_count_per_cluster);
1775 free(R_count_per_cluster);
1779 free(outRelR->tuples);
1780 free(outRelS->tuples);
1785 joinresult->totalresults = result;
1786 joinresult->nthreads = 1;
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
Barrier implementation, defaults to Pthreads. On Mac custom implementation since barriers are not inc...
#define BARRIER_ARRIVE(B, RV)
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
int64_t histogram_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
result_t * PRH(relation_t *relR, relation_t *relS, int nthreads)
void parallel_radix_partition(part_t *const part)
void serial_radix_partition(task_t *const task, task_queue_t *join_queue, const int R, const int D)
uint32_t get_hist_size(uint32_t relSize) __attribute__((always_inline))
result_t * join_init_run(relation_t *relR, relation_t *relS, JoinFunction jf, int nthreads)
int64_t bucket_chaining_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
result_t * PRO(relation_t *relR, relation_t *relS, int nthreads)
void * prj_thread(void *param)
result_t * RJ(relation_t *relR, relation_t *relS, int nthreads)
int64_t histogram_optimized_join(const relation_t *const R, const relation_t *const S, relation_t *const tmpR, void *output)
void radix_cluster(relation_t *restrict outRel, relation_t *restrict inRel, int32_t *restrict hist, int R, int D)
void prefetch(void *addr) __attribute__((always_inline))
void radix_cluster_nopadding(relation_t *outRel, relation_t *inRel, int R, int D)
result_t * PRHO(relation_t *relR, relation_t *relS, int nthreads)
void parallel_radix_partition_optimized(part_t *const part)
#define SMALL_PADDING_TUPLES
int get_num_numa_regions(void)
int get_numa_node_of_address(void *ptr)
int get_cpu_id(int thread_id)
int get_numa_id(int mytid)
#define DEBUGMSG(COND, MSG,...)
Provides interfaces for several variants of Radix Hash Join.
An interface to the Intel Performance Counters Monitoring.
Constant parameters used by Parallel Radix Join implementations.
#define PROBE_BUFFER_SIZE
Implements task queue facility for the join processing.
Implements a chained-buffer storage model for tuples.