summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornodist <kevin.comas.git@gmail.com>2026-06-01 20:48:47 -0400
committernodist <kevin.comas.git@gmail.com>2026-06-01 20:48:47 -0400
commitba087e5dbcc50537d82da5dbc602df7292d3f24c (patch)
treea72cc565b17f6ddcd46f38e7b04a623a88ad5282
parent040fb35732e6c7c0d8f4ccf7c9bee7242cea6939 (diff)
process example
-rw-r--r--docs/application/index.md17
-rw-r--r--docs/application/pool.md3
-rw-r--r--docs/application/process.md309
-rw-r--r--mkdocs.yml4
4 files changed, 322 insertions, 11 deletions
diff --git a/docs/application/index.md b/docs/application/index.md
index d3b7d49..b9a5e0b 100644
--- a/docs/application/index.md
+++ b/docs/application/index.md
@@ -1,17 +1,14 @@
-# Processes
+# Runtime
---
-# Startup
+## Sections
-## Per Process Task Queue
+* ##### [Pool](./pool.md)
+* ##### [Process](./process.md)
-## Asynchronous Task Queue
+## Invocation
-# Queuing
+### File
-## Sync
-
-## Async
-
-# Running
+### REPL
diff --git a/docs/application/pool.md b/docs/application/pool.md
new file mode 100644
index 0000000..dc7c1e3
--- /dev/null
+++ b/docs/application/pool.md
@@ -0,0 +1,3 @@
+# Memory Pool
+
+---
diff --git a/docs/application/process.md b/docs/application/process.md
new file mode 100644
index 0000000..2934201
--- /dev/null
+++ b/docs/application/process.md
@@ -0,0 +1,309 @@
+# Processes
+
+---
+
+## Per Process Task Queue
+
+## Asynchronous Task Queue
+
+# Queuing
+
+## Sync
+
+## Async
+
+# Running
+
+# Example
+
+```c
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <assert.h>
+
+#define PROCESSES 12
+
+#define I 5
+
+#define FIB 30
+
+typedef struct _task task;
+
+typedef void task_fn(task *t);
+
+#define TASK_STATE_SIZE 20
+
+#define TASK_QUEUE_ASYNC -1
+
+#define TASK_MAIN_PROCESS 0
+
+typedef struct _task {
+ _Atomic bool join_ready;
+ int32_t worker_id;
+ void *state[TASK_STATE_SIZE];
+ void *return_value;
+ struct _task *next, *join;
+ task_fn *fn;
+} task;
+
+task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL;
+
+_Atomic size_t task_async_queue_length = 0;
+
+pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER;
+
+void __attribute__((destructor)) task_destructor(void) {
+ while (task_pool) {
+ task *t = task_pool;
+ task_pool = task_pool->next;
+ free(t);
+ }
+ pthread_mutex_destroy(&task_pool_mutex);
+ pthread_cond_destroy(&task_async_queue_cond);
+ pthread_mutex_destroy(&task_async_queue_mutex);
+}
+
+typedef struct {
+ task *head, *tail;
+ pthread_t thread;
+} process;
+
+process worker[PROCESSES] = {};
+
+task *task_init(task_fn fn) {
+ task *t = NULL;
+ pthread_mutex_lock(&task_pool_mutex);
+ if (task_pool) {
+ t = task_pool;
+ task_pool = task_pool->next;
+ }
+ pthread_mutex_unlock(&task_pool_mutex);
+ if (!t)
+ t = malloc(sizeof(task));
+ __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
+ t->worker_id = TASK_QUEUE_ASYNC;
+ for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++)
+ t->state[state_index] = NULL;
+ t->return_value = NULL;
+ t->next = t->join = NULL;
+ t->fn = fn;
+ return t;
+}
+
+void task_free(task *t) {
+ pthread_mutex_lock(&task_pool_mutex);
+ t->next = task_pool;
+ task_pool = t;
+ pthread_mutex_unlock(&task_pool_mutex);
+}
+
+void task_queue_async(task *t) {
+ pthread_mutex_lock(&task_async_queue_mutex);
+ if (task_async_queue_tail)
+ task_async_queue_tail = task_async_queue_tail->next = t;
+ else
+ task_async_queue_head = task_async_queue_tail = t;
+ pthread_mutex_unlock(&task_async_queue_mutex);
+ pthread_cond_signal(&task_async_queue_cond);
+}
+
+void task_queue_sync(task *t, int32_t worker_id) {
+ if (worker_id == TASK_QUEUE_ASYNC)
+ return task_queue_async(t);
+ if (worker[worker_id].tail)
+ worker[worker_id].tail = worker[worker_id].tail->next = t;
+ else
+ worker[worker_id].head = worker[worker_id].tail = t;
+ return;
+}
+
+void task_ready_queue(task *t) {
+ if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join)
+ return;
+ task *join = t->join;
+ t->join = NULL;
+ __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
+ task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC);
+}
+
+void task_join(task *restrict t, task *restrict join, task_fn fn) {
+ join->fn = fn;
+ t->join = join;
+ task_ready_queue(t);
+}
+
+void task_done(task *t) {
+ task_ready_queue(t);
+}
+
+_Atomic size_t running = PROCESSES;
+
+void *task_loop(void *arg) {
+ int32_t worker_id = (intptr_t) arg;
+ for (;;) {
+ task *t = NULL;
+ if (worker[worker_id].head) {
+ t = worker[worker_id].head;
+ if (worker[worker_id].head != worker[worker_id].tail)
+ worker[worker_id].head = worker[worker_id].head->next;
+ else
+ worker[worker_id].head = worker[worker_id].tail = NULL;
+ } else {
+ pthread_mutex_lock(&task_async_queue_mutex);
+ if (task_async_queue_head) {
+ t = task_async_queue_head;
+ if (task_async_queue_head != task_async_queue_tail)
+ task_async_queue_head = task_async_queue_head->next;
+ else
+ task_async_queue_head = task_async_queue_tail = NULL;
+ }
+ pthread_mutex_unlock(&task_async_queue_mutex);
+ }
+ if (t) {
+ t->next = NULL;
+ t->worker_id = worker_id;
+ t->fn(t);
+ continue;
+ }
+ running--;
+ if (!running) {
+ for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
+ pthread_cond_signal(&task_async_queue_cond);
+ break;
+ }
+ pthread_mutex_lock(&task_async_queue_mutex);
+ pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex);
+ pthread_mutex_unlock(&task_async_queue_mutex);
+ if (!running)
+ break;
+ running++;
+ }
+ return NULL;
+}
+
+void task_run(void) {
+ for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
+ pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id);
+ task_loop(0);
+ for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
+ pthread_join(worker[worker_id].thread, NULL);
+}
+
+/*
+fib : Fn[n] $ (
+ ? {
+ n <= 0 { 0 }
+ n < 2 { 1 }
+ { + (fib `call n - 1; fib `call n - 2) }
+ }
+ v : Vector[`fork_type fib] $ ()
+ @ 1 .. I { v `push fib `fork FIB }
+ @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) }
+ `print "Complete\n"
+)
+*/
+
+#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__)
+
+_Atomic size_t counter = 0;
+
+/*
+ 0: i
+ 1: arg
+ 2: child_a
+ 3: child_b
+*/
+
+void fib_c(task *t) {
+ task *child_a = t->state[2], *child_b = t->state[3];
+ t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value);
+ task_free(child_a);
+ task_free(child_b);
+ task_done(t);
+}
+
+void fib_a(task *t);
+
+void fib_b(task *t) {
+ intptr_t arg = (intptr_t) t->state[1];
+ task *child_b = t->state[3] = task_init(fib_a);
+ child_b->state[1] = (void*) (arg - 2);
+ task_queue_sync(child_b, t->worker_id);
+ task_join(child_b, t, fib_c);
+}
+
+void fib_a(task *t) {
+ intptr_t arg = (intptr_t) t->state[1];
+ if (arg <= 0) {
+ t->return_value = (void*) 0;
+ task_done(t);
+ return;
+ }
+ if (arg < 2) {
+ t->return_value = (void*) 1;
+ task_done(t);
+ return;
+ }
+ task *child_a = t->state[2] = task_init(fib_a);
+ child_a->state[1] = (void*) (arg - 1);
+ task_queue_sync(child_a, t->worker_id);
+ task_join(child_a, t, fib_b);
+}
+
+intptr_t fib(intptr_t n) {
+ if (n <= 0)
+ return 0;
+ if (n < 2)
+ return 1;
+ return fib(n - 1) + fib(n - 2);
+}
+
+void fib_rec(task *t) {
+ intptr_t arg = (intptr_t) t->state[1];
+ printf("fib thread: %d\n", t->worker_id);
+ t->return_value = (void*) fib(arg);
+ task_done(t);
+}
+
+void start_b(task *t) {
+ counter++;
+ task *f = t->state[(intptr_t) t->state[0]];
+ intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value;
+ printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret);
+ task_free(f);
+ t->state[0]++;
+ if ((intptr_t) t->state[0] == I + 1) {
+ printf("Complete\n");
+ task_done(t);
+ task_free(t);
+ return;
+ }
+ task_join(t->state[(intptr_t) t->state[0]], t, start_b);
+}
+
+void start_a(task *t) {
+ t->state[0] = (void*) 1;
+ for (intptr_t i = 0; i < I; i++) {
+ //task *f = task_init(fib_a);
+ task *f = task_init(fib_rec);
+ f->state[0] = (void*) i;
+ f->state[1] = (void*) FIB;
+ t->state[i + 1] = f;
+ task_queue_async(f);
+ }
+ task_join(t->state[(intptr_t) t->state[0]], t, start_b);
+}
+
+int main(void) {
+ task *t = task_init(start_a);
+ task_queue_sync(t, TASK_MAIN_PROCESS);
+ task_run();
+ assert(counter == I);
+ return 0;
+}
+```
diff --git a/mkdocs.yml b/mkdocs.yml
index 43aeaea..6efaf5e 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -37,4 +37,6 @@ nav:
- List: 'type_system/list.md'
- Namespace: 'type_system/namespace.md'
- Application:
- - Processes: 'application/index.md'
+ - Runtime: 'application/index.md'
+ - Pool: 'application/pool.md'
+ - Processes: 'application/process.md'