From b26ad08b39b8229dcea0bafc4a8ba4b0d7ad7154 Mon Sep 17 00:00:00 2001 From: nodist Date: Tue, 2 Jun 2026 16:26:09 -0400 Subject: reformat with sync and async calls --- docs/application/index.md | 11 +- docs/application/interface.md | 46 ++++++ docs/application/pool.md | 10 ++ docs/application/process.md | 309 ------------------------------------- docs/application/testing.md | 17 +++ docs/application/thread.md | 344 ++++++++++++++++++++++++++++++++++++++++++ docs/application/type.md | 7 + 7 files changed, 434 insertions(+), 310 deletions(-) create mode 100644 docs/application/interface.md delete mode 100644 docs/application/process.md create mode 100644 docs/application/testing.md create mode 100644 docs/application/thread.md create mode 100644 docs/application/type.md (limited to 'docs/application') diff --git a/docs/application/index.md b/docs/application/index.md index b9a5e0b..3809ac9 100644 --- a/docs/application/index.md +++ b/docs/application/index.md @@ -2,10 +2,19 @@ --- +## Requirements + +* Linux X64 +* GNU Make +* GCC with -std=gnu99 -fhardened + ## Sections +* ##### [Interface](./interface.md) +* ##### [Type](./type.md) * ##### [Pool](./pool.md) -* ##### [Process](./process.md) +* ##### [Thread](./thread.md) +* ##### [Testing](./testing.md) ## Invocation diff --git a/docs/application/interface.md b/docs/application/interface.md new file mode 100644 index 0000000..1493aff --- /dev/null +++ b/docs/application/interface.md @@ -0,0 +1,46 @@ +# Interface + +--- + +General definitions used throughout the code + +## Object Definitions + +```c +typedef union { + bool b; + uint8_t u8; + uint16_t u16; + uint32_t u32; + uint64_t u64; + int8_t i8; + int16_t i16; + int32_t i32; + int64_t i64; + float f32; + double f64; + void *ptr; +} kpl_any; + +typedef size_t kpl_any_hash_fn(const kpl_any a); + +typedef bool kpl_any_eq_fn(const kpl_any a, const kpl_any b); + +typedef ssize_t kpl_any_cmp_fn(const kpl_any a, const kpl_any b); + +typedef int32_t kpl_any_print_fn(const kpl_any a, FILE *file, size_t idnt, uint32_t print_opts); + +typedef void kpl_any_free_fn(kpl_any a); + +typedef struct { + kpl_any_hash_fn *hash_fn; + kpl_any_eq_fn *eq_fn; + kpl_any_cmp_fn *cmp_fn; + kpl_any_print_fn *print_fn; + kpl_any_free_fn *free_fn; +} kpl_any_interace; + +typedef struct { + any value, info; +} kpl_result; +``` diff --git a/docs/application/pool.md b/docs/application/pool.md index dc7c1e3..b4c5913 100644 --- a/docs/application/pool.md +++ b/docs/application/pool.md @@ -1,3 +1,13 @@ # Memory Pool --- + +## Object Definitions + +```c +#define POOL_HEADER(STRUCT) struct STRUCT *prev, *next; uint32_t obj_size + +typedef struct _kpl_pool_obj { + POOL_HEADER(_kpl_pool_obj); +} kpl_pool_obj; +``` diff --git a/docs/application/process.md b/docs/application/process.md deleted file mode 100644 index 2934201..0000000 --- a/docs/application/process.md +++ /dev/null @@ -1,309 +0,0 @@ -# Processes - ---- - -## Per Process Task Queue - -## Asynchronous Task Queue - -# Queuing - -## Sync - -## Async - -# Running - -# Example - -```c -#include -#include -#include -#include -#include -#include - -#define PROCESSES 12 - -#define I 5 - -#define FIB 30 - -typedef struct _task task; - -typedef void task_fn(task *t); - -#define TASK_STATE_SIZE 20 - -#define TASK_QUEUE_ASYNC -1 - -#define TASK_MAIN_PROCESS 0 - -typedef struct _task { - _Atomic bool join_ready; - int32_t worker_id; - void *state[TASK_STATE_SIZE]; - void *return_value; - struct _task *next, *join; - task_fn *fn; -} task; - -task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; - -_Atomic size_t task_async_queue_length = 0; - -pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; - -void __attribute__((destructor)) task_destructor(void) { - while (task_pool) { - task *t = task_pool; - task_pool = task_pool->next; - free(t); - } - pthread_mutex_destroy(&task_pool_mutex); - pthread_cond_destroy(&task_async_queue_cond); - pthread_mutex_destroy(&task_async_queue_mutex); -} - -typedef struct { - task *head, *tail; - pthread_t thread; -} process; - -process worker[PROCESSES] = {}; - -task *task_init(task_fn fn) { - task *t = NULL; - pthread_mutex_lock(&task_pool_mutex); - if (task_pool) { - t = task_pool; - task_pool = task_pool->next; - } - pthread_mutex_unlock(&task_pool_mutex); - if (!t) - t = malloc(sizeof(task)); - __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - t->worker_id = TASK_QUEUE_ASYNC; - for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) - t->state[state_index] = NULL; - t->return_value = NULL; - t->next = t->join = NULL; - t->fn = fn; - return t; -} - -void task_free(task *t) { - pthread_mutex_lock(&task_pool_mutex); - t->next = task_pool; - task_pool = t; - pthread_mutex_unlock(&task_pool_mutex); -} - -void task_queue_async(task *t) { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_tail) - task_async_queue_tail = task_async_queue_tail->next = t; - else - task_async_queue_head = task_async_queue_tail = t; - pthread_mutex_unlock(&task_async_queue_mutex); - pthread_cond_signal(&task_async_queue_cond); -} - -void task_queue_sync(task *t, int32_t worker_id) { - if (worker_id == TASK_QUEUE_ASYNC) - return task_queue_async(t); - if (worker[worker_id].tail) - worker[worker_id].tail = worker[worker_id].tail->next = t; - else - worker[worker_id].head = worker[worker_id].tail = t; - return; -} - -void task_ready_queue(task *t) { - if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) - return; - task *join = t->join; - t->join = NULL; - __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); -} - -void task_join(task *restrict t, task *restrict join, task_fn fn) { - join->fn = fn; - t->join = join; - task_ready_queue(t); -} - -void task_done(task *t) { - task_ready_queue(t); -} - -_Atomic size_t running = PROCESSES; - -void *task_loop(void *arg) { - int32_t worker_id = (intptr_t) arg; - for (;;) { - task *t = NULL; - if (worker[worker_id].head) { - t = worker[worker_id].head; - if (worker[worker_id].head != worker[worker_id].tail) - worker[worker_id].head = worker[worker_id].head->next; - else - worker[worker_id].head = worker[worker_id].tail = NULL; - } else { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_head) { - t = task_async_queue_head; - if (task_async_queue_head != task_async_queue_tail) - task_async_queue_head = task_async_queue_head->next; - else - task_async_queue_head = task_async_queue_tail = NULL; - } - pthread_mutex_unlock(&task_async_queue_mutex); - } - if (t) { - t->next = NULL; - t->worker_id = worker_id; - t->fn(t); - continue; - } - running--; - if (!running) { - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_cond_signal(&task_async_queue_cond); - break; - } - pthread_mutex_lock(&task_async_queue_mutex); - pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); - pthread_mutex_unlock(&task_async_queue_mutex); - if (!running) - break; - running++; - } - return NULL; -} - -void task_run(void) { - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); - task_loop(0); - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_join(worker[worker_id].thread, NULL); -} - -/* -fib : Fn[n] $ ( - ? { - n <= 0 { 0 } - n < 2 { 1 } - { + (fib `call n - 1; fib `call n - 2) } - } - v : Vector[`fork_type fib] $ () - @ 1 .. I { v `push fib `fork FIB } - @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `join fib_task) } - `print "Complete\n" -) -*/ - -#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) - -_Atomic size_t counter = 0; - -/* - 0: i - 1: arg - 2: child_a - 3: child_b -*/ - -void fib_c(task *t) { - task *child_a = t->state[2], *child_b = t->state[3]; - t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value); - task_free(child_a); - task_free(child_b); - task_done(t); -} - -void fib_a(task *t); - -void fib_b(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - task *child_b = t->state[3] = task_init(fib_a); - child_b->state[1] = (void*) (arg - 2); - task_queue_sync(child_b, t->worker_id); - task_join(child_b, t, fib_c); -} - -void fib_a(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - if (arg <= 0) { - t->return_value = (void*) 0; - task_done(t); - return; - } - if (arg < 2) { - t->return_value = (void*) 1; - task_done(t); - return; - } - task *child_a = t->state[2] = task_init(fib_a); - child_a->state[1] = (void*) (arg - 1); - task_queue_sync(child_a, t->worker_id); - task_join(child_a, t, fib_b); -} - -intptr_t fib(intptr_t n) { - if (n <= 0) - return 0; - if (n < 2) - return 1; - return fib(n - 1) + fib(n - 2); -} - -void fib_rec(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - printf("fib thread: %d\n", t->worker_id); - t->return_value = (void*) fib(arg); - task_done(t); -} - -void start_b(task *t) { - counter++; - task *f = t->state[(intptr_t) t->state[0]]; - intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; - printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); - task_free(f); - t->state[0]++; - if ((intptr_t) t->state[0] == I + 1) { - printf("Complete\n"); - task_done(t); - task_free(t); - return; - } - task_join(t->state[(intptr_t) t->state[0]], t, start_b); -} - -void start_a(task *t) { - t->state[0] = (void*) 1; - for (intptr_t i = 0; i < I; i++) { - //task *f = task_init(fib_a); - task *f = task_init(fib_rec); - f->state[0] = (void*) i; - f->state[1] = (void*) FIB; - t->state[i + 1] = f; - task_queue_async(f); - } - task_join(t->state[(intptr_t) t->state[0]], t, start_b); -} - -int main(void) { - task *t = task_init(start_a); - task_queue_sync(t, TASK_MAIN_PROCESS); - task_run(); - assert(counter == I); - return 0; -} -``` diff --git a/docs/application/testing.md b/docs/application/testing.md new file mode 100644 index 0000000..8657bf0 --- /dev/null +++ b/docs/application/testing.md @@ -0,0 +1,17 @@ +# Testing + +--- + +## Macros + +```c +TEST(NAME) { + // TEST BODY +} + +ASSERT(CONDITION) + +FAIL() +``` + +## Object Definitions diff --git a/docs/application/thread.md b/docs/application/thread.md new file mode 100644 index 0000000..04d9c98 --- /dev/null +++ b/docs/application/thread.md @@ -0,0 +1,344 @@ +# Threads + +--- + +## Object Definitions + +```c +typedef struct _kpl_task kpl_task; + +typedef void kpl_task_fn(kpl_task *t); + +#ifndef KPL_TASK_STATE_SIZE +#define KPL_TASK_STATE_SIZE 20 +#endif + +typedef struct _kpl_task { + POOL_HEADER(kpl_task); + _Atomic bool join_ready; + uint16_t worker_id; + kpl_any state[KPL_TASK_STATE_SIZE]; + kpl_result ret; + kpl_task_fn *fn; +} kpl_task; + +typedef struct { + kpl_task *head, *tail; + pthread_t thread; +} kpl_thread; + +#define KPL_PROCESSES $(nproc) + +kpl_thread kpl_worker[KPL_PROCESSES] = {}; + +_Atomic uint16_t kpl_worker_running = KPL_PROCESSES; +``` + +## Per Process Task Queue + +## Asynchronous Task Queue + +# Queuing + +## Sync + +## Async + +# Running + +Main thread is kpl_worker[0], threads are started after that + +## Joining + +# Example + +```c +#include +#include +#include +#include +#include +#include + +#define PROCESSES 12 + +#define I 5 + +#define FIB 30 + +typedef struct _task task; + +typedef void task_fn(task *t); + +#define TASK_STATE_SIZE 20 + +#define TASK_QUEUE_ASYNC -1 + +#define TASK_MAIN_PROCESS 0 + +typedef struct _task { + _Atomic bool join_ready; + int32_t worker_id; + void *state[TASK_STATE_SIZE]; + void *return_value; + struct _task *next, *join; + task_fn *fn; +} task; + +task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; + +_Atomic size_t task_async_queue_length = 0; + +pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; + +void __attribute__((destructor)) task_destructor(void) { + while (task_pool) { + task *t = task_pool; + task_pool = task_pool->next; + free(t); + } + pthread_mutex_destroy(&task_pool_mutex); + pthread_cond_destroy(&task_async_queue_cond); + pthread_mutex_destroy(&task_async_queue_mutex); +} + +typedef struct { + task *head, *tail; + pthread_t thread; +} process; + +process worker[PROCESSES] = {}; + +task *task_init(task_fn fn) { + task *t = NULL; + pthread_mutex_lock(&task_pool_mutex); + if (task_pool) { + t = task_pool; + task_pool = task_pool->next; + } + pthread_mutex_unlock(&task_pool_mutex); + if (!t) + t = malloc(sizeof(task)); + __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); + t->worker_id = TASK_QUEUE_ASYNC; + for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) + t->state[state_index] = NULL; + t->return_value = NULL; + t->next = t->join = NULL; + t->fn = fn; + return t; +} + +void task_free(task *t) { + pthread_mutex_lock(&task_pool_mutex); + t->next = task_pool; + task_pool = t; + pthread_mutex_unlock(&task_pool_mutex); +} + +void task_queue_async(task *t) { + pthread_mutex_lock(&task_async_queue_mutex); + if (task_async_queue_tail) + task_async_queue_tail = task_async_queue_tail->next = t; + else + task_async_queue_head = task_async_queue_tail = t; + pthread_mutex_unlock(&task_async_queue_mutex); + pthread_cond_signal(&task_async_queue_cond); +} + +void task_queue_sync(task *t, int32_t worker_id) { + if (worker_id == TASK_QUEUE_ASYNC) + return task_queue_async(t); + if (worker[worker_id].tail) + worker[worker_id].tail = worker[worker_id].tail->next = t; + else + worker[worker_id].head = worker[worker_id].tail = t; +} + +void task_ready_queue(task *t) { + if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) + return; + task *join = t->join; + t->join = NULL; + __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); + task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); +} + +void task_join(task *restrict t, task *restrict join, task_fn fn) { + join->fn = fn; + t->join = join; + task_ready_queue(t); +} + +void task_done(task *t) { + task_ready_queue(t); +} + +_Atomic size_t running = PROCESSES; + +void *task_loop(void *arg) { + int32_t worker_id = (intptr_t) arg; + for (;;) { + task *t = NULL; + if (worker[worker_id].head) { + t = worker[worker_id].head; + if (worker[worker_id].head != worker[worker_id].tail) + worker[worker_id].head = worker[worker_id].head->next; + else + worker[worker_id].head = worker[worker_id].tail = NULL; + } else { + pthread_mutex_lock(&task_async_queue_mutex); + if (task_async_queue_head) { + t = task_async_queue_head; + if (task_async_queue_head != task_async_queue_tail) + task_async_queue_head = task_async_queue_head->next; + else + task_async_queue_head = task_async_queue_tail = NULL; + } + pthread_mutex_unlock(&task_async_queue_mutex); + } + if (t) { + t->next = NULL; + t->worker_id = worker_id; + t->fn(t); + continue; + } + running--; + if (!running) { + for (intptr_t worker_id = 0; worker_id < PROCESSES; worker_id++) + pthread_cond_signal(&task_async_queue_cond); + break; + } + pthread_mutex_lock(&task_async_queue_mutex); + pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); + pthread_mutex_unlock(&task_async_queue_mutex); + if (!running) + break; + running++; + } + return NULL; +} + +void task_run(void) { + for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) + pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); + task_loop(0); + for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) + pthread_join(worker[worker_id].thread, NULL); +} + +/* +fib : Fn[n] $ ( + ? { + n <= 0 { 0 } + n < 2 { 1 } + { + (fib `sync n - 1; fib `sync n - 2) } + } + v : Vector[`fork_type fib] $ () + @ 1 .. I { v `push fib `async FIB } + @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `wait fib_task) } + `print "Complete\n" +) +*/ + +#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) + +_Atomic size_t counter = 0; + +/* + 0: i + 1: arg + 2: child_a + 3: child_b +*/ + +void fib_c(task *t) { + task *child_a = t->state[2], *child_b = t->state[3]; + t->return_value = (void*) ((intptr_t) child_a->return_value + (intptr_t) child_b->return_value); + task_free(child_a); + task_free(child_b); + task_done(t); +} + +void fib_a(task *t); + +void fib_b(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + task *child_b = t->state[3] = task_init(fib_a); + child_b->state[1] = (void*) (arg - 2); + task_queue_sync(child_b, t->worker_id); + task_join(child_b, t, fib_c); +} + +void fib_a(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + if (arg <= 0) { + t->return_value = (void*) 0; + task_done(t); + return; + } + if (arg < 2) { + t->return_value = (void*) 1; + task_done(t); + return; + } + task *child_a = t->state[2] = task_init(fib_a); + child_a->state[1] = (void*) (arg - 1); + task_queue_sync(child_a, t->worker_id); + task_join(child_a, t, fib_b); +} + +intptr_t fib(intptr_t n) { + if (n <= 0) + return 0; + if (n < 2) + return 1; + return fib(n - 1) + fib(n - 2); +} + +void fib_rec(task *t) { + intptr_t arg = (intptr_t) t->state[1]; + printf("fib thread: %d\n", t->worker_id); + t->return_value = (void*) fib(arg); + task_done(t); +} + +void start_b(task *t) { + counter++; + task *f = t->state[(intptr_t) t->state[0]]; + intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; + printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); + task_free(f); + t->state[0]++; + if ((intptr_t) t->state[0] == I + 1) { + printf("Complete\n"); + task_done(t); + task_free(t); + return; + } + task_join(t->state[(intptr_t) t->state[0]], t, start_b); +} + +void start_a(task *t) { + t->state[0] = (void*) 1; + for (intptr_t i = 0; i < I; i++) { + //task *f = task_init(fib_a); + task *f = task_init(fib_rec); + f->state[0] = (void*) i; + f->state[1] = (void*) FIB; + t->state[i + 1] = f; + task_queue_async(f); + } + task_join(t->state[(intptr_t) t->state[0]], t, start_b); +} + +int main(void) { + task *t = task_init(start_a); + task_queue_sync(t, TASK_MAIN_PROCESS); + task_run(); + assert(counter == I); + return 0; +} +``` diff --git a/docs/application/type.md b/docs/application/type.md new file mode 100644 index 0000000..4cc2fc8 --- /dev/null +++ b/docs/application/type.md @@ -0,0 +1,7 @@ +# Type + +--- + +## Object Definitions + +## Allocator -- cgit v1.2.3