diff options
| author | nodist <kevin.comas.git@gmail.com> | 2026-06-02 16:26:09 -0400 |
|---|---|---|
| committer | nodist <kevin.comas.git@gmail.com> | 2026-06-02 16:26:09 -0400 |
| commit | b26ad08b39b8229dcea0bafc4a8ba4b0d7ad7154 (patch) | |
| tree | 1a469cc7f1ab2bbf2af42336518fa19ad0d0ed76 /docs/application/process.md | |
| parent | ba087e5dbcc50537d82da5dbc602df7292d3f24c (diff) | |
reformat with sync and async calls
Diffstat (limited to 'docs/application/process.md')
| -rw-r--r-- | docs/application/process.md | 309 |
1 files changed, 0 insertions, 309 deletions
diff --git a/docs/application/process.md b/docs/application/process.md deleted file mode 100644 index 2934201..0000000 --- a/docs/application/process.md +++ /dev/null @@ -1,309 +0,0 @@ -# Processes - ---- - -## Per Process Task Queue - -## Asynchronous Task Queue - -# Queuing - -## Sync - -## Async - -# Running - -# Example - -```c -#include <stdlib.h> -#include <stdio.h> -#include <stdint.h> -#include <stdbool.h> -#include <pthread.h> -#include <assert.h> - -#define PROCESSES 12 - -#define I 5 - -#define FIB 30 - -typedef struct _task task; - -typedef void task_fn(task *t); - -#define TASK_STATE_SIZE 20 - -#define TASK_QUEUE_ASYNC -1 - -#define TASK_MAIN_PROCESS 0 - -typedef struct _task { - _Atomic bool join_ready; - int32_t worker_id; - void *state[TASK_STATE_SIZE]; - void *return_value; - struct _task *next, *join; - task_fn *fn; -} task; - -task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; - -_Atomic size_t task_async_queue_length = 0; - -pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; - -void __attribute__((destructor)) task_destructor(void) { - while (task_pool) { - task *t = task_pool; - task_pool = task_pool->next; - free(t); - } - pthread_mutex_destroy(&task_pool_mutex); - pthread_cond_destroy(&task_async_queue_cond); - pthread_mutex_destroy(&task_async_queue_mutex); -} - -typedef struct { - task *head, *tail; - pthread_t thread; -} process; - -process worker[PROCESSES] = {}; - -task *task_init(task_fn fn) { - task *t = NULL; - pthread_mutex_lock(&task_pool_mutex); - if (task_pool) { - t = task_pool; - task_pool = task_pool->next; - } - pthread_mutex_unlock(&task_pool_mutex); - if (!t) - t = malloc(sizeof(task)); - __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - t->worker_id = TASK_QUEUE_ASYNC; - for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) - t->state[state_index] = NULL; - t->return_value = NULL; - t->next = t->join = NULL; - t->fn = fn; - return t; -} - -void task_free(task *t) { - pthread_mutex_lock(&task_pool_mutex); - t->next = task_pool; - task_pool = t; - pthread_mutex_unlock(&task_pool_mutex); -} - -void task_queue_async(task *t) { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_tail) - task_async_queue_tail = task_async_queue_tail->next = t; - else - task_async_queue_head = task_async_queue_tail = t; - pthread_mutex_unlock(&task_async_queue_mutex); - pthread_cond_signal(&task_async_queue_cond); -} - -void task_queue_sync(task *t, int32_t worker_id) { - if (worker_id == TASK_QUEUE_ASYNC) - return task_queue_async(t); - if (worker[worker_id].tail) - worker[worker_id].tail = worker[worker_id].tail->next = t; - else - worker[worker_id].head = worker[worker_id].tail = t; - return; -} - -void task_ready_queue(task *t) { - if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) - return; - task *join = t->join; - t->join = NULL; - __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); -} - -void task_join(task *restrict t, task *restrict join, task_fn fn) { - join->fn = fn; - t->join = join; - task_ready_queue(t); -} - -void task_done(task *t) { - task_ready_queue(t); -} - -_Atomic size_t running = PROCESSES; - -void *task_loop(void *arg) { - int32_t worker_id = (intptr_t) arg; - for (;;) { - task *t = NULL; - if (worker[worker_id].head) { - t = worker[worker_id].head; - if (worker[worker_id].head != worker[worker_id].tail) - worker[worker_id].head = worker[worker_id].head->next; - else - worker[worker_id].head = worker[worker_id].tail = NULL; - } else { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_head) { - t = task_async_queue_head; - if (task_async_queue_head != task_async_queue_tail) - task_async_queue_head = task_async_queue_head->next; - else - task_async_queue_head = task_async_queue_tail = NULL; - } - pthread_mutex_unlock(&task_async_queue_mutex); - } - if (t) { - t->next = NULL; - t->worker_id = worker_id; - t->fn(t); - continue; - } - running--; - if (!running) { - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_cond_signal(&task_async_queue_cond); - break; - } - pthread_mutex_lock(&task_async_queue_mutex); - pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); - pthread_mutex_unlock(&task_async_queue_mutex); - if (!running) - break; - running++; - } - return NULL; -} - -void task_run(void) { - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); - task_loop(0); - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_join(worker[worker_id].thread, NULL); -} - -/* -fib : Fn[n] $ ( - ? { - n <= 0 { 0 } - n < 2 { 1 } - { + (fib `call n - 1; fib `call n - 2) } - } - v : Vector[`fork_type fib] $ () - @ 1 .. I { v `push fib `fork FIB } - @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) } - `print "Complete\n" -) -*/ - -#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) - -_Atomic size_t counter = 0; - -/* - 0: i - 1: arg - 2: child_a - 3: child_b -*/ - -void fib_c(task *t) { - task *child_a = t->state[2], *child_b = t->state[3]; - t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value); - task_free(child_a); - task_free(child_b); - task_done(t); -} - -void fib_a(task *t); - -void fib_b(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - task *child_b = t->state[3] = task_init(fib_a); - child_b->state[1] = (void*) (arg - 2); - task_queue_sync(child_b, t->worker_id); - task_join(child_b, t, fib_c); -} - -void fib_a(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - if (arg <= 0) { - t->return_value = (void*) 0; - task_done(t); - return; - } - if (arg < 2) { - t->return_value = (void*) 1; - task_done(t); - return; - } - task *child_a = t->state[2] = task_init(fib_a); - child_a->state[1] = (void*) (arg - 1); - task_queue_sync(child_a, t->worker_id); - task_join(child_a, t, fib_b); -} - -intptr_t fib(intptr_t n) { - if (n <= 0) - return 0; - if (n < 2) - return 1; - return fib(n - 1) + fib(n - 2); -} - -void fib_rec(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - printf("fib thread: %d\n", t->worker_id); - t->return_value = (void*) fib(arg); - task_done(t); -} - -void start_b(task *t) { - counter++; - task *f = t->state[(intptr_t) t->state[0]]; - intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; - printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); - task_free(f); - t->state[0]++; - if ((intptr_t) t->state[0] == I + 1) { - printf("Complete\n"); - task_done(t); - task_free(t); - return; - } - task_join(t->state[(intptr_t) t->state[0]], t, start_b); -} - -void start_a(task *t) { - t->state[0] = (void*) 1; - for (intptr_t i = 0; i < I; i++) { - //task *f = task_init(fib_a); - task *f = task_init(fib_rec); - f->state[0] = (void*) i; - f->state[1] = (void*) FIB; - t->state[i + 1] = f; - task_queue_async(f); - } - task_join(t->state[(intptr_t) t->state[0]], t, start_b); -} - -int main(void) { - task *t = task_init(start_a); - task_queue_sync(t, TASK_MAIN_PROCESS); - task_run(); - assert(counter == I); - return 0; -} -``` |
