summaryrefslogtreecommitdiff
path: root/docs/application/process.md
diff options
context:
space:
mode:
authornodist <kevin.comas.git@gmail.com>2026-06-02 16:26:09 -0400
committernodist <kevin.comas.git@gmail.com>2026-06-02 16:26:09 -0400
commitb26ad08b39b8229dcea0bafc4a8ba4b0d7ad7154 (patch)
tree1a469cc7f1ab2bbf2af42336518fa19ad0d0ed76 /docs/application/process.md
parentba087e5dbcc50537d82da5dbc602df7292d3f24c (diff)
reformat with sync and async calls
Diffstat (limited to 'docs/application/process.md')
-rw-r--r--docs/application/process.md309
1 files changed, 0 insertions, 309 deletions
diff --git a/docs/application/process.md b/docs/application/process.md
deleted file mode 100644
index 2934201..0000000
--- a/docs/application/process.md
+++ /dev/null
@@ -1,309 +0,0 @@
-# Processes
-
----
-
-## Per Process Task Queue
-
-## Asynchronous Task Queue
-
-# Queuing
-
-## Sync
-
-## Async
-
-# Running
-
-# Example
-
-```c
-#include <stdlib.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <stdbool.h>
-#include <pthread.h>
-#include <assert.h>
-
-#define PROCESSES 12
-
-#define I 5
-
-#define FIB 30
-
-typedef struct _task task;
-
-typedef void task_fn(task *t);
-
-#define TASK_STATE_SIZE 20
-
-#define TASK_QUEUE_ASYNC -1
-
-#define TASK_MAIN_PROCESS 0
-
-typedef struct _task {
- _Atomic bool join_ready;
- int32_t worker_id;
- void *state[TASK_STATE_SIZE];
- void *return_value;
- struct _task *next, *join;
- task_fn *fn;
-} task;
-
-task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL;
-
-_Atomic size_t task_async_queue_length = 0;
-
-pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER;
-
-void __attribute__((destructor)) task_destructor(void) {
- while (task_pool) {
- task *t = task_pool;
- task_pool = task_pool->next;
- free(t);
- }
- pthread_mutex_destroy(&task_pool_mutex);
- pthread_cond_destroy(&task_async_queue_cond);
- pthread_mutex_destroy(&task_async_queue_mutex);
-}
-
-typedef struct {
- task *head, *tail;
- pthread_t thread;
-} process;
-
-process worker[PROCESSES] = {};
-
-task *task_init(task_fn fn) {
- task *t = NULL;
- pthread_mutex_lock(&task_pool_mutex);
- if (task_pool) {
- t = task_pool;
- task_pool = task_pool->next;
- }
- pthread_mutex_unlock(&task_pool_mutex);
- if (!t)
- t = malloc(sizeof(task));
- __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
- t->worker_id = TASK_QUEUE_ASYNC;
- for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++)
- t->state[state_index] = NULL;
- t->return_value = NULL;
- t->next = t->join = NULL;
- t->fn = fn;
- return t;
-}
-
-void task_free(task *t) {
- pthread_mutex_lock(&task_pool_mutex);
- t->next = task_pool;
- task_pool = t;
- pthread_mutex_unlock(&task_pool_mutex);
-}
-
-void task_queue_async(task *t) {
- pthread_mutex_lock(&task_async_queue_mutex);
- if (task_async_queue_tail)
- task_async_queue_tail = task_async_queue_tail->next = t;
- else
- task_async_queue_head = task_async_queue_tail = t;
- pthread_mutex_unlock(&task_async_queue_mutex);
- pthread_cond_signal(&task_async_queue_cond);
-}
-
-void task_queue_sync(task *t, int32_t worker_id) {
- if (worker_id == TASK_QUEUE_ASYNC)
- return task_queue_async(t);
- if (worker[worker_id].tail)
- worker[worker_id].tail = worker[worker_id].tail->next = t;
- else
- worker[worker_id].head = worker[worker_id].tail = t;
- return;
-}
-
-void task_ready_queue(task *t) {
- if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join)
- return;
- task *join = t->join;
- t->join = NULL;
- __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
- task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC);
-}
-
-void task_join(task *restrict t, task *restrict join, task_fn fn) {
- join->fn = fn;
- t->join = join;
- task_ready_queue(t);
-}
-
-void task_done(task *t) {
- task_ready_queue(t);
-}
-
-_Atomic size_t running = PROCESSES;
-
-void *task_loop(void *arg) {
- int32_t worker_id = (intptr_t) arg;
- for (;;) {
- task *t = NULL;
- if (worker[worker_id].head) {
- t = worker[worker_id].head;
- if (worker[worker_id].head != worker[worker_id].tail)
- worker[worker_id].head = worker[worker_id].head->next;
- else
- worker[worker_id].head = worker[worker_id].tail = NULL;
- } else {
- pthread_mutex_lock(&task_async_queue_mutex);
- if (task_async_queue_head) {
- t = task_async_queue_head;
- if (task_async_queue_head != task_async_queue_tail)
- task_async_queue_head = task_async_queue_head->next;
- else
- task_async_queue_head = task_async_queue_tail = NULL;
- }
- pthread_mutex_unlock(&task_async_queue_mutex);
- }
- if (t) {
- t->next = NULL;
- t->worker_id = worker_id;
- t->fn(t);
- continue;
- }
- running--;
- if (!running) {
- for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
- pthread_cond_signal(&task_async_queue_cond);
- break;
- }
- pthread_mutex_lock(&task_async_queue_mutex);
- pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex);
- pthread_mutex_unlock(&task_async_queue_mutex);
- if (!running)
- break;
- running++;
- }
- return NULL;
-}
-
-void task_run(void) {
- for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
- pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id);
- task_loop(0);
- for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
- pthread_join(worker[worker_id].thread, NULL);
-}
-
-/*
-fib : Fn[n] $ (
- ? {
- n <= 0 { 0 }
- n < 2 { 1 }
- { + (fib `call n - 1; fib `call n - 2) }
- }
- v : Vector[`fork_type fib] $ ()
- @ 1 .. I { v `push fib `fork FIB }
- @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) }
- `print "Complete\n"
-)
-*/
-
-#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__)
-
-_Atomic size_t counter = 0;
-
-/*
- 0: i
- 1: arg
- 2: child_a
- 3: child_b
-*/
-
-void fib_c(task *t) {
- task *child_a = t->state[2], *child_b = t->state[3];
- t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value);
- task_free(child_a);
- task_free(child_b);
- task_done(t);
-}
-
-void fib_a(task *t);
-
-void fib_b(task *t) {
- intptr_t arg = (intptr_t) t->state[1];
- task *child_b = t->state[3] = task_init(fib_a);
- child_b->state[1] = (void*) (arg - 2);
- task_queue_sync(child_b, t->worker_id);
- task_join(child_b, t, fib_c);
-}
-
-void fib_a(task *t) {
- intptr_t arg = (intptr_t) t->state[1];
- if (arg <= 0) {
- t->return_value = (void*) 0;
- task_done(t);
- return;
- }
- if (arg < 2) {
- t->return_value = (void*) 1;
- task_done(t);
- return;
- }
- task *child_a = t->state[2] = task_init(fib_a);
- child_a->state[1] = (void*) (arg - 1);
- task_queue_sync(child_a, t->worker_id);
- task_join(child_a, t, fib_b);
-}
-
-intptr_t fib(intptr_t n) {
- if (n <= 0)
- return 0;
- if (n < 2)
- return 1;
- return fib(n - 1) + fib(n - 2);
-}
-
-void fib_rec(task *t) {
- intptr_t arg = (intptr_t) t->state[1];
- printf("fib thread: %d\n", t->worker_id);
- t->return_value = (void*) fib(arg);
- task_done(t);
-}
-
-void start_b(task *t) {
- counter++;
- task *f = t->state[(intptr_t) t->state[0]];
- intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value;
- printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret);
- task_free(f);
- t->state[0]++;
- if ((intptr_t) t->state[0] == I + 1) {
- printf("Complete\n");
- task_done(t);
- task_free(t);
- return;
- }
- task_join(t->state[(intptr_t) t->state[0]], t, start_b);
-}
-
-void start_a(task *t) {
- t->state[0] = (void*) 1;
- for (intptr_t i = 0; i < I; i++) {
- //task *f = task_init(fib_a);
- task *f = task_init(fib_rec);
- f->state[0] = (void*) i;
- f->state[1] = (void*) FIB;
- t->state[i + 1] = f;
- task_queue_async(f);
- }
- task_join(t->state[(intptr_t) t->state[0]], t, start_b);
-}
-
-int main(void) {
- task *t = task_init(start_a);
- task_queue_sync(t, TASK_MAIN_PROCESS);
- task_run();
- assert(counter == I);
- return 0;
-}
-```