# Processes --- ## Per Process Task Queue ## Asynchronous Task Queue # Queuing ## Sync ## Async # Running # Example ```c #include #include #include #include #include #include #define PROCESSES 12 #define I 5 #define FIB 30 typedef struct _task task; typedef void task_fn(task *t); #define TASK_STATE_SIZE 20 #define TASK_QUEUE_ASYNC -1 #define TASK_MAIN_PROCESS 0 typedef struct _task { _Atomic bool join_ready; int32_t worker_id; void *state[TASK_STATE_SIZE]; void *return_value; struct _task *next, *join; task_fn *fn; } task; task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; _Atomic size_t task_async_queue_length = 0; pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; void __attribute__((destructor)) task_destructor(void) { while (task_pool) { task *t = task_pool; task_pool = task_pool->next; free(t); } pthread_mutex_destroy(&task_pool_mutex); pthread_cond_destroy(&task_async_queue_cond); pthread_mutex_destroy(&task_async_queue_mutex); } typedef struct { task *head, *tail; pthread_t thread; } process; process worker[PROCESSES] = {}; task *task_init(task_fn fn) { task *t = NULL; pthread_mutex_lock(&task_pool_mutex); if (task_pool) { t = task_pool; task_pool = task_pool->next; } pthread_mutex_unlock(&task_pool_mutex); if (!t) t = malloc(sizeof(task)); __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); t->worker_id = TASK_QUEUE_ASYNC; for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) t->state[state_index] = NULL; t->return_value = NULL; t->next = t->join = NULL; t->fn = fn; return t; } void task_free(task *t) { pthread_mutex_lock(&task_pool_mutex); t->next = task_pool; task_pool = t; pthread_mutex_unlock(&task_pool_mutex); } void task_queue_async(task *t) { pthread_mutex_lock(&task_async_queue_mutex); if (task_async_queue_tail) task_async_queue_tail = task_async_queue_tail->next = t; else task_async_queue_head = task_async_queue_tail = t; pthread_mutex_unlock(&task_async_queue_mutex); pthread_cond_signal(&task_async_queue_cond); } void task_queue_sync(task *t, int32_t worker_id) { if (worker_id == TASK_QUEUE_ASYNC) return task_queue_async(t); if (worker[worker_id].tail) worker[worker_id].tail = worker[worker_id].tail->next = t; else worker[worker_id].head = worker[worker_id].tail = t; return; } void task_ready_queue(task *t) { if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) return; task *join = t->join; t->join = NULL; __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); } void task_join(task *restrict t, task *restrict join, task_fn fn) { join->fn = fn; t->join = join; task_ready_queue(t); } void task_done(task *t) { task_ready_queue(t); } _Atomic size_t running = PROCESSES; void *task_loop(void *arg) { int32_t worker_id = (intptr_t) arg; for (;;) { task *t = NULL; if (worker[worker_id].head) { t = worker[worker_id].head; if (worker[worker_id].head != worker[worker_id].tail) worker[worker_id].head = worker[worker_id].head->next; else worker[worker_id].head = worker[worker_id].tail = NULL; } else { pthread_mutex_lock(&task_async_queue_mutex); if (task_async_queue_head) { t = task_async_queue_head; if (task_async_queue_head != task_async_queue_tail) task_async_queue_head = task_async_queue_head->next; else task_async_queue_head = task_async_queue_tail = NULL; } pthread_mutex_unlock(&task_async_queue_mutex); } if (t) { t->next = NULL; t->worker_id = worker_id; t->fn(t); continue; } running--; if (!running) { for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) pthread_cond_signal(&task_async_queue_cond); break; } pthread_mutex_lock(&task_async_queue_mutex); pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); pthread_mutex_unlock(&task_async_queue_mutex); if (!running) break; running++; } return NULL; } void task_run(void) { for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); task_loop(0); for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) pthread_join(worker[worker_id].thread, NULL); } /* fib : Fn[n] $ ( ? { n <= 0 { 0 } n < 2 { 1 } { + (fib `call n - 1; fib `call n - 2) } } v : Vector[`fork_type fib] $ () @ 1 .. I { v `push fib `fork FIB } @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) } `print "Complete\n" ) */ #define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) _Atomic size_t counter = 0; /* 0: i 1: arg 2: child_a 3: child_b */ void fib_c(task *t) { task *child_a = t->state[2], *child_b = t->state[3]; t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value); task_free(child_a); task_free(child_b); task_done(t); } void fib_a(task *t); void fib_b(task *t) { intptr_t arg = (intptr_t) t->state[1]; task *child_b = t->state[3] = task_init(fib_a); child_b->state[1] = (void*) (arg - 2); task_queue_sync(child_b, t->worker_id); task_join(child_b, t, fib_c); } void fib_a(task *t) { intptr_t arg = (intptr_t) t->state[1]; if (arg <= 0) { t->return_value = (void*) 0; task_done(t); return; } if (arg < 2) { t->return_value = (void*) 1; task_done(t); return; } task *child_a = t->state[2] = task_init(fib_a); child_a->state[1] = (void*) (arg - 1); task_queue_sync(child_a, t->worker_id); task_join(child_a, t, fib_b); } intptr_t fib(intptr_t n) { if (n <= 0) return 0; if (n < 2) return 1; return fib(n - 1) + fib(n - 2); } void fib_rec(task *t) { intptr_t arg = (intptr_t) t->state[1]; printf("fib thread: %d\n", t->worker_id); t->return_value = (void*) fib(arg); task_done(t); } void start_b(task *t) { counter++; task *f = t->state[(intptr_t) t->state[0]]; intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); task_free(f); t->state[0]++; if ((intptr_t) t->state[0] == I + 1) { printf("Complete\n"); task_done(t); task_free(t); return; } task_join(t->state[(intptr_t) t->state[0]], t, start_b); } void start_a(task *t) { t->state[0] = (void*) 1; for (intptr_t i = 0; i < I; i++) { //task *f = task_init(fib_a); task *f = task_init(fib_rec); f->state[0] = (void*) i; f->state[1] = (void*) FIB; t->state[i + 1] = f; task_queue_async(f); } task_join(t->state[(intptr_t) t->state[0]], t, start_b); } int main(void) { task *t = task_init(start_a); task_queue_sync(t, TASK_MAIN_PROCESS); task_run(); assert(counter == I); return 0; } ```