diff options
Diffstat (limited to 'docs')
| -rw-r--r-- | docs/application/shared.md | 30 | ||||
| -rw-r--r-- | docs/application/thread.md | 393 |
2 files changed, 220 insertions, 203 deletions
diff --git a/docs/application/shared.md b/docs/application/shared.md index c5d7e9e..b666684 100644 --- a/docs/application/shared.md +++ b/docs/application/shared.md @@ -9,33 +9,7 @@ typedef struct _kpl_shared { KPL_POOL_HEADER(_kpl_shared); _Atomic bool mutating; bool mark; - kpl_class class; - kpl_task *queue_head, queue_tail; - pthread_mutex_t queue_mutex; + void *data; + kpl_atomic_queue queue; } kpl_shared; - -static _Atomic int32_t shared_threads_marked; - -static kpl_shared *shared_pool_sweep, *shared_pool; - -static pthread_mutex_t shared_pool_mutex; ``` - -## Tracing - -1. Once triggered add a init task to the async queue -2. This task moves the `shared_pool` to the `shared_pool_sweep` and sets the `shared_wait` to off for each thread -3. Each thread will run down its queue queue marking found shared objects -3. Once all have run and `shared_threads_marked == available_threads` add a mark and sweep task to the async queue -4. The async queue is moved to a temp queue and as each item on the tmep queue is marked it is moved back to the async queue -5. A sweep on the `shared_pool_sweep` is done -6. What remains on the `shared_pool_sweep` is added to `shared_pool` - -## Mutating - -1. Check if state of `mutating` - 1. If false, set to true and add the task to the `async_queue_pool` - 2. Once complete lock `queue_mutex` and check the `queue_pool` - 3. If empty set the state of `mutating` to false - 4. Else queue the next task -2. If the state of `mutating` is true add, lock `queue_mutex` the task to `queue_tail` diff --git a/docs/application/thread.md b/docs/application/thread.md index ee2a317..b3f8d21 100644 --- a/docs/application/thread.md +++ b/docs/application/thread.md @@ -10,34 +10,38 @@ typedef struct _kpl_task kpl_task; typedef void kpl_task_fn(kpl_task *t); typedef struct _kpl_task { - KPL_POOL_HEADER(_kpl_task); - _Atomic bool next_ready; - uint16_t thread_id; - kpl_task_fn *fn; + _Atomic bool join_ready; + int32_t thread_id; kpl_group *state; - kpl_interface *ret_interface; kpl_result ret; + kpl_task *_Atomic next, *join; + task_fn *fn; } kpl_task; -typedef struct { - _Atomic bool gc_wait; - kpl_task *queue_head, *queue_tail; - pthread_t thread; -} kpl_thread; - -#define KPL_MAX_THREADS 64 +#define KPL_TASK_SLAB_SIZE 50 -static int32_t available_threads; // find with sched_getaffinity +typedef struct _kpl_task_slab { + size_t array_index; + struct _task_slab *next; + kpl_task array[KPL_TASK_SLAB_SIZE]; +} kpl_task_slab; -static _Atomic int32_t running_threads; // init to available_threads +#define KPL_QUEUE_ASYNC -1 -static kpl_task *async_queue_head, *async_queue_tail; +#define KPL_MAIN_THREAD 0 -static pthread_mutex_t async_queue_mutex; - -static pthread_cond_t async_queue_cond; +typedef struct { + kpl_task *_Atomic head, *_Atomic tail, dummy; +} kpl_atomic_queue; -static kpl_thread threads[KPL_MAX_THREADS]; +typedef struct { + kpl_atomic_queue queue; + _Atomic ssize_t priority; + kpl_task_slab *slab; + kpl_task *pool; + sem_t counter; + pthread_t thread; +} thread; ``` ## Initialization @@ -66,213 +70,264 @@ iterator : arguments, locals, iterator functions, iterator function index # Running -Main thread is kpl_worker[0], threads are started after that - ## Joining # Example ```c +#define _GNU_SOURCE #include <stdlib.h> #include <stdio.h> #include <stdint.h> #include <stdbool.h> #include <pthread.h> +#include <semaphore.h> +#include <sched.h> #include <assert.h> -#define PROCESSES 12 +#define I 50 -#define I 5 - -#define FIB 30 +#define FIB 25 typedef struct _task task; typedef void task_fn(task *t); -#define TASK_STATE_SIZE 20 - -#define TASK_QUEUE_ASYNC -1 - -#define TASK_MAIN_PROCESS 0 +#define TASK_STATE_SIZE 55 typedef struct _task { _Atomic bool join_ready; - int32_t worker_id; + int32_t thread_id; void *state[TASK_STATE_SIZE]; void *return_value; - struct _task *next, *join; + task *_Atomic next, *join; task_fn *fn; } task; -task *task_pool = NULL, *task_async_queue_head = NULL, *task_async_queue_tail = NULL; +#define TASK_SLAB_SIZE 50 -_Atomic size_t task_async_queue_length = 0; +typedef struct _task_slab { + size_t array_index; + struct _task_slab *next; + task array[TASK_SLAB_SIZE]; +} task_slab; -pthread_mutex_t task_pool_mutex = PTHREAD_MUTEX_INITIALIZER, task_async_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +#define QUEUE_ASYNC -1 -pthread_cond_t task_async_queue_cond = PTHREAD_COND_INITIALIZER; +#define MAIN_THREAD 0 -void __attribute__((destructor)) task_destructor(void) { - while (task_pool) { - task *t = task_pool; - task_pool = task_pool->next; - free(t); - } - pthread_mutex_destroy(&task_pool_mutex); - pthread_cond_destroy(&task_async_queue_cond); - pthread_mutex_destroy(&task_async_queue_mutex); -} +typedef struct { + task *_Atomic head, *_Atomic tail, dummy; +} atomic_queue; typedef struct { - task *head, *tail; + atomic_queue queue; + _Atomic ssize_t priority; + task_slab *slab; + task *pool; + sem_t counter; pthread_t thread; -} process; +} thread; + +int32_t avaiable_threads = {}; + +#define MAX_THREADS 64 + +thread threads[MAX_THREADS]; -process worker[PROCESSES] = {}; +_Atomic ssize_t total_priority = {}; + +void priority_increment(int32_t thread_id) { + threads[thread_id].priority++; + total_priority++; +} + +void priority_decrement(int32_t thread_id) { + threads[thread_id].priority--; + total_priority--; +} + +void task_slab_init(int32_t thread_id) { + task_slab *slab = calloc(1, sizeof(task_slab)); + slab->next = threads[thread_id].slab; + threads[thread_id].slab = slab; +} + +task *task_slab_get(int32_t thread_id) { + if (threads[thread_id].slab->array_index < TASK_SLAB_SIZE) + return &threads[thread_id].slab->array[threads[thread_id].slab->array_index++]; + task_slab_init(thread_id); + return task_slab_get(thread_id); +} + +void task_slab_free(int32_t thread_id) { + task_slab *slab = threads[thread_id].slab; + while (slab) { + task_slab *tmp = slab; + slab = slab->next; + free(tmp); + } +} + +void task_queue_init(int32_t thread_id) { + threads[thread_id].queue.head = &threads[thread_id].queue.dummy; + threads[thread_id].queue.tail = &threads[thread_id].queue.dummy; + threads[thread_id].queue.dummy.next = NULL; + threads[thread_id].priority = 1; + threads[thread_id].pool = NULL; +} + +void task_queue_add(int32_t thread_id, task *t) { + t->next = NULL; + task *head = __atomic_exchange_n(&threads[thread_id].queue.head, t, __ATOMIC_SEQ_CST); + head->next = t; + if (t != &threads[thread_id].queue.dummy) + priority_increment(thread_id); +} + +task *task_queue_next(int32_t thread_id) { + task *t = NULL; + for (;;) { + task *tail = threads[thread_id].queue.tail, *next = tail->next; + if (tail == &threads[thread_id].queue.dummy) { + if (!next) + break; + threads[thread_id].queue.tail = next; + tail = next; + next = tail->next; + } + if (next) { + threads[thread_id].queue.tail = next; + t = tail; + break; + } + task *head = threads[thread_id].queue.head; + if (tail != head) + continue; + task_queue_add(thread_id, &threads[thread_id].queue.dummy); + next = tail->next; + if (next) { + threads[thread_id].queue.tail = next; + t = tail; + break; + } + } + if (t) { + t->next = NULL; + priority_decrement(thread_id); + } + return t; +} -task *task_init(task_fn fn) { +task *task_init(task_fn *fn, int32_t thread_id) { task *t = NULL; - pthread_mutex_lock(&task_pool_mutex); - if (task_pool) { - t = task_pool; - task_pool = task_pool->next; + if (threads[thread_id].pool) { + t = threads[thread_id].pool; + threads[thread_id].pool = threads[thread_id].pool->join; } - pthread_mutex_unlock(&task_pool_mutex); if (!t) - t = malloc(sizeof(task)); + t = task_slab_get(thread_id); __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - t->worker_id = TASK_QUEUE_ASYNC; - for (ssize_t state_index = 0; state_index < TASK_STATE_SIZE; state_index++) - t->state[state_index] = NULL; + t->thread_id = QUEUE_ASYNC; + for (ssize_t state_id = 0; state_id < TASK_STATE_SIZE; state_id++) + t->state[state_id] = NULL; t->return_value = NULL; - t->next = t->join = NULL; + t->next = NULL; + t->join = NULL; t->fn = fn; return t; } void task_free(task *t) { - pthread_mutex_lock(&task_pool_mutex); - t->next = task_pool; - task_pool = t; - pthread_mutex_unlock(&task_pool_mutex); + t->join = threads[t->thread_id].pool; + threads[t->thread_id].pool = t; } void task_queue_async(task *t) { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_tail) - task_async_queue_tail = task_async_queue_tail->next = t; - else - task_async_queue_head = task_async_queue_tail = t; - pthread_mutex_unlock(&task_async_queue_mutex); - pthread_cond_signal(&task_async_queue_cond); -} - -void task_queue_sync(task *t, int32_t worker_id) { - if (worker_id == TASK_QUEUE_ASYNC) - return task_queue_async(t); - if (worker[worker_id].tail) - worker[worker_id].tail = worker[worker_id].tail->next = t; - else - worker[worker_id].head = worker[worker_id].tail = t; + int32_t queue_thread_id = 0; + ssize_t queue_priority = threads[0].priority; + for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++) { + if (threads[thread_id].priority < queue_priority) { + queue_priority = threads[thread_id].priority; + queue_thread_id = thread_id; + } + } + task_queue_add(queue_thread_id, t); + sem_post(&threads[queue_thread_id].counter); } -void task_ready_queue(task *t) { +void task_done(task *t) { if (!__atomic_test_and_set(&t->join_ready, __ATOMIC_SEQ_CST) || !t->join) return; task *join = t->join; t->join = NULL; __atomic_clear(&t->join_ready, __ATOMIC_SEQ_CST); - task_queue_sync(join, join->worker_id == t->worker_id ? t->worker_id : TASK_QUEUE_ASYNC); + if (join->thread_id == t->thread_id) + task_queue_add(t->thread_id, join); + else + task_queue_async(join); } -void task_join(task *restrict t, task *restrict join, task_fn fn) { +void task_join(task *restrict t, task *restrict join, task_fn *fn) { join->fn = fn; t->join = join; - task_ready_queue(t); -} - -void task_done(task *t) { - task_ready_queue(t); + task_done(t); } -_Atomic size_t running = PROCESSES; - void *task_loop(void *arg) { - int32_t worker_id = (intptr_t) arg; + const int32_t thread_id = (int32_t) (intptr_t) arg; + cpu_set_t cpus; + CPU_ZERO(&cpus); + CPU_SET(thread_id, &cpus); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpus), &cpus)) + exit(thread_id + 1); for (;;) { - task *t = NULL; - if (worker[worker_id].head) { - t = worker[worker_id].head; - if (worker[worker_id].head != worker[worker_id].tail) - worker[worker_id].head = worker[worker_id].head->next; - else - worker[worker_id].head = worker[worker_id].tail = NULL; - } else { - pthread_mutex_lock(&task_async_queue_mutex); - if (task_async_queue_head) { - t = task_async_queue_head; - if (task_async_queue_head != task_async_queue_tail) - task_async_queue_head = task_async_queue_head->next; - else - task_async_queue_head = task_async_queue_tail = NULL; - } - pthread_mutex_unlock(&task_async_queue_mutex); - } + task *t = task_queue_next(thread_id); if (t) { - t->next = NULL; - t->worker_id = worker_id; + t->thread_id = thread_id; t->fn(t); continue; } - running--; - if (!running) { - for (intptr_t worker_id = 0; worker_id < PROCESSES; worker_id++) - pthread_cond_signal(&task_async_queue_cond); + priority_decrement(thread_id); + if (!total_priority) { + for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++) + sem_post(&threads[thread_id].counter); break; } - pthread_mutex_lock(&task_async_queue_mutex); - pthread_cond_wait(&task_async_queue_cond, &task_async_queue_mutex); - pthread_mutex_unlock(&task_async_queue_mutex); - if (!running) + sem_wait(&threads[thread_id].counter); + if (!total_priority) break; - running++; + priority_increment(thread_id); } return NULL; } -void task_run(void) { - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_create(&worker[worker_id].thread, NULL, task_loop, (void*) worker_id); +void task_run() { + for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++) + pthread_create(&threads[thread_id].thread, NULL, task_loop, (void*) (intptr_t) thread_id); task_loop(0); - for (intptr_t worker_id = 1; worker_id < PROCESSES; worker_id++) - pthread_join(worker[worker_id].thread, NULL); + for (int32_t thread_id = 1; thread_id < avaiable_threads; thread_id++) + pthread_join(threads[thread_id].thread, NULL); } -/* -fib : Fn[n] $ ( - ? { - n <= 0 { 0 } - n < 2 { 1 } - { + (fib `sync n - 1; fib `sync n - 2) } +void task_constructor(void) { + cpu_set_t cpus; + CPU_ZERO(&cpus); + sched_getaffinity(0, sizeof(cpus), &cpus); + total_priority = avaiable_threads = CPU_COUNT(&cpus); + for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++) { + task_queue_init(thread_id); + task_slab_init(thread_id); + sem_init(&threads[thread_id].counter, 0, 1); } - v : Array[`fork_type fib] $ () - @ 1 .. I { v `push fib `async FIB } - @ v {[fib_task] `print "fib(%) = %\n" `format (FIB; `await fib_task) } - `print "Complete\n" -) -*/ - -#define LOG printf("%d: %s\n", t->worker_id, __FUNCTION__) - -_Atomic size_t counter = 0; +} -/* - 0: i - 1: arg - 2: child_a - 3: child_b -*/ +void task_destructor(void) { + for (int32_t thread_id = 0; thread_id < avaiable_threads; thread_id++) { + sem_destroy(&threads[thread_id].counter); + task_slab_free(thread_id); + } +} void fib_c(task *t) { task *child_a = t->state[2], *child_b = t->state[3]; @@ -286,50 +341,37 @@ void fib_a(task *t); void fib_b(task *t) { intptr_t arg = (intptr_t) t->state[1]; - task *child_b = t->state[3] = task_init(fib_a); + task *child_b = t->state[3] = task_init(fib_a, t->thread_id); child_b->state[1] = (void*) (arg - 2); - task_queue_sync(child_b, t->worker_id); + task_queue_add(t->thread_id, child_b); task_join(child_b, t, fib_c); } void fib_a(task *t) { intptr_t arg = (intptr_t) t->state[1]; if (arg <= 0) { - t->return_value = (void*) 0; - task_done(t); - return; + t->return_value = (void*) 0; + task_done(t); + return; } if (arg < 2) { t->return_value = (void*) 1; task_done(t); return; } - task *child_a = t->state[2] = task_init(fib_a); + task *child_a = t->state[2] = task_init(fib_a, t->thread_id); child_a->state[1] = (void*) (arg - 1); - task_queue_sync(child_a, t->worker_id); + task_queue_add(t->thread_id, child_a); task_join(child_a, t, fib_b); } -intptr_t fib(intptr_t n) { - if (n <= 0) - return 0; - if (n < 2) - return 1; - return fib(n - 1) + fib(n - 2); -} - -void fib_rec(task *t) { - intptr_t arg = (intptr_t) t->state[1]; - printf("fib thread: %d\n", t->worker_id); - t->return_value = (void*) fib(arg); - task_done(t); -} +_Atomic size_t counter = 0; void start_b(task *t) { counter++; task *f = t->state[(intptr_t) t->state[0]]; intptr_t i = (intptr_t) f->state[0], arg = (intptr_t) f->state[1], ret = (intptr_t) f->return_value; - printf("i: %ld, fib(%ld) = %ld\n", i + 1, arg, ret); + printf("i: %ld, fib(%ld) = %ld, thread: %d\n", i, arg, ret, f->thread_id); task_free(f); t->state[0]++; if ((intptr_t) t->state[0] == I + 1) { @@ -343,21 +385,22 @@ void start_b(task *t) { void start_a(task *t) { t->state[0] = (void*) 1; - for (intptr_t i = 0; i < I; i++) { - //task *f = task_init(fib_a); - task *f = task_init(fib_rec); + for (intptr_t i = 1; i <= I; i++) { + task *f = task_init(fib_a, t->thread_id); f->state[0] = (void*) i; f->state[1] = (void*) FIB; - t->state[i + 1] = f; + t->state[i] = f; task_queue_async(f); } task_join(t->state[(intptr_t) t->state[0]], t, start_b); } int main(void) { - task *t = task_init(start_a); - task_queue_sync(t, TASK_MAIN_PROCESS); + task_constructor(); + task *t = task_init(start_a, MAIN_THREAD); + task_queue_add(MAIN_THREAD, t); task_run(); + task_destructor(); assert(counter == I); return 0; } |
