# Threads

---

## Object Definitions

```c
typedef struct _kpl_task kpl_task;
typedef void kpl_task_fn(kpl_task *t);

// Completes the `kpl_task` forward declaration above.
struct _kpl_task {
    kpl_task *_Atomic next, *join;
    kpl_group *state;
    kpl_result return_value;
    kpl_task_fn *fn;
    int32_t thread_id;
    _Atomic bool join_ready;
};

#define KPL_TASK_SLAB_SIZE 50

typedef struct _kpl_task_slab {
    size_t array_index;
    struct _kpl_task_slab *next;
    kpl_task array[KPL_TASK_SLAB_SIZE];
} kpl_task_slab;

#define KPL_QUEUE_ASYNC (-1)
#define KPL_MAIN_THREAD 0

typedef struct {
    kpl_task *_Atomic head, *_Atomic tail;
    struct {
        kpl_task *_Atomic next;
    } dummy;
} kpl_atomic_queue;

typedef struct {
    kpl_atomic_queue queue;
    _Atomic ssize_t priority;
    kpl_task_slab *slab;
    kpl_task *pool;
    sem_t counter;
    pthread_t thread;
} kpl_thread;
```

## Initialization

When each thread starts, it pins itself to a single CPU: thread *n* is bound to the *n*-th CPU in the process's affinity mask (which is simply CPU *n* when the mask is unrestricted).

## Task State

```text
0 -> length
native   : arguments
process  : arguments, locals, parent
closure  : arguments, locals, closure function
iterator : arguments, locals, iterator functions, iterator function index
```

## Per Process Task Queue

## Asynchronous Task Queue

# Queuing

## Sync

## Async

# Running

## Joining

# Example

```c
// gcc -std=gnu11 -Wall -Wextra -O2 -fhardened -fno-omit-frame-pointer -o thread_queue thread_queue.c
#define _GNU_SOURCE
#include <assert.h>
#include <inttypes.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>

#define I 50   // number of fib() computations started by start_a
#define FIB 25 // argument of every fib() computation

typedef struct _task task;
typedef void task_fn(task *t);

#define TASK_STATE_SIZE 55

// start_a keeps its cursor in state[0] and its I children in state[1..I].
_Static_assert(I < TASK_STATE_SIZE, "task.state is too small for I children");

struct _task {
    task *_Atomic next, *join;    // queue link; continuation scheduled by task_done
    void *state[TASK_STATE_SIZE]; // per-task arguments / locals
    void *return_value;
    task_fn *fn;                  // function task_loop calls for this task
    int32_t thread_id;            // thread that last ran the task (QUEUE_ASYNC before)
    _Atomic bool join_ready;      // set by whichever of task_done / task_join comes first
};

#define TASK_SLAB_SIZE 50

// Bump allocator for tasks; slabs are chained and only released by task_slab_free.
typedef struct _task_slab {
    size_t array_index;
    struct _task_slab *next;
    task array[TASK_SLAB_SIZE];
} task_slab;

#define QUEUE_ASYNC (-1)
#define MAIN_THREAD 0

// Producers atomically exchange `head`; the owning thread consumes from `tail`.
// `dummy` has only a `next` field so it can stand in for a task node.
typedef struct {
    task *_Atomic head, *_Atomic tail;
    struct {
        task *_Atomic next;
    } dummy;
} atomic_queue;

typedef struct {
    atomic_queue queue;
    _Atomic ssize_t priority; // 1 while awake + 1 per queued task
    task_slab *slab;          // allocation slabs owned by this thread
    task *pool;               // free-list of recycled tasks, linked through `join`
    sem_t counter;            // the thread sleeps on this when its queue is empty
    pthread_t thread;
} thread;

#define MAX_THREADS 64

int32_t available_threads = 0;
thread threads[MAX_THREADS];
_Atomic ssize_t total_priority = 0; // sum of every thread's priority

// CPUs this process may run on, captured by task_constructor.
static cpu_set_t allowed_cpus;

// Index of the worker running on the current OS thread (set by task_loop).
static _Thread_local int32_t current_thread_id = MAIN_THREAD;

void priority_increment(int32_t thread_id) {
    threads[thread_id].priority++;
    total_priority++;
}

void priority_decrement(int32_t thread_id) {
    threads[thread_id].priority--;
    total_priority--;
}

// Push a fresh, zeroed slab onto thread_id's slab list; aborts on allocation failure.
void task_slab_init(int32_t thread_id) {
    task_slab *slab = calloc(1, sizeof *slab);
    if (!slab) {
        perror("calloc");
        exit(EXIT_FAILURE);
    }
    slab->next = threads[thread_id].slab;
    threads[thread_id].slab = slab;
}

// Hand out the next unused task of thread_id's newest slab, adding a slab when it is full.
task *task_slab_get(int32_t thread_id) {
    if (threads[thread_id].slab->array_index >= TASK_SLAB_SIZE)
        task_slab_init(thread_id);
    task_slab *slab = threads[thread_id].slab;
    return &slab->array[slab->array_index++];
}

// Release every slab owned by thread_id.
void task_slab_free(int32_t thread_id) {
    task_slab *slab = threads[thread_id].slab;
    while (slab) {
        task_slab *next = slab->next;
        free(slab);
        slab = next;
    }
    threads[thread_id].slab = NULL;
    threads[thread_id].pool = NULL; // pooled tasks lived inside the freed slabs
}

// Reset thread_id's queue to hold only the dummy node; the thread starts awake (priority 1).
void task_queue_init(int32_t thread_id) {
    atomic_queue *q = &threads[thread_id].queue;
    q->head = (task*) &q->dummy;
    q->tail = (task*) &q->dummy;
    q->dummy.next = NULL;
    threads[thread_id].priority = 1;
    threads[thread_id].pool = NULL;
}

// Append t to thread_id's queue. Any thread may call this: the producer side
// is one atomic exchange on `head` followed by linking the previous head to t.
void task_queue_add(int32_t thread_id, task *t) {
    atomic_queue *q = &threads[thread_id].queue;
    t->next = NULL;
    task *prev = __atomic_exchange_n(&q->head, t, __ATOMIC_SEQ_CST);
    prev->next = t;
    if (t != (task*) &q->dummy)
        priority_increment(thread_id);
}

// Pop the oldest task from thread_id's queue, or return NULL if it is empty.
// Only the owning thread calls this (from task_loop). Before the last real
// node is removed the dummy is re-appended, so `tail` always has a successor
// to move to; while a producer sits between its exchange and its link, this
// spins until the link appears.
task *task_queue_next(int32_t thread_id) {
    atomic_queue *q = &threads[thread_id].queue;
    task *const dummy = (task*) &q->dummy;
    task *t = NULL;
    for (;;) {
        task *tail = q->tail, *next = tail->next;
        if (tail == dummy) {
            if (!next)
                break; // empty
            q->tail = next;
            tail = next;
            next = tail->next;
        }
        if (next) {
            q->tail = next;
            t = tail;
            break;
        }
        task *head = q->head;
        if (tail != head)
            continue; // a producer has exchanged head but not linked yet
        task_queue_add(thread_id, dummy);
        next = tail->next;
        if (next) {
            q->tail = next;
            t = tail;
            break;
        }
    }
    if (t) {
        t->next = NULL;
        priority_decrement(thread_id);
    }
    return t;
}

// Get a reset task running fn, reusing thread_id's pool before its slabs.
// Pools are unsynchronized; every caller here passes the thread that is running.
task *task_init(task_fn *fn, int32_t thread_id) {
    task *t = threads[thread_id].pool;
    if (t)
        threads[thread_id].pool = t->join;
    else
        t = task_slab_get(thread_id);
    __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
    t->thread_id = QUEUE_ASYNC;
    for (size_t i = 0; i < TASK_STATE_SIZE; i++)
        t->state[i] = NULL;
    t->return_value = NULL;
    t->next = NULL;
    t->join = NULL;
    t->fn = fn;
    return t;
}

// Return t to the calling thread's free-list. Each pool is only touched by
// its own thread, so this must not use t->thread_id: start_b frees fib tasks
// that last ran on another worker, and pushing onto that worker's pool would
// race with its task_init.
void task_free(task *t) {
    thread *self = &threads[current_thread_id];
    t->join = self->pool;
    self->pool = t;
}

// Queue t on the thread with the lowest priority (ties go to the lowest id) and wake it.
void task_queue_async(task *t) {
    int32_t target = 0;
    ssize_t lowest = threads[0].priority;
    for (int32_t id = 1; id < available_threads; id++) {
        if (threads[id].priority < lowest) {
            lowest = threads[id].priority;
            target = id;
        }
    }
    task_queue_add(target, t);
    sem_post(&threads[target].counter);
}

// Called by t itself when it finishes and again (via task_join) when a parent
// registers t->join. The first caller only sets join_ready; the second
// schedules the continuation, on the same thread when both ran there.
void task_done(task *t) {
    if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join)
        return;
    task *join = t->join;
    t->join = NULL;
    __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
    if (join->thread_id == t->thread_id)
        task_queue_add(t->thread_id, join);
    else
        task_queue_async(join);
}

// Arrange for `join` to resume with fn once t has finished.
void task_join(task *restrict t, task *restrict join, task_fn *fn) {
    join->fn = fn;
    t->join = join;
    task_done(t);
}

// CPU number of the thread_id-th CPU in allowed_cpus. Pinning to CPU
// `thread_id` directly fails when the mask is not 0..n-1 (e.g. under taskset).
static int cpu_for_thread(int32_t thread_id) {
    int32_t seen = 0;
    for (int cpu = 0; cpu < CPU_SETSIZE; cpu++) {
        if (CPU_ISSET(cpu, &allowed_cpus) && seen++ == thread_id)
            return cpu;
    }
    return thread_id; // not reached while thread_id < available_threads
}

// Worker loop: pin to a CPU, run tasks from this thread's queue, sleep on the
// semaphore when idle, and exit once total_priority reaches 0.
void *task_loop(void *arg) {
    const int32_t thread_id = (int32_t) (intptr_t) arg;
    current_thread_id = thread_id;

    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    const int cpu = cpu_for_thread(thread_id);
    CPU_SET(cpu, &cpus);
    if (pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus))
        exit(thread_id + 1);

    for (;;) {
        task *t = task_queue_next(thread_id);
        if (t) {
            t->thread_id = thread_id;
            t->fn(t);
            continue;
        }
        priority_decrement(thread_id); // no longer counted as awake
        if (!total_priority) {
            // No thread counts as awake and nothing is queued: wake all so they exit.
            for (int32_t other = 0; other < available_threads; other++)
                sem_post(&threads[other].counter);
            break;
        }
        sem_wait(&threads[thread_id].counter);
        if (!total_priority)
            break;
        priority_increment(thread_id);
    }
    return NULL;
}

// Start workers 1..n-1, run worker 0 on the calling thread, then join the rest.
void task_run(void) {
    for (int32_t id = 1; id < available_threads; id++) {
        int rc = pthread_create(&threads[id].thread, NULL, task_loop, (void*) (intptr_t) id);
        if (rc) {
            fprintf(stderr, "pthread_create: error %d\n", rc);
            exit(EXIT_FAILURE);
        }
    }
    task_loop((void*) (intptr_t) MAIN_THREAD);
    for (int32_t id = 1; id < available_threads; id++)
        pthread_join(threads[id].thread, NULL);
}

// One worker per allowed CPU (capped at MAX_THREADS), each with an empty queue and one slab.
void task_constructor(void) {
    CPU_ZERO(&allowed_cpus);
    if (sched_getaffinity(0, sizeof(allowed_cpus), &allowed_cpus)) {
        perror("sched_getaffinity");
        exit(EXIT_FAILURE);
    }
    int32_t count = CPU_COUNT(&allowed_cpus);
    if (count > MAX_THREADS)
        count = MAX_THREADS;
    total_priority = available_threads = count;
    for (int32_t id = 0; id < available_threads; id++) {
        task_queue_init(id);
        task_slab_init(id);
        sem_init(&threads[id].counter, 0, 1);
    }
}

void task_destructor(void) {
    for (int32_t id = 0; id < available_threads; id++) {
        sem_destroy(&threads[id].counter);
        task_slab_free(id);
    }
}

// fib continuation 3: sum both children's results, recycle them, finish.
void fib_c(task *t) {
    task *child_a = t->state[2], *child_b = t->state[3];
    t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value);
    task_free(child_a);
    task_free(child_b);
    task_done(t);
}

void fib_a(task *t);

// fib continuation 2: spawn fib(arg - 2) on this thread and resume in fib_c.
void fib_b(task *t) {
    intptr_t arg = (intptr_t) t->state[1];
    task *child_b = t->state[3] = task_init(fib_a, t->thread_id);
    child_b->state[1] = (void*) (arg - 2);
    task_queue_add(t->thread_id, child_b);
    task_join(child_b, t, fib_c);
}

// fib entry: handle the base cases, otherwise spawn fib(arg - 1) and resume in fib_b.
void fib_a(task *t) {
    intptr_t arg = (intptr_t) t->state[1];
    if (arg <= 0) {
        t->return_value = (void*) 0;
        task_done(t);
        return;
    }
    if (arg < 2) {
        t->return_value = (void*) 1;
        task_done(t);
        return;
    }
    task *child_a = t->state[2] = task_init(fib_a, t->thread_id);
    child_a->state[1] = (void*) (arg - 1);
    task_queue_add(t->thread_id, child_a);
    task_join(child_a, t, fib_b);
}

_Atomic size_t counter = 0;

// Report the child at cursor state[0], advance the cursor, then join the next child.
void start_b(task *t) {
    counter++;
    intptr_t index = (intptr_t) t->state[0];
    task *f = t->state[index];
    intptr_t i = (intptr_t) f->state[0],
             arg = (intptr_t) f->state[1],
             ret = (intptr_t) f->return_value;
    printf("i: %" PRIdPTR ", fib(%" PRIdPTR ") = %" PRIdPTR ", thread: %" PRId32 "\n",
           i, arg, ret, f->thread_id);
    task_free(f);
    index++;
    t->state[0] = (void*) index; // integer round-trip instead of void* arithmetic
    if (index == I + 1) {
        printf("Complete\n");
        task_done(t);
        task_free(t);
        return;
    }
    task_join(t->state[index], t, start_b);
}

// Launch I fib(FIB) tasks across the workers, then join them in order via start_b.
void start_a(task *t) {
    t->state[0] = (void*) 1;
    for (intptr_t i = 1; i <= I; i++) {
        task *f = task_init(fib_a, t->thread_id);
        f->state[0] = (void*) i;
        f->state[1] = (void*) FIB;
        t->state[i] = f;
        task_queue_async(f);
    }
    task_join(t->state[(intptr_t) t->state[0]], t, start_b);
}

int main(void) {
    task_constructor();
    task *t = task_init(start_a, MAIN_THREAD);
    task_queue_add(MAIN_THREAD, t);
    task_run();
    task_destructor();
    assert(counter == I);
    return 0;
}
```