Multi-core Hash Joins
Main-memory hash join implementations for multi-core CPUs
main.c
Go to the documentation of this file.
1
235#ifndef _GNU_SOURCE
236#define _GNU_SOURCE
237#endif
#include <sched.h>      /* sched_setaffinity */
#include <stdio.h>      /* printf */
#include <sys/time.h>   /* gettimeofday */
#include <getopt.h>     /* getopt */
#include <stdlib.h>     /* exit */
#include <string.h>     /* strcmp */
#include <limits.h>     /* INT_MAX */
#include <errno.h>      /* errno, ERANGE */
#include <stdint.h>     /* UINT32_MAX, UINT64_MAX */

#include "no_partitioning_join.h" /* no partitioning joins: NPO, NPO_st */
#include "parallel_radix_join.h"  /* parallel radix joins: RJ, PRO, PRH, PRHO */
#include "generator.h"            /* create_relation_xk */

#include "constants.h"     /* DEFAULT_R_SEED, DEFAULT_S_SEED */
#include "perf_counters.h" /* PCM_x */
#include "affinity.h"      /* pthread_attr_setaffinity_np & sched_setaffinity */
#include "../config.h"     /* autoconf header */
254
255#ifdef JOIN_RESULT_MATERIALIZE
256#include "tuple_buffer.h" /* for materialization */
257#endif
258
259#if !defined(__cplusplus)
260int getopt(int argc, char * const argv[],
261 const char *optstring);
262#endif
263
264typedef struct algo_t algo_t;
265typedef struct param_t param_t;
266
267struct algo_t {
268 char name[128];
269 result_t * (*joinAlgo)(relation_t * , relation_t *, int);
270};
271
272struct param_t {
273 algo_t * algo;
274 uint32_t nthreads;
275 uint64_t r_size;
276 uint64_t s_size;
277 uint32_t r_seed;
278 uint32_t s_seed;
279 double skew;
280 int nonunique_keys; /* non-unique keys allowed? */
281 int verbose;
282 int fullrange_keys; /* keys covers full int range? */
283 int basic_numa;/* alloc input chunks thread local? */
284 char * perfconf;
285 char * perfout;
287 char * loadfileR;
288 char * loadfileS;
289};
290
/* getopt(3) global state. */
extern char * optarg;
extern int    optind;
extern int    opterr;
extern int    optopt;

/* Globals defined in generator.c; main() sets them before building relations. */
extern int numalocalize; /* copied from the --basic-numa flag */
extern int nthreads;     /* copied from the -n/--nthreads option */
297
299static struct algo_t algos [] =
300 {
301 {"PRO", PRO},
302 {"RJ", RJ},
303 {"PRH", PRH},
304 {"PRHO", PRHO},
305 {"NPO", NPO},
306 {"NPO_st", NPO_st}, /* NPO single threaded */
307 {{0}, 0}
308 };
309
310/* command line handling functions */
311void
312print_help();
313
314void
315print_version();
316
317void
318parse_args(int argc, char ** argv, param_t * cmd_params);
319
320int
321main(int argc, char ** argv)
322{
323 relation_t relR;
324 relation_t relS;
325 result_t * results;
326
327 /* start initially on CPU-0 */
328 cpu_set_t set;
329 CPU_ZERO(&set);
330 CPU_SET(0, &set);
331 if (sched_setaffinity(0, sizeof(set), &set) <0) {
332 perror("sched_setaffinity");
333 }
334
335 /* Command line parameters */
336 param_t cmd_params;
337
338 /* Default values if not specified on command line */
339 cmd_params.algo = &algos[0]; /* PRO */
340 cmd_params.nthreads = 2;
341 /* default dataset is Workload B (described in paper) */
342 cmd_params.r_size = 128000000;
343 cmd_params.s_size = 128000000;
344 cmd_params.r_seed = DEFAULT_R_SEED;
345 cmd_params.s_seed = DEFAULT_S_SEED;
346 cmd_params.skew = 0.0;
347 cmd_params.verbose = 0;
348 cmd_params.perfconf = NULL;
349 cmd_params.perfout = NULL;
350 cmd_params.nonunique_keys = 0;
351 cmd_params.fullrange_keys = 0;
352 cmd_params.basic_numa = 0;
353 cmd_params.loadfileR = NULL;
354 cmd_params.loadfileS = NULL;
355
356 parse_args(argc, argv, &cmd_params);
357
358#ifdef PERF_COUNTERS
359 PCM_CONFIG = cmd_params.perfconf;
360 PCM_OUT = cmd_params.perfout;
361#endif
362
363 /* create relation R */
364 fprintf(stderr,
365 "[INFO ] %s relation R with size = %.3lf MiB, #tuples = %llu : ",
366 (cmd_params.loadfileS != NULL)?("Loading"):("Creating"),
367 (double) sizeof(tuple_t) * cmd_params.r_size/1024.0/1024.0,
368 cmd_params.r_size);
369 fflush(stderr);
370
371 seed_generator(cmd_params.r_seed);
372
373 /* to pass information to the create_relation methods */
374 numalocalize = cmd_params.basic_numa;
375 nthreads = cmd_params.nthreads;
376
377 if(cmd_params.loadfileR != NULL){
378 /* load relation from file */
379 load_relation(&relR, cmd_params.loadfileR, cmd_params.r_size);
380 }
381 else if(cmd_params.fullrange_keys) {
382 create_relation_nonunique(&relR, cmd_params.r_size, INT_MAX);
383 }
384 else if(cmd_params.nonunique_keys) {
385 create_relation_nonunique(&relR, cmd_params.r_size, cmd_params.r_size);
386 }
387 else {
388 //create_relation_pk(&relR, cmd_params.r_size);
389 parallel_create_relation(&relR, cmd_params.r_size,
390 nthreads,
391 cmd_params.r_size);
392 }
393 fprintf(stderr, "OK \n");
394
395 /* create relation S */
396 fprintf(stderr,
397 "[INFO ] %s relation S with size = %.3lf MiB, #tuples = %lld : ",
398 (cmd_params.loadfileS != NULL)?("Loading"):("Creating"),
399 (double) sizeof(tuple_t) * cmd_params.s_size/1024.0/1024.0,
400 cmd_params.s_size);
401 fflush(stderr);
402
403 seed_generator(cmd_params.s_seed);
404
405 if(cmd_params.loadfileS != NULL){
406 /* load relation from file */
407 load_relation(&relS, cmd_params.loadfileS, cmd_params.s_size);
408 }
409 else if(cmd_params.fullrange_keys) {
410 create_relation_fk_from_pk(&relS, &relR, cmd_params.s_size);
411 }
412 else if(cmd_params.nonunique_keys) {
413 /* use size of R as the maxid */
414 create_relation_nonunique(&relS, cmd_params.s_size, cmd_params.r_size);
415 }
416 else {
417 /* if r_size == s_size then equal-dataset, else non-equal dataset */
418
419 if(cmd_params.skew > 0){
420 /* S is skewed */
421 create_relation_zipf(&relS, cmd_params.s_size,
422 cmd_params.r_size, cmd_params.skew);
423 }
424 else {
425 /* S is uniform foreign key */
426 //create_relation_fk(&relS, cmd_params.s_size, cmd_params.r_size);
427 parallel_create_relation(&relS, cmd_params.s_size,
428 nthreads,
429 cmd_params.r_size);
430 }
431 }
432 fprintf(stderr, "OK \n");
433
434
435 /* Run the selected join algorithm */
436 fprintf(stderr, "[INFO ] Running join algorithm %s ...\n", cmd_params.algo->name);
437 fprintf(stderr, "[INFO ] NUM_PASSES %d | NUM_RADIX_BITS %d\n", NUM_PASSES, NUM_RADIX_BITS);
438
439 results = cmd_params.algo->joinAlgo(&relR, &relS, cmd_params.nthreads);
440
441 fprintf(stderr, "[INFO ] Results = %llu. DONE.\n", results->totalresults);
442
443#if (defined(PERSIST_RELATIONS) && defined(JOIN_RESULT_MATERIALIZE))
444 fprintf(stderr, "[INFO ] Persisting the join result to \"Out.tbl\" ...\n");
445 write_result_relation(results, "Out.tbl");
446#endif
447
448 /* clean-up */
449 delete_relation(&relR);
450 delete_relation(&relS);
451 free(results);
452#ifdef JOIN_RESULT_MATERIALIZE
453 free(results->resultlist);
454#endif
455
456 return 0;
457}
458
/* command line handling functions */

/*
 * Print the usage summary for all supported options to stdout.
 * `progname` is shown as the program name (normally argv[0]).
 */
void
print_help(char * progname)
{
    printf("Usage: %s [options]\n", progname);

    /* Adjacent string literals (instead of backslash-newline continuations)
     * keep the source file's indentation out of the printed text. Without
     * -R/-S the relations are generated, so their default is "none". */
    fputs(
        "    Join algorithm selection, algorithms : RJ, PRO, PRH, PRHO, NPO, NPO_st\n"
        "       -a --algo=<name>   Run the hash join algorithm named <name> [PRO]\n"
        "\n"
        "    Other join configuration options, with default values in [] :\n"
        "       -n --nthreads=<N>  Number of threads to use <N> [2]\n"
        "       -r --r-size=<R>    Number of tuples in build relation R <R> [128000000]\n"
        "       -s --s-size=<S>    Number of tuples in probe relation S <S> [128000000]\n"
        "       -x --r-seed=<x>    Seed value for generating relation R <x> [12345]\n"
        "       -y --s-seed=<y>    Seed value for generating relation S <y> [54321]\n"
        "       -z --skew=<z>      Zipf skew parameter for probe relation S <z> [0.0]\n"
        "       -R --r-file=<Rf>   The file to load build relation R from <Rf> [none]\n"
        "       -S --s-file=<Sf>   The file to load probe relation S from <Sf> [none]\n"
        "       --non-unique       Use non-unique (duplicated) keys in input relations\n"
        "       --full-range       Spread keys in relns. in full 32-bit integer range\n"
        "       --basic-numa       Numa-localize relations to threads (Experimental)\n"
        "\n"
        "    Performance profiling options, when compiled with --enable-perfcounters.\n"
        "       -p --perfconf=<P>  Intel PCM config file with up to 4 counters [none]\n"
        "       -o --perfout=<O>   Output file to print performance counters [stdout]\n"
        "\n"
        "    Basic user options\n"
        "       -h --help          Show this message\n"
        "       --verbose          Be more verbose -- show misc extra info\n"
        "       --brief            Turn verbose output off again (default)\n"
        "       -v --version       Show version\n"
        "\n",
        stdout);
}
492
493void
494print_version()
495{
496 printf("\n%s\n", PACKAGE_STRING);
497 printf("Copyright (c) 2012, 2013, ETH Zurich, Systems Group.\n");
498 printf("http://www.systems.ethz.ch/projects/paralleljoins\n\n");
499}
500
/*
 * Return a heap-allocated copy of the NUL-terminated string `s`, or NULL if
 * the allocation fails. The caller owns the result and must free() it.
 * `s` must not be NULL.
 */
static char *
mystrdup (const char *s)
{
    /* Measure once: the size is needed for both the allocation and the copy. */
    size_t size = strlen(s) + 1;
    char *copy = malloc(size);

    if (copy != NULL)
        memcpy(copy, s, size);

    return copy;
}
511
512void
513parse_args(int argc, char ** argv, param_t * cmd_params)
514{
515
516 int c, i, found;
517 /* Flag set by ‘--verbose’. */
518 static int verbose_flag;
519 static int nonunique_flag;
520 static int fullrange_flag;
521 static int basic_numa;
522
523 while(1) {
524 static struct option long_options[] =
525 {
526 /* These options set a flag. */
527 {"verbose", no_argument, &verbose_flag, 1},
528 {"brief", no_argument, &verbose_flag, 0},
529 {"non-unique", no_argument, &nonunique_flag, 1},
530 {"full-range", no_argument, &fullrange_flag, 1},
531 {"basic-numa", no_argument, &basic_numa, 1},
532 {"help", no_argument, 0, 'h'},
533 {"version", no_argument, 0, 'v'},
534 /* These options don't set a flag.
535 We distinguish them by their indices. */
536 {"algo", required_argument, 0, 'a'},
537 {"nthreads",required_argument, 0, 'n'},
538 {"perfconf",required_argument, 0, 'p'},
539 {"r-size", required_argument, 0, 'r'},
540 {"s-size", required_argument, 0, 's'},
541 {"perfout", required_argument, 0, 'o'},
542 {"r-seed", required_argument, 0, 'x'},
543 {"s-seed", required_argument, 0, 'y'},
544 {"skew", required_argument, 0, 'z'},
545 {"r-file", required_argument, 0, 'R'},
546 {"s-file", required_argument, 0, 'S'},
547 {0, 0, 0, 0}
548 };
549 /* getopt_long stores the option index here. */
550 int option_index = 0;
551
552 c = getopt_long (argc, argv, "a:n:p:r:s:o:x:y:z:R:S:hv",
553 long_options, &option_index);
554
555 /* Detect the end of the options. */
556 if (c == -1)
557 break;
558 switch (c)
559 {
560 case 0:
561 /* If this option set a flag, do nothing else now. */
562 if (long_options[option_index].flag != 0)
563 break;
564 printf ("option %s", long_options[option_index].name);
565 if (optarg)
566 printf (" with arg %s", optarg);
567 printf ("\n");
568 break;
569
570 case 'a':
571 i = 0; found = 0;
572 while(algos[i].joinAlgo) {
573 if(strcmp(optarg, algos[i].name) == 0) {
574 cmd_params->algo = &algos[i];
575 found = 1;
576 break;
577 }
578 i++;
579 }
580
581 if(found == 0) {
582 printf("[ERROR] Join algorithm named `%s' does not exist!\n",
583 optarg);
584 print_help(argv[0]);
585 exit(EXIT_SUCCESS);
586 }
587 break;
588
589 case 'h':
590 case '?':
591 /* getopt_long already printed an error message. */
592 print_help(argv[0]);
593 exit(EXIT_SUCCESS);
594 break;
595
596 case 'v':
597 print_version();
598 exit(EXIT_SUCCESS);
599 break;
600
601 case 'n':
602 cmd_params->nthreads = atoi(optarg);
603 break;
604
605 case 'p':
606 cmd_params->perfconf = mystrdup(optarg);
607 break;
608
609 case 'r':
610 cmd_params->r_size = atol(optarg);
611 break;
612
613 case 's':
614 cmd_params->s_size = atol(optarg);
615 break;
616
617 case 'o':
618 cmd_params->perfout = mystrdup(optarg);
619 break;
620
621 case 'x':
622 cmd_params->r_seed = atoi(optarg);
623 break;
624
625 case 'y':
626 cmd_params->s_seed = atoi(optarg);
627 break;
628
629 case 'z':
630 cmd_params->skew = atof(optarg);
631 break;
632
633 case 'R':
634 cmd_params->loadfileR = mystrdup(optarg);
635 break;
636
637 case 'S':
638 cmd_params->loadfileS = mystrdup(optarg);
639 break;
640
641 default:
642 break;
643 }
644 }
645
646 /* if (verbose_flag) */
647 /* printf ("verbose flag is set \n"); */
648
649 cmd_params->nonunique_keys = nonunique_flag;
650 cmd_params->verbose = verbose_flag;
651 cmd_params->fullrange_keys = fullrange_flag;
652 cmd_params->basic_numa = basic_numa;
653
654 /* Print any remaining command line arguments (not options). */
655 if (optind < argc) {
656 printf ("non-option arguments: ");
657 while (optind < argc)
658 printf ("%s ", argv[optind++]);
659 printf ("\n");
660 }
661}
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
Provides methods to generate data sets of various types.
int create_relation_fk_from_pk(relation_t *fkrel, relation_t *pkrel, int64_t num_tuples)
Definition: generator.c:519
int create_relation_nonunique(relation_t *relation, int64_t num_tuples, const int64_t maxid)
Definition: generator.c:554
int parallel_create_relation(relation_t *relation, uint64_t num_tuples, uint32_t nthreads, uint64_t maxid)
Definition: generator.c:293
void delete_relation(relation_t *rel)
Definition: generator.c:602
int create_relation_zipf(relation_t *relation, int64_t num_tuples, const int64_t maxid, const double zipf_param)
Definition: generator.c:583
void seed_generator(unsigned int seed)
Definition: generator.c:77
int load_relation(relation_t *relation, char *filename, uint64_t num_tuples)
Definition: generator.c:394
result_t * NPO(relation_t *relR, relation_t *relS, int nthreads)
result_t * NPO_st(relation_t *relR, relation_t *relS, int nthreads)
char * PCM_OUT
char * PCM_CONFIG
result_t * PRH(relation_t *relR, relation_t *relS, int nthreads)
result_t * PRO(relation_t *relR, relation_t *relS, int nthreads)
result_t * RJ(relation_t *relR, relation_t *relS, int nthreads)
result_t * PRHO(relation_t *relR, relation_t *relS, int nthreads)
int numalocalize
Definition: generator.c:47
The interface of No partitioning optimized (NPO) join algorithm.
Provides interfaces for several variants of Radix Hash Join.
An interface to the Intel Performance Counters Monitoring.
#define NUM_PASSES
Definition: prj_params.h:21
#define NUM_RADIX_BITS
Definition: prj_params.h:16
Definition: main.c:267
Definition: main.c:272
char * loadfileR
Definition: main.c:287
Definition: types.h:74
Definition: types.h:45
Implements a chained-buffer storage model for tuples.