summaryrefslogtreecommitdiff
path: root/docs/application
diff options
context:
space:
mode:
authornodist <kevin.comas.git@gmail.com>2026-06-15 11:49:47 -0400
committernodist <kevin.comas.git@gmail.com>2026-06-15 11:49:47 -0400
commit7842aabba533dbd639039dd341dd64ee47cd4cd7 (patch)
tree2b9429acba5f036c88e7f2a470a3cdfff93bd2c2 /docs/application
parent7fd642630ee17f7150c1b881631acbe422b3a26b (diff)
atomic threading
Diffstat (limited to 'docs/application')
-rw-r--r--docs/application/shared.md30
-rw-r--r--docs/application/thread.md393
2 files changed, 220 insertions, 203 deletions
diff --git a/docs/application/shared.md b/docs/application/shared.md
index c5d7e9e..b666684 100644
--- a/docs/application/shared.md
+++ b/docs/application/shared.md
@@ -9,33 +9,7 @@ typedef struct _kpl_shared {
KPL_POOL_HEADER(_kpl_shared);
_Atomic bool mutating;
bool mark;
- kpl_class class;
- kpl_task *queue_head, queue_tail;
- pthread_mutex_t queue_mutex;
+ void *data;
+ kpl_atomic_queue queue;
} kpl_shared;
-
-static _Atomic int32_t shared_threads_marked;
-
-static kpl_shared *shared_pool_sweep, *shared_pool;
-
-static pthread_mutex_t shared_pool_mutex;
```
-
-## Tracing
-
-1. Once triggered add a init task to the async queue
-2. This task moves the `shared_pool` to the `shared_pool_sweep` and sets the `shared_wait` to off for each thread
-3. Each thread will run down its queue queue marking found shared objects
-3. Once all have run and `shared_threads_marked == available_threads` add a mark and sweep task to the async queue
-4. The async queue is moved to a temp queue and as each item on the tmep queue is marked it is moved back to the async queue
-5. A sweep on the `shared_pool_sweep` is done
-6. What remains on the `shared_pool_sweep` is added to `shared_pool`
-
-## Mutating
-
-1. Check if state of `mutating`
- 1. If false, set to true and add the task to the `async_queue_pool`
- 2. Once complete lock `queue_mutex` and check the `queue_pool`
- 3. If empty set the state of `mutating` to false
- 4. Else queue the next task
-2. If the state of `mutating` is true add, lock `queue_mutex` the task to `queue_tail`
diff --git a/docs/application/thread.md b/docs/application/thread.md
index ee2a317..b3f8d21 100644
--- a/docs/application/thread.md
+++ b/docs/application/thread.md
@@ -10,34 +10,38 @@ typedef struct _kpl_task kpl_task;
typedef void kpl_task_fn(kpl_task *t);
typedef struct _kpl_task {
- KPL_POOL_HEADER(_kpl_task);
- _Atomic bool next_ready;
- uint16_t thread_id;
- kpl_task_fn *fn;
+ _Atomic bool join_ready;
+ int32_t thread_id;
kpl_group *state;
- kpl_interface *ret_interface;
kpl_result ret;
+ kpl_task *_Atomic next, *join;
+ task_fn *fn;
} kpl_task;
-typedef struct {
- _Atomic bool gc_wait;
- kpl_task *queue_head, *queue_tail;
- pthread_t thread;
-} kpl_thread;
-
-#define KPL_MAX_THREADS 64
+#define KPL_TASK_SLAB_SIZE 50
-static int32_t available_threads; // find with sched_getaffinity
+typedef struct _kpl_task_slab {
+ size_t array_index;
+ struct _task_slab *next;
+ kpl_task array[KPL_TASK_SLAB_SIZE];
+} kpl_task_slab;
-static _Atomic int32_t running_threads; // init to available_threads
+#define KPL_QUEUE_ASYNC -1
-static kpl_task *async_queue_head, *async_queue_tail;
+#define KPL_MAIN_THREAD 0
-static pthread_mutex_t async_queue_mutex;
-
-static pthread_cond_t async_queue_cond;
+typedef struct {
+ kpl_task *_Atomic head, *_Atomic tail, dummy;
+} kpl_atomic_queue;
-static kpl_thread threads[KPL_MAX_THREADS];
+typedef struct {
+ kpl_atomic_queue queue;
+ _Atomic ssize_t priority;
+ kpl_task_slab *slab;
+ kpl_task *pool;
+ sem_t counter;
+ pthread_t thread;
+} thread;
```
## Initialization
@@ -66,213 +70,264 @@ iterator : arguments, locals, iterator functions, iterator function index
# Running
-Main thread is kpl_worker[0], threads are started after that
-
## Joining
# Example
```c
+#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
+#include <semaphore.h>
+#include <sched.h>
#include <assert.h>
-#define PROCESSES 12
+#define I 50
-#define I 5
-
-#define FIB 30
+#define FIB 25
typedef struct _task task;
typedef void task_fn(task *t);
-#define TASK_STATE_SIZE 20
-
-#define TASK_QUEUE_ASYNC -1
-
-#define TASK_MAIN_PROCESS 0
+#define TASK_STATE_SIZE 55
typedef struct _task {
_Atomic bool join_ready;
- int32_t worker_id;
+ int32_t thread_id;
void *state[TASK_STATE_SIZE];
void *return_value;
- struct _task *next, *join;
+ task *_Atomic next, *join;
task_fn *fn;
} task;
-task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL;
+#define TASK_SLAB_SIZE 50
-_Atomic size_t task_async_queue_length = 0;
+typedef struct _task_slab {
+ size_t array_index;
+ struct _task_slab *next;
+ task array[TASK_SLAB_SIZE];
+} task_slab;
-pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+#define QUEUE_ASYNC -1
-pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER;
+#define MAIN_THREAD 0
-void __attribute__((destructor)) task_destructor(void) {
- while (task_pool) {
- task *t = task_pool;
- task_pool = task_pool->next;
- free(t);
- }
- pthread_mutex_destroy(&task_pool_mutex);
- pthread_cond_destroy(&task_async_queue_cond);
- pthread_mutex_destroy(&task_async_queue_mutex);
-}
+typedef struct {
+ task *_Atomic head, *_Atomic tail, dummy;
+} atomic_queue;
typedef struct {
- task *head, *tail;
+ atomic_queue queue;
+ _Atomic ssize_t priority;
+ task_slab *slab;
+ task *pool;
+ sem_t counter;
pthread_t thread;
-} process;
+} thread;
+
+int32_t avaiable_threads = {};
+
+#define MAX_THREADS 64
+
+thread threads[MAX_THREADS];
-process worker[PROCESSES] = {};
+_Atomic ssize_t total_priority = {};
+
+void priority_increment(int32_t thread_id) {
+ threads[thread_id].priority++;
+ total_priority++;
+}
+
+void priority_decrement(int32_t thread_id) {
+ threads[thread_id].priority--;
+ total_priority--;
+}
+
+void task_slab_init(int32_t thread_id) {
+ task_slab *slab = calloc(1, sizeof(task_slab));
+ slab->next = threads[thread_id].slab;
+ threads[thread_id].slab = slab;
+}
+
+task *task_slab_get(int32_t thread_id) {
+ if (threads[thread_id].slab->array_index < TASK_SLAB_SIZE)
+ return &threads[thread_id].slab->array[threads[thread_id].slab->array_index++];
+ task_slab_init(thread_id);
+ return task_slab_get(thread_id);
+}
+
+void task_slab_free(int32_t thread_id) {
+ task_slab *slab = threads[thread_id].slab;
+ while (slab) {
+ task_slab *tmp = slab;
+ slab = slab->next;
+ free(tmp);
+ }
+}
+
+void task_queue_init(int32_t thread_id) {
+ threads[thread_id].queue.head = &threads[thread_id].queue.dummy;
+ threads[thread_id].queue.tail = &threads[thread_id].queue.dummy;
+ threads[thread_id].queue.dummy.next = NULL;
+ threads[thread_id].priority = 1;
+ threads[thread_id].pool = NULL;
+}
+
+void task_queue_add(int32_t thread_id, task *t) {
+ t->next = NULL;
+ task *head = __atomic_exchange_n(&threads[thread_id].queue.head, t, __ATOMIC_SEQ_CST);
+ head->next = t;
+ if (t != &threads[thread_id].queue.dummy)
+ priority_increment(thread_id);
+}
+
+task *task_queue_next(int32_t thread_id) {
+ task *t = NULL;
+ for (;;) {
+ task *tail = threads[thread_id].queue.tail, *next = tail->next;
+ if (tail == &threads[thread_id].queue.dummy) {
+ if (!next)
+ break;
+ threads[thread_id].queue.tail = next;
+ tail = next;
+ next = tail->next;
+ }
+ if (next) {
+ threads[thread_id].queue.tail = next;
+ t = tail;
+ break;
+ }
+ task *head = threads[thread_id].queue.head;
+ if (tail != head)
+ continue;
+ task_queue_add(thread_id, &threads[thread_id].queue.dummy);
+ next = tail->next;
+ if (next) {
+ threads[thread_id].queue.tail = next;
+ t = tail;
+ break;
+ }
+ }
+ if (t) {
+ t->next = NULL;
+ priority_decrement(thread_id);
+ }
+ return t;
+}
-task *task_init(task_fn fn) {
+task *task_init(task_fn *fn, int32_t thread_id) {
task *t = NULL;
- pthread_mutex_lock(&task_pool_mutex);
- if (task_pool) {
- t = task_pool;
- task_pool = task_pool->next;
+ if (threads[thread_id].pool) {
+ t = threads[thread_id].pool;
+ threads[thread_id].pool = threads[thread_id].pool->join;
}
- pthread_mutex_unlock(&task_pool_mutex);
if (!t)
- t = malloc(sizeof(task));
+ t = task_slab_get(thread_id);
__atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
- t->worker_id = TASK_QUEUE_ASYNC;
- for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++)
- t->state[state_index] = NULL;
+ t->thread_id = QUEUE_ASYNC;
+ for (ssize_t state_id = 0; state_id < TASK_STATE_SIZE; state_id++)
+ t->state[state_id] = NULL;
t->return_value = NULL;
- t->next = t->join = NULL;
+ t->next = NULL;
+ t->join = NULL;
t->fn = fn;
return t;
}
void task_free(task *t) {
- pthread_mutex_lock(&task_pool_mutex);
- t->next = task_pool;
- task_pool = t;
- pthread_mutex_unlock(&task_pool_mutex);
+ t->join = threads[t->thread_id].pool;
+ threads[t->thread_id].pool = t;
}
void task_queue_async(task *t) {
- pthread_mutex_lock(&task_async_queue_mutex);
- if (task_async_queue_tail)
- task_async_queue_tail = task_async_queue_tail->next = t;
- else
- task_async_queue_head = task_async_queue_tail = t;
- pthread_mutex_unlock(&task_async_queue_mutex);
- pthread_cond_signal(&task_async_queue_cond);
-}
-
-void task_queue_sync(task *t, int32_t worker_id) {
- if (worker_id == TASK_QUEUE_ASYNC)
- return task_queue_async(t);
- if (worker[worker_id].tail)
- worker[worker_id].tail = worker[worker_id].tail->next = t;
- else
- worker[worker_id].head = worker[worker_id].tail = t;
+ int32_t queue_thread_id = 0;
+ ssize_t queue_priority = threads[0].priority;
+ for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++) {
+ if (threads[thread_id].priority < queue_priority) {
+ queue_priority = threads[thread_id].priority;
+ queue_thread_id = thread_id;
+ }
+ }
+ task_queue_add(queue_thread_id, t);
+ sem_post(&threads[queue_thread_id].counter);
}
-void task_ready_queue(task *t) {
+void task_done(task *t) {
if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join)
return;
task *join = t->join;
t->join = NULL;
__atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST);
- task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC);
+ if (join->thread_id == t->thread_id)
+ task_queue_add(t->thread_id, join);
+ else
+ task_queue_async(join);
}
-void task_join(task *restrict t, task *restrict join, task_fn fn) {
+void task_join(task *restrict t, task *restrict join, task_fn *fn) {
join->fn = fn;
t->join = join;
- task_ready_queue(t);
-}
-
-void task_done(task *t) {
- task_ready_queue(t);
+ task_done(t);
}
-_Atomic size_t running = PROCESSES;
-
void *task_loop(void *arg) {
- int32_t worker_id = (intptr_t) arg;
+ const int32_t thread_id = (int32_t) (intptr_t) arg;
+ cpu_set_t cpus;
+ CPU_ZERO(&cpus);
+ CPU_SET(thread_id, &cpus);
+ if (pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus))
+ exit(thread_id + 1);
for (;;) {
- task *t = NULL;
- if (worker[worker_id].head) {
- t = worker[worker_id].head;
- if (worker[worker_id].head != worker[worker_id].tail)
- worker[worker_id].head = worker[worker_id].head->next;
- else
- worker[worker_id].head = worker[worker_id].tail = NULL;
- } else {
- pthread_mutex_lock(&task_async_queue_mutex);
- if (task_async_queue_head) {
- t = task_async_queue_head;
- if (task_async_queue_head != task_async_queue_tail)
- task_async_queue_head = task_async_queue_head->next;
- else
- task_async_queue_head = task_async_queue_tail = NULL;
- }
- pthread_mutex_unlock(&task_async_queue_mutex);
- }
+ task *t = task_queue_next(thread_id);
if (t) {
- t->next = NULL;
- t->worker_id = worker_id;
+ t->thread_id = thread_id;
t->fn(t);
continue;
}
- running--;
- if (!running) {
- for (intptr_t worker_id = 0; worker_id < PROCESSES; worker_id++)
- pthread_cond_signal(&task_async_queue_cond);
+ priority_decrement(thread_id);
+ if (!total_priority) {
+ for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++)
+ sem_post(&threads[thread_id].counter);
break;
}
- pthread_mutex_lock(&task_async_queue_mutex);
- pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex);
- pthread_mutex_unlock(&task_async_queue_mutex);
- if (!running)
+ sem_wait(&threads[thread_id].counter);
+ if (!total_priority)
break;
- running++;
+ priority_increment(thread_id);
}
return NULL;
}
-void task_run(void) {
- for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
- pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id);
+void task_run() {
+ for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++)
+ pthread_create(&threads[thread_id].thread, NULL, task_loop, (void*) (intptr_t) thread_id);
task_loop(0);
- for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++)
- pthread_join(worker[worker_id].thread, NULL);
+ for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++)
+ pthread_join(threads[thread_id].thread, NULL);
}
-/*
-fib : Fn[n] $ (
- ? {
- n <= 0 { 0 }
- n < 2 { 1 }
- { + (fib `sync n - 1; fib `sync n - 2) }
+void task_constructor(void) {
+ cpu_set_t cpus;
+ CPU_ZERO(&cpus);
+ sched_getaffinity(0, sizeof(cpus), &cpus);
+ total_priority = avaiable_threads = CPU_COUNT(&cpus);
+ for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++) {
+ task_queue_init(thread_id);
+ task_slab_init(thread_id);
+ sem_init(&threads[thread_id].counter, 0, 1);
}
- v : Array[`fork_type fib] $ ()
- @ 1 .. I { v `push fib `async FIB }
- @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `await fib_task) }
- `print "Complete\n"
-)
-*/
-
-#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__)
-
-_Atomic size_t counter = 0;
+}
-/*
- 0: i
- 1: arg
- 2: child_a
- 3: child_b
-*/
+void task_destructor(void) {
+ for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++) {
+ sem_destroy(&threads[thread_id].counter);
+ task_slab_free(thread_id);
+ }
+}
void fib_c(task *t) {
task *child_a = t->state[2], *child_b = t->state[3];
@@ -286,50 +341,37 @@ void fib_a(task *t);
void fib_b(task *t) {
intptr_t arg = (intptr_t) t->state[1];
- task *child_b = t->state[3] = task_init(fib_a);
+ task *child_b = t->state[3] = task_init(fib_a, t->thread_id);
child_b->state[1] = (void*) (arg - 2);
- task_queue_sync(child_b, t->worker_id);
+ task_queue_add(t->thread_id, child_b);
task_join(child_b, t, fib_c);
}
void fib_a(task *t) {
intptr_t arg = (intptr_t) t->state[1];
if (arg <= 0) {
- t->return_value = (void*) 0;
- task_done(t);
- return;
+ t->return_value = (void*) 0;
+ task_done(t);
+ return;
}
if (arg < 2) {
t->return_value = (void*) 1;
task_done(t);
return;
}
- task *child_a = t->state[2] = task_init(fib_a);
+ task *child_a = t->state[2] = task_init(fib_a, t->thread_id);
child_a->state[1] = (void*) (arg - 1);
- task_queue_sync(child_a, t->worker_id);
+ task_queue_add(t->thread_id, child_a);
task_join(child_a, t, fib_b);
}
-intptr_t fib(intptr_t n) {
- if (n <= 0)
- return 0;
- if (n < 2)
- return 1;
- return fib(n - 1) + fib(n - 2);
-}
-
-void fib_rec(task *t) {
- intptr_t arg = (intptr_t) t->state[1];
- printf("fib thread: %d\n", t->worker_id);
- t->return_value = (void*) fib(arg);
- task_done(t);
-}
+_Atomic size_t counter = 0;
void start_b(task *t) {
counter++;
task *f = t->state[(intptr_t) t->state[0]];
intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value;
- printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret);
+ printf("i: %ld, fib(%ld) = %ld, thread: %d\n", i, arg, ret, f->thread_id);
task_free(f);
t->state[0]++;
if ((intptr_t) t->state[0] == I + 1) {
@@ -343,21 +385,22 @@ void start_b(task *t) {
void start_a(task *t) {
t->state[0] = (void*) 1;
- for (intptr_t i = 0; i < I; i++) {
- //task *f = task_init(fib_a);
- task *f = task_init(fib_rec);
+ for (intptr_t i = 1; i <= I; i++) {
+ task *f = task_init(fib_a, t->thread_id);
f->state[0] = (void*) i;
f->state[1] = (void*) FIB;
- t->state[i + 1] = f;
+ t->state[i] = f;
task_queue_async(f);
}
task_join(t->state[(intptr_t) t->state[0]], t, start_b);
}
int main(void) {
- task *t = task_init(start_a);
- task_queue_sync(t, TASK_MAIN_PROCESS);
+ task_constructor();
+ task *t = task_init(start_a, MAIN_THREAD);
+ task_queue_add(MAIN_THREAD, t);
task_run();
+ task_destructor();
assert(counter == I);
return 0;
}