Multi-core Hash Joins
Main-memory hash join implementations for multi-core CPUs
generator.c
1/* @version $Id: generator.c 4546 2013-12-07 13:56:09Z bcagri $ */
2
3#ifndef _GNU_SOURCE
4#define _GNU_SOURCE
5#endif
6#include <assert.h> /* assert() */
7#include <sched.h> /* CPU_ZERO, CPU_SET */
8#include <pthread.h> /* pthread_attr_setaffinity_np */
9#include <stdio.h> /* perror */
10#include <stdlib.h> /* posix_memalign */
11#include <math.h> /* fmod, pow */
12#include <time.h> /* time() */
13#include <unistd.h> /* getpagesize() */
14#include <string.h> /* memcpy() */
15
16#include "cpu_mapping.h" /* get_cpu_id() */
17#include "generator.h" /* create_relation_*() */
18#include "affinity.h" /* pthread_attr_setaffinity_np */
19#include "genzipf.h" /* gen_zipf() */
20#include "lock.h"
21#include "prj_params.h" /* RELATION_PADDING for Parallel Radix */
22
23#ifdef __cplusplus
24namespace eth_hashjoin {
25#endif // __cplusplus
26
/*
 * Random-number helpers used by the data generators.
 *
 * RAND_RANGE(N)          -> double uniformly distributed in [0, N), drawn
 *                           from the process-wide rand() stream.
 * RAND_RANGE48(N, STATE) -> double uniformly distributed in [0, N), drawn
 *                           from nrand48() with the caller-owned 48-bit
 *                           STATE (unsigned short[3]), so that threads can
 *                           use independent streams.
 *
 * Assigning the result to an integer truncates it to an index in [0, N-1].
 */
#define RAND_RANGE(N) ((double)rand() / ((double)RAND_MAX + 1) * (N))
/* nrand48() always returns values in [0, 2^31), independent of RAND_MAX
 * (which may be as small as 32767), so scale by 2^31 explicitly. */
#define RAND_RANGE48(N, STATE) ((double)nrand48(STATE) / 2147483648.0 * (N))
/* Cache-line aligned allocation of SZ bytes plus RELATION_PADDING extra
 * bytes (see alloc_aligned()). */
#define MALLOC(SZ) alloc_aligned((SZ) + RELATION_PADDING)
/* SZ is accepted for symmetry with MALLOC; free() does not need it. */
#define FREE(X, SZ) free(X)
32
#ifndef BARRIER_ARRIVE
/*
 * Fallback barrier helper, used only when no other header has already
 * defined BARRIER_ARRIVE.
 *
 * Waits on the pthread barrier B and stores pthread_barrier_wait()'s return
 * code in RV. Both 0 and PTHREAD_BARRIER_SERIAL_THREAD mean success; any
 * other value terminates the process. The do { } while (0) wrapper makes
 * the macro behave as a single statement (e.g. inside an unbraced if/else).
 */
#define BARRIER_ARRIVE(B, RV)                                       \
    do {                                                            \
        (RV) = pthread_barrier_wait(B);                             \
        if ((RV) != 0 && (RV) != PTHREAD_BARRIER_SERIAL_THREAD) {   \
            fprintf(stderr, "Couldn't wait on barrier\n");          \
            exit(EXIT_FAILURE);                                     \
        }                                                           \
    } while (0)
#endif
42
43/* Uncomment the following to persist input relations to disk. */
44/* #define PERSIST_RELATIONS 1 */
45
/* When non-zero, alloc_aligned() hands every new block to numa_localize(),
 * which writes it from CPU-pinned worker threads (zero-initialized, i.e.
 * off, by default). */
int numalocalize;

/* Number of threads alloc_aligned() passes to numa_localize(). */
int nthreads;

/* Set once the rand() stream has been seeded, either explicitly by
 * seed_generator() or lazily by check_seed(). */
static int seeded = 0;
/* The value that was passed to srand(). */
static unsigned int seedValue;
52
53void *
54alloc_aligned(size_t size)
55{
56 void * ret;
57 int rv;
58 rv = posix_memalign((void**)&ret, CACHE_LINE_SIZE, size);
59
60 if (rv) {
61 perror("[ERROR] alloc_aligned() failed: out of memory");
62 return 0;
63 }
64
67 if(numalocalize) {
68 tuple_t * mem = (tuple_t *) ret;
69 uint64_t ntuples = size / sizeof(tuple_t);
70 numa_localize(mem, ntuples, nthreads);
71 }
72
73 return ret;
74}
75
76void
77seed_generator(unsigned int seed)
78{
79 srand(seed);
80 seedValue = seed;
81 seeded = 1;
82}
83
85static void
86check_seed()
87{
88 if(!seeded) {
89 seedValue = time(NULL);
90 srand(seedValue);
91 seeded = 1;
92 }
93}
94
95
101void
102knuth_shuffle(relation_t * relation)
103{
104 int i;
105 for (i = relation->num_tuples - 1; i > 0; i--) {
106 int64_t j = RAND_RANGE(i);
107 intkey_t tmp = relation->tuples[i].key;
108 relation->tuples[i].key = relation->tuples[j].key;
109 relation->tuples[j].key = tmp;
110 }
111}
112
113void
114knuth_shuffle48(relation_t * relation, unsigned short * state)
115{
116 int i;
117 for (i = relation->num_tuples - 1; i > 0; i--) {
118 int64_t j = RAND_RANGE48(i, state);
119 intkey_t tmp = relation->tuples[i].key;
120 relation->tuples[i].key = relation->tuples[j].key;
121 relation->tuples[j].key = tmp;
122 }
123}
124
129void
130random_unique_gen(relation_t *rel)
131{
132 uint64_t i;
133
134 for (i = 0; i < rel->num_tuples; i++) {
135 rel->tuples[i].key = (i+1);
136 rel->tuples[i].payload = i;
137 }
138
139 /* randomly shuffle elements */
140 knuth_shuffle(rel);
141}
142
144 relation_t rel;
145 int64_t firstkey;
146 int64_t maxid;
147 uint64_t ridstart;
148 relation_t * fullrel;
149 volatile void * locks;
150 pthread_barrier_t * barrier;
151};
152
153typedef struct create_arg_t create_arg_t;
154
158void *
159random_unique_gen_thread(void * args)
160{
161 create_arg_t * arg = (create_arg_t *) args;
162 relation_t * rel = & arg->rel;
163 int64_t firstkey = arg->firstkey;
164 int64_t maxid = arg->maxid;
165 uint64_t ridstart = arg->ridstart;
166 uint64_t i;
167
168 /* for randomly seeding nrand48() */
169 unsigned short state[3] = {0, 0, 0};
170 unsigned int seed = time(NULL) + * (unsigned int *) pthread_self();
171 memcpy(state, &seed, sizeof(seed));
172
173 for (i = 0; i < rel->num_tuples; i++) {
174 rel->tuples[i].key = firstkey;
175 rel->tuples[i].payload = ridstart + i;
176
177 if(firstkey == maxid)
178 firstkey = 0;
179
180 firstkey ++;
181 }
182
183 /* randomly shuffle elements */
184 /* knuth_shuffle48(rel, state); */
185
186 /* wait at a barrier until all threads finish initializing data */
187 BARRIER_ARRIVE(arg->barrier, i);
188
189 /* parallel synchronized knuth-shuffle */
190 volatile char * locks = (volatile char *)(arg->locks);
191 relation_t * fullrel = arg->fullrel;
192
193 uint64_t rel_offset_in_full = rel->tuples - fullrel->tuples;
194 uint64_t k = rel_offset_in_full + rel->num_tuples - 1;
195 for (i = rel->num_tuples - 1; i > 0; i--, k--) {
196 int64_t j = RAND_RANGE48(k, state);
197 lock(locks+k); /* lock this rel-idx=i, fullrel-idx=k */
198 lock(locks+j); /* lock full rel-idx=j */
199
200 intkey_t tmp = fullrel->tuples[k].key;
201 fullrel->tuples[k].key = fullrel->tuples[j].key;
202 fullrel->tuples[j].key = tmp;
203
204 unlock(locks+j);
205 unlock(locks+k);
206 }
207
208 return 0;
209}
210
214void *
215numa_localize_thread(void * args)
216{
217 create_arg_t * arg = (create_arg_t *) args;
218 relation_t * rel = & arg->rel;
219 uint64_t i;
220
221 for (i = 0; i < rel->num_tuples; i++) {
222 rel->tuples[i].key = 0;
223 }
224
225 return 0;
226}
227
228
233void
234read_relation(relation_t * rel, char * filename);
235
239void
240write_relation(relation_t * rel, char * filename)
241{
242 FILE * fp = fopen(filename, "w");
243 uint64_t i;
244
245 fprintf(fp, "#KEY, VAL\n");
246
247 for (i = 0; i < rel->num_tuples; i++) {
248 fprintf(fp, "%d %d\n", rel->tuples[i].key, rel->tuples[i].payload);
249 }
250
251 fclose(fp);
252}
253
258void
259random_gen(relation_t *rel, const int64_t maxid)
260{
261 uint64_t i;
262
263 for (i = 0; i < rel->num_tuples; i++) {
264 rel->tuples[i].key = RAND_RANGE(maxid);
265 rel->tuples[i].payload = i;
266 }
267}
268
269int
270create_relation_pk(relation_t *relation, int64_t num_tuples)
271{
272
273 check_seed();
274
275 relation->num_tuples = num_tuples;
276 relation->tuples = (tuple_t*)MALLOC(relation->num_tuples * sizeof(tuple_t));
277
278 if (!relation->tuples) {
279 perror("out of memory");
280 return -1;
281 }
282
283 random_unique_gen(relation);
284
285#ifdef PERSIST_RELATIONS
286 write_relation(relation, "R.tbl");
287#endif
288
289 return 0;
290}
291
292int
293parallel_create_relation(relation_t *relation, uint64_t num_tuples,
294 uint32_t nthreads, uint64_t maxid)
295{
296 int rv;
297 uint32_t i;
298 uint64_t offset = 0;
299
300 check_seed();
301
302 relation->num_tuples = num_tuples;
303
304 /* we need aligned allocation of items */
305 relation->tuples = (tuple_t*) MALLOC(num_tuples * sizeof(tuple_t));
306
307 if (!relation->tuples) {
308 perror("out of memory");
309 return -1;
310 }
311
312 create_arg_t args[nthreads];
313 pthread_t tid[nthreads];
314 cpu_set_t set;
315 pthread_attr_t attr;
316 pthread_barrier_t barrier;
317
318 unsigned int pagesize;
319 unsigned int npages;
320 unsigned int npages_perthr;
321 uint64_t ntuples_perthr;
322 uint64_t ntuples_lastthr;
323
324 pagesize = getpagesize();
325 npages = (num_tuples * sizeof(tuple_t)) / pagesize + 1;
326 npages_perthr = npages / nthreads;
327 ntuples_perthr = npages_perthr * (pagesize/sizeof(tuple_t));
328
329 if(npages_perthr == 0)
330 ntuples_perthr = num_tuples / nthreads;
331
332 ntuples_lastthr = num_tuples - ntuples_perthr * (nthreads-1);
333
334 pthread_attr_init(&attr);
335
336 rv = pthread_barrier_init(&barrier, NULL, nthreads);
337 if(rv != 0){
338 printf("[ERROR] Couldn't create the barrier\n");
339 exit(EXIT_FAILURE);
340 }
341
342
343 volatile void * locks = (volatile void *)calloc(num_tuples, sizeof(char));
344
345 for( i = 0; i < nthreads; i++ ) {
346 int cpu_idx = get_cpu_id(i);
347
348 CPU_ZERO(&set);
349 CPU_SET(cpu_idx, &set);
350 pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &set);
351
352 args[i].firstkey = (offset + 1) % maxid;
353 args[i].maxid = maxid;
354 args[i].ridstart = offset;
355 args[i].rel.tuples = relation->tuples + offset;
356 args[i].rel.num_tuples = (i == nthreads-1) ? ntuples_lastthr
357 : ntuples_perthr;
358
359 args[i].fullrel = relation;
360 args[i].locks = locks;
361 args[i].barrier = &barrier;
362
363 offset += ntuples_perthr;
364
365 rv = pthread_create(&tid[i], &attr, random_unique_gen_thread,
366 (void*)&args[i]);
367 if (rv){
368 fprintf(stderr, "[ERROR] pthread_create() return code is %d\n", rv);
369 exit(-1);
370 }
371 }
372
373 for(i = 0; i < nthreads; i++){
374 pthread_join(tid[i], NULL);
375 }
376
377 /* randomly shuffle elements */
378 /* knuth_shuffle(relation); */
379
380 /* clean up */
381 free((char*)locks);
382 pthread_barrier_destroy(&barrier);
383
384#ifdef PERSIST_RELATIONS
385 char * const tables[] = {"R.tbl", "S.tbl"};
386 static int rs = 0;
387 write_relation(relation, tables[(rs++)%2]);
388#endif
389
390 return 0;
391}
392
393int
394load_relation(relation_t * relation, char * filename, uint64_t num_tuples)
395{
396 relation->num_tuples = num_tuples;
397
398 /* we need aligned allocation of items */
399 relation->tuples = (tuple_t*) MALLOC(num_tuples * sizeof(tuple_t));
400
401 if (!relation->tuples) {
402 perror("out of memory");
403 return -1;
404 }
405
406 /* load from the given input file */
407 read_relation(relation, filename);
408
409 return 0;
410}
411
412int
413numa_localize(tuple_t * relation, int64_t num_tuples, uint32_t nthreads)
414{
415 uint32_t i, rv;
416 uint64_t offset = 0;
417
418 /* we need aligned allocation of items */
419 create_arg_t args[nthreads];
420 pthread_t tid[nthreads];
421 cpu_set_t set;
422 pthread_attr_t attr;
423
424 unsigned int pagesize;
425 unsigned int npages;
426 unsigned int npages_perthr;
427 uint64_t ntuples_perthr;
428 uint64_t ntuples_lastthr;
429
430 pagesize = getpagesize();
431 npages = (num_tuples * sizeof(tuple_t)) / pagesize + 1;
432 npages_perthr = npages / nthreads;
433 ntuples_perthr = npages_perthr * (pagesize/sizeof(tuple_t));
434 ntuples_lastthr = num_tuples - ntuples_perthr * (nthreads-1);
435
436 pthread_attr_init(&attr);
437
438 for( i = 0; i < nthreads; i++ ) {
439 int cpu_idx = get_cpu_id(i);
440
441 CPU_ZERO(&set);
442 CPU_SET(cpu_idx, &set);
443 pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &set);
444
445 args[i].firstkey = offset + 1;
446 args[i].rel.tuples = relation + offset;
447 args[i].rel.num_tuples = (i == nthreads-1) ? ntuples_lastthr
448 : ntuples_perthr;
449 offset += ntuples_perthr;
450
451 rv = pthread_create(&tid[i], &attr, numa_localize_thread,
452 (void*)&args[i]);
453 if (rv){
454 fprintf(stderr, "[ERROR] pthread_create() return code is %d\n", rv);
455 exit(-1);
456 }
457 }
458
459 for(i = 0; i < nthreads; i++){
460 pthread_join(tid[i], NULL);
461 }
462
463 return 0;
464}
465
466
467int
468create_relation_fk(relation_t *relation, int64_t num_tuples, const int64_t maxid)
469{
470 int32_t i, iters;
471 int64_t remainder;
472 relation_t tmp;
473
474 check_seed();
475
476 relation->num_tuples = num_tuples;
477 relation->tuples = (tuple_t*)MALLOC(relation->num_tuples * sizeof(tuple_t));
478
479 if (!relation->tuples) {
480 perror("out of memory");
481 return -1;
482 }
483
484 /* alternative generation method */
485 iters = num_tuples / maxid;
486 for(i = 0; i < iters; i++){
487 tmp.num_tuples = maxid;
488 tmp.tuples = relation->tuples + maxid * i;
489 random_unique_gen(&tmp);
490 }
491
492 /* if num_tuples is not an exact multiple of maxid */
493 remainder = num_tuples % maxid;
494 if(remainder > 0) {
495 tmp.num_tuples = remainder;
496 tmp.tuples = relation->tuples + maxid * iters;
497 random_unique_gen(&tmp);
498 }
499
500#ifdef PERSIST_RELATIONS
501 write_relation(relation, "S.tbl");
502#endif
503
504 return 0;
505}
506
518int
520 int64_t num_tuples)
521{
522 int rv, i, iters;
523 int64_t remainder;
524
525 rv = posix_memalign((void**)&fkrel->tuples, CACHE_LINE_SIZE,
526 num_tuples * sizeof(tuple_t) + RELATION_PADDING);
527
528 if (rv && !fkrel->tuples) {
529 perror("[ERROR] Out of memory");
530 return -1;
531 }
532
533 fkrel->num_tuples = num_tuples;
534
535 /* alternative generation method */
536 iters = num_tuples / pkrel->num_tuples;
537 for(i = 0; i < iters; i++){
538 memcpy(fkrel->tuples + i * pkrel->num_tuples, pkrel->tuples,
539 pkrel->num_tuples * sizeof(tuple_t));
540 }
541
542 /* if num_tuples is not an exact multiple of pkrel->num_tuples */
543 remainder = num_tuples % pkrel->num_tuples;
544 if(remainder > 0) {
545 memcpy(fkrel->tuples + i * pkrel->num_tuples, pkrel->tuples,
546 remainder * sizeof(tuple_t));
547 }
548
549 knuth_shuffle(fkrel);
550
551 return 0;
552}
553
554int create_relation_nonunique(relation_t *relation, int64_t num_tuples,
555 const int64_t maxid)
556{
557 check_seed();
558
559 relation->num_tuples = num_tuples;
560 relation->tuples = (tuple_t*)MALLOC(relation->num_tuples * sizeof(tuple_t));
561
562 if (!relation->tuples) {
563 perror("out of memory");
564 return -1;
565 }
566
567 random_gen(relation, maxid);
568
569 return 0;
570}
571
572double
573zipf_ggl(double * seed)
574{
575 double t, d2=0.2147483647e10;
576 t = *seed;
577 t = fmod(0.16807e5*t, d2);
578 *seed = t;
579 return (t-1.0e0)/(d2-1.0e0);
580}
581
582int
583create_relation_zipf(relation_t * relation, int64_t num_tuples,
584 const int64_t maxid, const double zipf_param)
585{
586 check_seed();
587
588 relation->num_tuples = num_tuples;
589 relation->tuples = (tuple_t*) MALLOC(relation->num_tuples * sizeof(tuple_t));
590
591 if (!relation->tuples) {
592 perror("out of memory");
593 return -1;
594 }
595
596 gen_zipf(num_tuples, maxid, zipf_param, &relation->tuples);
597
598 return 0;
599}
600
601void
603{
604 /* clean up */
605 FREE(rel->tuples, rel->num_tuples * sizeof(tuple_t));
606}
607
608void
609read_relation(relation_t * rel, char * filename){
610
611 FILE * fp = fopen(filename, "r");
612
613 /* skip the header line */
614 char c;
615 do{
616 c = fgetc(fp);
617 } while (c != '\n');
618
619 /* search for a whitespace for "key payload" format */
620 int fmtspace = 0;
621 int fmtcomma = 0;
622 int fmtpipe = 0;
623 do{
624 c = fgetc(fp);
625 if(c == ' '){
626 fmtspace = 1;
627 break;
628 }
629 if(c == ','){
630 fmtcomma = 1;
631 break;
632 }
633 if(c == '|'){
634 fmtpipe = 1;
635 break;
636 }
637 } while (c != '\n');
638
639 /* rewind back to the beginning and start parsing again */
640 rewind(fp);
641 /* skip the header line */
642 do{
643 c = fgetc(fp);
644 } while (c != '\n');
645
646 uint64_t ntuples = rel->num_tuples;
647 intkey_t key = 0;
648 value_t payload = 0;
649 int warn = 1;
650 for(uint64_t i = 0; i < ntuples; i++){
651 if(fmtspace){
652 assert(fscanf(fp, "%d %d", &key, &payload) == 2);
653 }
654 else if(fmtcomma){
655 assert(fscanf(fp, "%d,%d", &key, &payload) == 2);
656 } else if(fmtpipe){
657 assert(fscanf(fp, "%d|%d", &key, &payload) == 2);
658 }
659 else {
660 assert(fscanf(fp, "%d", &key) == 1);
661 }
662
663 if(warn && key < 0){
664 warn = 0;
665 printf("[WARN ] key=%d, payload=%d\n", key, payload);
666 }
667 rel->tuples[i].key = key;
668 rel->tuples[i].payload = payload;
669 }
670
671 fclose(fp);
672
673}
674
675#ifdef __cplusplus
676} // namespace eth_hashjoin
677#endif // __cplusplus
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
#define BARRIER_ARRIVE(B, RV)
Definition: barrier.h:29
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
item_t * gen_zipf(unsigned int stream_size, unsigned int alphabet_size, double zipf_factor, item_t **output)
Definition: genzipf.c:103
int create_relation_fk_from_pk(relation_t *fkrel, relation_t *pkrel, int64_t num_tuples)
Definition: generator.c:519
int create_relation_nonunique(relation_t *relation, int64_t num_tuples, const int64_t maxid)
Definition: generator.c:554
int parallel_create_relation(relation_t *relation, uint64_t num_tuples, uint32_t nthreads, uint64_t maxid)
Definition: generator.c:293
void delete_relation(relation_t *rel)
Definition: generator.c:602
void write_relation(relation_t *rel, char *filename)
Definition: generator.c:240
int create_relation_pk(relation_t *relation, int64_t num_tuples)
Definition: generator.c:270
int create_relation_zipf(relation_t *relation, int64_t num_tuples, const int64_t maxid, const double zipf_param)
Definition: generator.c:583
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
Definition: generator.c:413
void seed_generator(unsigned int seed)
Definition: generator.c:77
int create_relation_fk(relation_t *relation, int64_t num_tuples, const int64_t maxid)
Definition: generator.c:468
int load_relation(relation_t *relation, char *filename, uint64_t num_tuples)
Definition: generator.c:394
#define RELATION_PADDING
Definition: prj_params.h:92
int get_cpu_id(int thread_id)
Definition: cpu_mapping.c:78
int numalocalize
Definition: generator.c:47
A wrapper file ensuring 64bit key size.
Definition: generator64.hpp:10
#define CACHE_LINE_SIZE
Definition: npj_params.h:24
Constant parameters used by Parallel Radix Join implementations.
Definition: types.h:45