diff options
| author | nodist <kevin.comas.git@gmail.com> | 2026-06-01 20:48:47 -0400 |
|---|---|---|
| committer | nodist <kevin.comas.git@gmail.com> | 2026-06-01 20:48:47 -0400 |
| commit | ba087e5dbcc50537d82da5dbc602df7292d3f24c (patch) | |
| tree | a72cc565b17f6ddcd46f38e7b04a623a88ad5282 /docs | |
| parent | 040fb35732e6c7c0d8f4ccf7c9bee7242cea6939 (diff) | |
process example
Diffstat (limited to 'docs')
| -rw-r--r-- | docs/application/index.md | 17 | ||||
| -rw-r--r-- | docs/application/pool.md | 3 | ||||
| -rw-r--r-- | docs/application/process.md | 309 |
3 files changed, 319 insertions, 10 deletions
diff --git a/docs/application/index.md b/docs/application/index.md index d3b7d49..b9a5e0b 100644 --- a/docs/application/index.md +++ b/docs/application/index.md @@ -1,17 +1,14 @@ -# Processes +# Runtime --- -# Startup +## Sections -## Per Process Task Queue +* ##### [Pool](./pool.md) +* ##### [Process](./process.md) -## Asynchronous Task Queue +## Invocation -# Queuing +### File -## Sync - -## Async - -# Running +### REPL diff --git a/docs/application/pool.md b/docs/application/pool.md new file mode 100644 index 0000000..dc7c1e3 --- /dev/null +++ b/docs/application/pool.md @@ -0,0 +1,3 @@ +# Memory Pool + +--- diff --git a/docs/application/process.md b/docs/application/process.md new file mode 100644 index 0000000..2934201 --- /dev/null +++ b/docs/application/process.md @@ -0,0 +1,309 @@ +# Processes + +--- + +## Per Process Task Queue + +## Asynchronous Task Queue + +# Queuing + +## Sync + +## Async + +# Running + +# Example + +```c +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <stdbool.h> +#include <pthread.h> +#include <assert.h> + +#define PROCESSES 12 + +#define I 5 + +#define FIB 30 + +typedef struct _task task; + +typedef void task_fn(task *t); + +#define TASK_STATE_SIZE 20 + +#define TASK_QUEUE_ASYNC -1 + +#define TASK_MAIN_PROCESS 0 + +typedef struct _task { + _Atomic bool join_ready; + int32_t worker_id; + void *state[TASK_STATE_SIZE]; + void *return_value; + struct _task *next, *join; + task_fn *fn; +} task; + +task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; + +_Atomic size_t task_async_queue_length = 0; + +pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; + +void __attribute__((destructor)) task_destructor(void) { + while (task_pool) { + task *t = task_pool; + task_pool = task_pool->next; + free(t); + } + pthread_mutex_destroy(&task_pool_mutex); + pthread_cond_destroy(&task_async_queue_cond); + pthread_mutex_destroy(&task_async_queue_mutex); +} + +typedef struct { + task *head, *tail; + pthread_t thread; +} process; + +process worker[PROCESSES] = {}; + +task *task_init(task_fn fn) { + task *t = NULL; + pthread_mutex_lock(&task_pool_mutex); + if (task_pool) { + t = task_pool; + task_pool = task_pool->next; + } + pthread_mutex_unlock(&task_pool_mutex); + if (!t) + t = malloc(sizeof(task)); + __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); + t->worker_id = TASK_QUEUE_ASYNC; + for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) + t->state[state_index] = NULL; + t->return_value = NULL; + t->next = t->join = NULL; + t->fn = fn; + return t; +} + +void task_free(task *t) { + pthread_mutex_lock(&task_pool_mutex); + t->next = task_pool; + task_pool = t; + pthread_mutex_unlock(&task_pool_mutex); +} + +void task_queue_async(task *t) { + pthread_mutex_lock(&task_async_queue_mutex); + if (task_async_queue_tail) + task_async_queue_tail = task_async_queue_tail->next = t; + else + task_async_queue_head = task_async_queue_tail = t; + pthread_mutex_unlock(&task_async_queue_mutex); + pthread_cond_signal(&task_async_queue_cond); +} + +void task_queue_sync(task *t, int32_t worker_id) { + if (worker_id == TASK_QUEUE_ASYNC) + return task_queue_async(t); + if (worker[worker_id].tail) + worker[worker_id].tail = worker[worker_id].tail->next = t; + else + worker[worker_id].head = worker[worker_id].tail = t; + return; +} + +void task_ready_queue(task *t) { + if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) + return; + task *join = t->join; + t->join = NULL; + __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); + task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); +} + +void task_join(task *restrict t, task *restrict join, task_fn fn) { + join->fn = fn; + t->join = join; + task_ready_queue(t); +} + +void task_done(task *t) { + task_ready_queue(t); +} + +_Atomic size_t running = PROCESSES; + +void *task_loop(void *arg) { + int32_t worker_id = (intptr_t) arg; + for (;;) { + task *t = NULL; + if (worker[worker_id].head) { + t = worker[worker_id].head; + if (worker[worker_id].head != worker[worker_id].tail) + worker[worker_id].head = worker[worker_id].head->next; + else + worker[worker_id].head = worker[worker_id].tail = NULL; + } else { + pthread_mutex_lock(&task_async_queue_mutex); + if (task_async_queue_head) { + t = task_async_queue_head; + if (task_async_queue_head != task_async_queue_tail) + task_async_queue_head = task_async_queue_head->next; + else + task_async_queue_head = task_async_queue_tail = NULL; + } + pthread_mutex_unlock(&task_async_queue_mutex); + } + if (t) { + t->next = NULL; + t->worker_id = worker_id; + t->fn(t); + continue; + } + running--; + if (!running) { + for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) + pthread_cond_signal(&task_async_queue_cond); + break; + } + pthread_mutex_lock(&task_async_queue_mutex); + pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); + pthread_mutex_unlock(&task_async_queue_mutex); + if (!running) + break; + running++; + } + return NULL; +} + +void task_run(void) { + for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) + pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); + task_loop(0); + for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) + pthread_join(worker[worker_id].thread, NULL); +} + +/* +fib : Fn[n] $ ( + ? { + n <= 0 { 0 } + n < 2 { 1 } + { + (fib `call n - 1; fib `call n - 2) } + } + v : Vector[`fork_type fib] $ () + @ 1 .. I { v `push fib `fork FIB } + @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) } + `print "Complete\n" +) +*/ + +#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) + +_Atomic size_t counter = 0; + +/* + 0: i + 1: arg + 2: child_a + 3: child_b +*/ + +void fib_c(task *t) { + task *child_a = t->state[2], *child_b = t->state[3]; + t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value); + task_free(child_a); + task_free(child_b); + task_done(t); +} + +void fib_a(task *t); + +void fib_b(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + task *child_b = t->state[3] = task_init(fib_a); + child_b->state[1] = (void*) (arg - 2); + task_queue_sync(child_b, t->worker_id); + task_join(child_b, t, fib_c); +} + +void fib_a(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + if (arg <= 0) { + t->return_value = (void*) 0; + task_done(t); + return; + } + if (arg < 2) { + t->return_value = (void*) 1; + task_done(t); + return; + } + task *child_a = t->state[2] = task_init(fib_a); + child_a->state[1] = (void*) (arg - 1); + task_queue_sync(child_a, t->worker_id); + task_join(child_a, t, fib_b); +} + +intptr_t fib(intptr_t n) { + if (n <= 0) + return 0; + if (n < 2) + return 1; + return fib(n - 1) + fib(n - 2); +} + +void fib_rec(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + printf("fib thread: %d\n", t->worker_id); + t->return_value = (void*) fib(arg); + task_done(t); +} + +void start_b(task *t) { + counter++; + task *f = t->state[(intptr_t) t->state[0]]; + intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; + printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); + task_free(f); + t->state[0]++; + if ((intptr_t) t->state[0] == I + 1) { + printf("Complete\n"); + task_done(t); + task_free(t); + return; + } + task_join(t->state[(intptr_t) t->state[0]], t, start_b); +} + +void start_a(task *t) { + t->state[0] = (void*) 1; + for (intptr_t i = 0; i < I; i++) { + //task *f = task_init(fib_a); + task *f = task_init(fib_rec); + f->state[0] = (void*) i; + f->state[1] = (void*) FIB; + t->state[i + 1] = f; + task_queue_async(f); + } + task_join(t->state[(intptr_t) t->state[0]], t, start_b); +} + +int main(void) { + task *t = task_init(start_a); + task_queue_sync(t, TASK_MAIN_PROCESS); + task_run(); + assert(counter == I); + return 0; +} +``` |
