/**
 * Produces a pseudo-random double scaled to the range starting at 0 and
 * bounded by N, using the process-wide rand() generator.
 *
 * rand() is divided by RAND_MAX + 1 (computed in double), so the quotient
 * lies in [0, 1) before it is multiplied by N. N is expanded exactly once.
 */
#define RAND_RANGE(N) ((N) * ((double)rand() / (1.0 + (double)RAND_MAX)))
/**
 * Produces a pseudo-random double scaled to the range starting at 0 and
 * bounded by N, drawing from the caller-supplied 48-bit generator state
 * STATE (an unsigned short[3]) through nrand48().
 *
 * nrand48() returns values in [0, 2^31) independently of RAND_MAX, so the
 * divisor is the fixed constant 2^31 instead of RAND_MAX + 1. Where
 * RAND_MAX == 2^31 - 1 the result is unchanged; where RAND_MAX is smaller
 * the previous divisor could scale the value past N.
 */
#define RAND_RANGE48(N,STATE) ((double)nrand48(STATE) / 2147483648.0 * (N))
/**
 * Allocates SZ bytes plus RELATION_PADDING extra bytes via alloc_aligned().
 *
 * SZ is parenthesized so that an argument built from low-precedence
 * operators (for example a conditional expression) is padded as a whole
 * instead of having the padding absorbed into one of its operands.
 */
#define MALLOC(SZ) alloc_aligned((SZ) + RELATION_PADDING)
/** Releases a buffer with free(); the SZ argument is accepted but unused. */
#define FREE(X,SZ) free(X)
35#define BARRIER_ARRIVE(B,RV) \
36 RV = pthread_barrier_wait(B); \
37 if(RV !=0 && RV != PTHREAD_BARRIER_SERIAL_THREAD){ \
38 printf("Couldn't wait on barrier\n"); \
51static unsigned int seedValue;
54alloc_aligned(
size_t size)
61 perror(
"[ERROR] alloc_aligned() failed: out of memory");
69 uint64_t ntuples = size /
sizeof(
tuple_t);
89 seedValue = time(NULL);
105 for (i = relation->num_tuples - 1; i > 0; i--) {
106 int64_t j = RAND_RANGE(i);
107 intkey_t tmp = relation->tuples[i].key;
108 relation->tuples[i].key = relation->tuples[j].key;
109 relation->tuples[j].key = tmp;
114knuth_shuffle48(
relation_t * relation,
unsigned short * state)
117 for (i = relation->num_tuples - 1; i > 0; i--) {
118 int64_t j = RAND_RANGE48(i, state);
119 intkey_t tmp = relation->tuples[i].key;
120 relation->tuples[i].key = relation->tuples[j].key;
121 relation->tuples[j].key = tmp;
134 for (i = 0; i < rel->num_tuples; i++) {
135 rel->tuples[i].key = (i+1);
136 rel->tuples[i].payload = i;
149 volatile void * locks;
150 pthread_barrier_t * barrier;
159random_unique_gen_thread(
void * args)
163 int64_t firstkey = arg->firstkey;
164 int64_t maxid = arg->maxid;
165 uint64_t ridstart = arg->ridstart;
169 unsigned short state[3] = {0, 0, 0};
170 unsigned int seed = time(NULL) + * (
unsigned int *) pthread_self();
171 memcpy(state, &seed,
sizeof(seed));
173 for (i = 0; i < rel->num_tuples; i++) {
174 rel->tuples[i].key = firstkey;
175 rel->tuples[i].payload = ridstart + i;
177 if(firstkey == maxid)
190 volatile char * locks = (
volatile char *)(arg->locks);
193 uint64_t rel_offset_in_full = rel->tuples - fullrel->tuples;
194 uint64_t k = rel_offset_in_full + rel->num_tuples - 1;
195 for (i = rel->num_tuples - 1; i > 0; i--, k--) {
196 int64_t j = RAND_RANGE48(k, state);
200 intkey_t tmp = fullrel->tuples[k].key;
201 fullrel->tuples[k].key = fullrel->tuples[j].key;
202 fullrel->tuples[j].key = tmp;
215numa_localize_thread(
void * args)
221 for (i = 0; i < rel->num_tuples; i++) {
222 rel->tuples[i].key = 0;
234read_relation(
relation_t * rel,
char * filename);
242 FILE * fp = fopen(filename,
"w");
245 fprintf(fp,
"#KEY, VAL\n");
247 for (i = 0; i < rel->num_tuples; i++) {
248 fprintf(fp,
"%d %d\n", rel->tuples[i].key, rel->tuples[i].payload);
259random_gen(
relation_t *rel,
const int64_t maxid)
263 for (i = 0; i < rel->num_tuples; i++) {
264 rel->tuples[i].key = RAND_RANGE(maxid);
265 rel->tuples[i].payload = i;
275 relation->num_tuples = num_tuples;
276 relation->tuples = (
tuple_t*)MALLOC(relation->num_tuples *
sizeof(
tuple_t));
278 if (!relation->tuples) {
279 perror(
"out of memory");
283 random_unique_gen(relation);
285#ifdef PERSIST_RELATIONS
294 uint32_t nthreads, uint64_t maxid)
302 relation->num_tuples = num_tuples;
305 relation->tuples = (
tuple_t*) MALLOC(num_tuples *
sizeof(
tuple_t));
307 if (!relation->tuples) {
308 perror(
"out of memory");
313 pthread_t tid[nthreads];
316 pthread_barrier_t barrier;
318 unsigned int pagesize;
320 unsigned int npages_perthr;
321 uint64_t ntuples_perthr;
322 uint64_t ntuples_lastthr;
324 pagesize = getpagesize();
325 npages = (num_tuples *
sizeof(
tuple_t)) / pagesize + 1;
326 npages_perthr = npages / nthreads;
327 ntuples_perthr = npages_perthr * (pagesize/
sizeof(
tuple_t));
329 if(npages_perthr == 0)
330 ntuples_perthr = num_tuples / nthreads;
332 ntuples_lastthr = num_tuples - ntuples_perthr * (nthreads-1);
334 pthread_attr_init(&attr);
336 rv = pthread_barrier_init(&barrier, NULL, nthreads);
338 printf(
"[ERROR] Couldn't create the barrier\n");
343 volatile void * locks = (
volatile void *)calloc(num_tuples,
sizeof(
char));
345 for( i = 0; i < nthreads; i++ ) {
349 CPU_SET(cpu_idx, &set);
350 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &set);
352 args[i].firstkey = (offset + 1) % maxid;
353 args[i].maxid = maxid;
354 args[i].ridstart = offset;
355 args[i].rel.tuples = relation->tuples + offset;
356 args[i].rel.num_tuples = (i == nthreads-1) ? ntuples_lastthr
359 args[i].fullrel = relation;
360 args[i].locks = locks;
361 args[i].barrier = &barrier;
363 offset += ntuples_perthr;
365 rv = pthread_create(&tid[i], &attr, random_unique_gen_thread,
368 fprintf(stderr,
"[ERROR] pthread_create() return code is %d\n", rv);
373 for(i = 0; i < nthreads; i++){
374 pthread_join(tid[i], NULL);
382 pthread_barrier_destroy(&barrier);
384#ifdef PERSIST_RELATIONS
385 char *
const tables[] = {
"R.tbl",
"S.tbl"};
396 relation->num_tuples = num_tuples;
399 relation->tuples = (
tuple_t*) MALLOC(num_tuples *
sizeof(
tuple_t));
401 if (!relation->tuples) {
402 perror(
"out of memory");
407 read_relation(relation, filename);
420 pthread_t tid[nthreads];
424 unsigned int pagesize;
426 unsigned int npages_perthr;
427 uint64_t ntuples_perthr;
428 uint64_t ntuples_lastthr;
430 pagesize = getpagesize();
431 npages = (num_tuples *
sizeof(
tuple_t)) / pagesize + 1;
432 npages_perthr = npages / nthreads;
433 ntuples_perthr = npages_perthr * (pagesize/
sizeof(
tuple_t));
434 ntuples_lastthr = num_tuples - ntuples_perthr * (nthreads-1);
436 pthread_attr_init(&attr);
438 for( i = 0; i < nthreads; i++ ) {
442 CPU_SET(cpu_idx, &set);
443 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &set);
445 args[i].firstkey = offset + 1;
446 args[i].rel.tuples = relation + offset;
447 args[i].rel.num_tuples = (i == nthreads-1) ? ntuples_lastthr
449 offset += ntuples_perthr;
451 rv = pthread_create(&tid[i], &attr, numa_localize_thread,
454 fprintf(stderr,
"[ERROR] pthread_create() return code is %d\n", rv);
459 for(i = 0; i < nthreads; i++){
460 pthread_join(tid[i], NULL);
476 relation->num_tuples = num_tuples;
477 relation->tuples = (
tuple_t*)MALLOC(relation->num_tuples *
sizeof(
tuple_t));
479 if (!relation->tuples) {
480 perror(
"out of memory");
485 iters = num_tuples / maxid;
486 for(i = 0; i < iters; i++){
487 tmp.num_tuples = maxid;
488 tmp.tuples = relation->tuples + maxid * i;
489 random_unique_gen(&tmp);
493 remainder = num_tuples % maxid;
495 tmp.num_tuples = remainder;
496 tmp.tuples = relation->tuples + maxid * iters;
497 random_unique_gen(&tmp);
500#ifdef PERSIST_RELATIONS
528 if (rv && !fkrel->tuples) {
529 perror(
"[ERROR] Out of memory");
533 fkrel->num_tuples = num_tuples;
536 iters = num_tuples / pkrel->num_tuples;
537 for(i = 0; i < iters; i++){
538 memcpy(fkrel->tuples + i * pkrel->num_tuples, pkrel->tuples,
539 pkrel->num_tuples *
sizeof(
tuple_t));
543 remainder = num_tuples % pkrel->num_tuples;
545 memcpy(fkrel->tuples + i * pkrel->num_tuples, pkrel->tuples,
549 knuth_shuffle(fkrel);
559 relation->num_tuples = num_tuples;
560 relation->tuples = (
tuple_t*)MALLOC(relation->num_tuples *
sizeof(
tuple_t));
562 if (!relation->tuples) {
563 perror(
"out of memory");
567 random_gen(relation, maxid);
573zipf_ggl(
double * seed)
575 double t, d2=0.2147483647e10;
577 t = fmod(0.16807e5*t, d2);
579 return (t-1.0e0)/(d2-1.0e0);
584 const int64_t maxid,
const double zipf_param)
588 relation->num_tuples = num_tuples;
589 relation->tuples = (
tuple_t*) MALLOC(relation->num_tuples *
sizeof(
tuple_t));
591 if (!relation->tuples) {
592 perror(
"out of memory");
596 gen_zipf(num_tuples, maxid, zipf_param, &relation->tuples);
605 FREE(rel->tuples, rel->num_tuples *
sizeof(
tuple_t));
609read_relation(
relation_t * rel,
char * filename){
611 FILE * fp = fopen(filename,
"r");
646 uint64_t ntuples = rel->num_tuples;
650 for(uint64_t i = 0; i < ntuples; i++){
652 assert(fscanf(fp,
"%d %d", &key, &payload) == 2);
655 assert(fscanf(fp,
"%d,%d", &key, &payload) == 2);
657 assert(fscanf(fp,
"%d|%d", &key, &payload) == 2);
660 assert(fscanf(fp,
"%d", &key) == 1);
665 printf(
"[WARN ] key=%d, payload=%d\n", key, payload);
667 rel->tuples[i].key = key;
668 rel->tuples[i].payload = payload;
Affinity methods on Mac OS X. Mac OS X does not export interfaces that identify processors or control...
#define BARRIER_ARRIVE(B, RV)
Provides cpu mapping utility function.
Provides methods to generate data sets of various types.
item_t * gen_zipf(unsigned int stream_size, unsigned int alphabet_size, double zipf_factor, item_t **output)
int create_relation_fk_from_pk(relation_t *fkrel, relation_t *pkrel, int64_t num_tuples)
int create_relation_nonunique(relation_t *relation, int64_t num_tuples, const int64_t maxid)
int parallel_create_relation(relation_t *relation, uint64_t num_tuples, uint32_t nthreads, uint64_t maxid)
void delete_relation(relation_t *rel)
void write_relation(relation_t *rel, char *filename)
int create_relation_pk(relation_t *relation, int64_t num_tuples)
int create_relation_zipf(relation_t *relation, int64_t num_tuples, const int64_t maxid, const double zipf_param)
int numa_localize(tuple_t *relation, int64_t num_tuples, uint32_t nthreads)
void seed_generator(unsigned int seed)
int create_relation_fk(relation_t *relation, int64_t num_tuples, const int64_t maxid)
int load_relation(relation_t *relation, char *filename, uint64_t num_tuples)
int get_cpu_id(int thread_id)
A wrapper file ensuring 64bit key size.
Constant parameters used by Parallel Radix Join implementations.