Multi-core Hash Joins
Main-memory hash join implementations for multi-core CPUs
task_queue.h
#ifndef TASK_QUEUE_H
#define TASK_QUEUE_H

#include <pthread.h>
#include <stdlib.h>

#include "types.h" /* relation_t, int32_t */
17
typedef struct task_t       task_t;
typedef struct task_list_t  task_list_t;
typedef struct task_queue_t task_queue_t;

/* A task: one pair of relation partitions (R and S) to be joined,
   plus temporary relations used as scratch space. */
struct task_t {
    relation_t relR;
    relation_t tmpR;
    relation_t relS;
    relation_t tmpS;
    task_t *   next;
};

/* A block of pre-allocated task_t slots; blocks are chained together. */
struct task_list_t {
    task_t *      tasks;
    task_list_t * next;
    int           curr;     /* index of the next unused slot in tasks[] */
};

/* The task queue: a LIFO list of tasks plus a block allocator. */
struct task_queue_t {
    pthread_mutex_t lock;        /* protects head and count */
    pthread_mutex_t alloc_lock;  /* protects the free_list allocator */
    task_t *        head;
    task_list_t *   free_list;
    int32_t         count;
    int32_t         alloc_size;  /* task_t slots per allocation block */
};

inline
task_t *
get_next_task(task_queue_t * tq) __attribute__((always_inline));

inline
void
add_tasks(task_queue_t * tq, task_t * t) __attribute__((always_inline));

inline
task_t *
get_next_task(task_queue_t * tq)
{
    pthread_mutex_lock(&tq->lock);
    task_t * ret = 0;
    if(tq->count > 0){
        ret = tq->head;
        tq->head = ret->next;
        tq->count --;
    }
    pthread_mutex_unlock(&tq->lock);

    return ret;
}

inline
void
add_tasks(task_queue_t * tq, task_t * t)
{
    pthread_mutex_lock(&tq->lock);
    t->next = tq->head;
    tq->head = t;
    tq->count ++;
    pthread_mutex_unlock(&tq->lock);
}
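
/* NOTE: get_next_task() and add_tasks() are identical in behavior to
   task_queue_get_atomic() and task_queue_add_atomic() defined below. */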

/* atomically get the next available task */
inline
task_t *
task_queue_get_atomic(task_queue_t * tq) __attribute__((always_inline));

/* atomically add a task */
inline
void
task_queue_add_atomic(task_queue_t * tq, task_t * t)
    __attribute__((always_inline));

inline
void
task_queue_add(task_queue_t * tq, task_t * t) __attribute__((always_inline));

inline
void
task_queue_copy_atomic(task_queue_t * tq, task_t * t)
    __attribute__((always_inline));

/* get a free slot of task_t */
inline
task_t *
task_queue_get_slot_atomic(task_queue_t * tq) __attribute__((always_inline));

inline
task_t *
task_queue_get_slot(task_queue_t * tq) __attribute__((always_inline));

/* initialize a task queue with given allocation block size */
task_queue_t *
task_queue_init(int alloc_size);

void
task_queue_free(task_queue_t * tq);

/**************** DEFINITIONS ********************************************/

inline
task_t *
task_queue_get_atomic(task_queue_t * tq)
{
    pthread_mutex_lock(&tq->lock);
    task_t * ret = 0;
    if(tq->count > 0){
        ret = tq->head;
        tq->head = ret->next;
        tq->count --;
    }
    pthread_mutex_unlock(&tq->lock);

    return ret;
}

inline
void
task_queue_add_atomic(task_queue_t * tq, task_t * t)
{
    pthread_mutex_lock(&tq->lock);
    t->next = tq->head;
    tq->head = t;
    tq->count ++;
    pthread_mutex_unlock(&tq->lock);
}

inline
void
task_queue_add(task_queue_t * tq, task_t * t)
{
    t->next = tq->head;
    tq->head = t;
    tq->count ++;
}

/* sorted add
inline
void
task_queue_add_atomic(task_queue_t * tq, task_t * t)
{
    pthread_mutex_lock(&tq->lock);
    task_queue_add(tq, t);
    pthread_mutex_unlock(&tq->lock);
}

inline
int32_t
maxtuples(task_t * t) __attribute__((always_inline));

inline
int32_t
maxtuples(task_t * t)
{
    int32_t max = t->relS.num_tuples;
    if(t->relR.num_tuples > max)
        max = t->relR.num_tuples;

    return max;
}

inline
void
task_queue_add(task_queue_t * tq, task_t * t)
{
    int32_t maxnew;

    if(tq->head == NULL ||
       ((maxnew = maxtuples(t)) >= maxtuples(tq->head))) {

        t->next = tq->head;
        tq->head = t;
        tq->count ++;
        return;
    }

    task_t * prev, * curr;
    prev = tq->head;
    curr = tq->head->next;

    while(curr) {
        if(maxnew < maxtuples(curr)) {
            prev = curr;
            curr = curr->next;
        }
        else
            break;
    }

    if(curr) {
        t->next = curr->next;
        curr->next = t;
    }
    else {
        t->next = prev->next;
        prev->next = t;
    }

    tq->count ++;
    return;
}
*/
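
/* The disabled sorted-add variant above would keep the list ordered by
   descending maxtuples(), the larger of a task's R and S partition
   sizes, so the largest tasks are dequeued first. (When the scan stops
   at curr, the new task is linked after curr rather than before it, so
   the ordering appears to be only approximate.) */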

inline
void
task_queue_copy_atomic(task_queue_t * tq, task_t * t)
{
    pthread_mutex_lock(&tq->lock);
    task_t * slot = task_queue_get_slot(tq);
    *slot = *t; /* copy */
    task_queue_add(tq, slot);
    pthread_mutex_unlock(&tq->lock);
}
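
/* NOTE: the copy goes into a queue-owned slot, so the caller's task_t
   can live on the stack and be reused immediately. Slot allocation here
   is serialized by tq->lock rather than alloc_lock, so mixing this with
   concurrent task_queue_get_slot_atomic() calls could race on
   free_list. */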

inline
task_t *
task_queue_get_slot(task_queue_t * tq)
{
    task_list_t * l = tq->free_list;
    task_t * ret;
    if(l->curr < tq->alloc_size) {
        ret = &(l->tasks[l->curr]);
        l->curr++;
    }
    else {
        /* current block exhausted: allocate a fresh block of slots */
        task_list_t * nl = (task_list_t*) malloc(sizeof(task_list_t));
        nl->tasks = (task_t*) malloc(tq->alloc_size * sizeof(task_t));
        nl->curr = 1;
        nl->next = tq->free_list;
        tq->free_list = nl;
        ret = &(nl->tasks[0]);
    }

    return ret;
}
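
/* NOTE: slots are bump-allocated from blocks of alloc_size task_t
   entries and never freed individually; all blocks are released at once
   by task_queue_free(). */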

/* get a free slot of task_t */
inline
task_t *
task_queue_get_slot_atomic(task_queue_t * tq)
{
    pthread_mutex_lock(&tq->alloc_lock);
    task_t * ret = task_queue_get_slot(tq);
    pthread_mutex_unlock(&tq->alloc_lock);

    return ret;
}

/* initialize a task queue with given allocation block size */
task_queue_t *
task_queue_init(int alloc_size)
{
    task_queue_t * ret = (task_queue_t*) malloc(sizeof(task_queue_t));
    ret->free_list = (task_list_t*) malloc(sizeof(task_list_t));
    ret->free_list->tasks = (task_t*) malloc(alloc_size * sizeof(task_t));
    ret->free_list->curr = 0;
    ret->free_list->next = NULL;
    ret->count = 0;
    ret->alloc_size = alloc_size;
    ret->head = NULL;
    pthread_mutex_init(&ret->lock, NULL);
    pthread_mutex_init(&ret->alloc_lock, NULL);

    return ret;
}

void
task_queue_free(task_queue_t * tq)
{
    task_list_t * tmp = tq->free_list;
    while(tmp) {
        free(tmp->tasks);
        task_list_t * tmp2 = tmp->next;
        free(tmp);
        tmp = tmp2;
    }
    free(tq);
}

#endif /* TASK_QUEUE_H */
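
Example usage — a minimal sketch, not part of the original header: one thread pre-populates the queue with partition-pair tasks, then worker threads drain it with task_queue_get_atomic() until it returns NULL. The partition-filling step is elided since it depends on relation_t from types.h.

#include <pthread.h>
#include "task_queue.h"

static void *
worker(void * arg)
{
    task_queue_t * tq = (task_queue_t *) arg;
    task_t * t;
    /* NULL means no tasks remain in the queue */
    while((t = task_queue_get_atomic(tq)) != NULL) {
        /* ... join t->relR with t->relS here ... */
    }
    return NULL;
}

int
main(void)
{
    task_queue_t * tq = task_queue_init(32); /* 32 task_t slots per block */

    for(int i = 0; i < 8; i++) {
        task_t * t = task_queue_get_slot_atomic(tq);
        /* ... fill t->relR / t->relS with one partition pair ... */
        task_queue_add_atomic(tq, t);
    }

    pthread_t tid[4];
    for(int i = 0; i < 4; i++)
        pthread_create(&tid[i], NULL, worker, tq);
    for(int i = 0; i < 4; i++)
        pthread_join(tid[i], NULL);

    task_queue_free(tq);
    return 0;
}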