// Thread_Group Internal Procedures // #NOTE: There is no logging in this implementation! void init(Work_List* list) { Assert(list != nullptr); semaphore_init(&list->semaphore); mutex_init(&list->mutex); } void destroy(Work_List* list) { Assert(list != nullptr); semaphore_destroy(&list->semaphore); mutex_destroy(&list->mutex); } void add_work(Work_List* list, Work_Entry* entry) { lock(&list->mutex); if (list->last) { // This list has nodes in it. Put this entry onto the end so we're FIFO list->last->next = entry; list->last = entry; } else { // The list is empty, the list will have 1 entry: list->first = entry; list->last = entry; } list->count += 1; unlock(&list->mutex); signal(&list->semaphore); } Work_Entry* get_work(Work_List* list) { lock(&list->mutex); // Grab first node; may be null if work has been stolen Work_Entry* result = list->first; if (result) { // Update the head of the list to be the next item list->first = result->next; // If the new 'first' pointer is null, the list has become empty, so set 'last' to null also if (list->first == nullptr) { list->last = nullptr; } } unlock(&list->mutex); return result; } s64 thread_group_run (Thread* thread) { // This is the main loop that a particular thread runs. // It waits on its semaphore, looking for new work on its 'available' list. // When it finds work, it calls the thread group procedure with the work, // then puts the work into its 'completed' list. Worker_Info* info = thread->worker_info; Thread_Group* group = info->group; Work_Entry* entry = nullptr; while(!group->should_exit) { if (!entry) { wait_for(&info->available.semaphore); if (group->should_exit) break; // Remove work from the list. Might be none if another thread stole. entry = get_work(&info->available); } if (entry) { entry->thread_index = thread->index; entry->next = nullptr; Thread_Continue_Status should_continue = Thread_Continue_Status::CONTINUE; if (group->proc) { should_continue = group->proc(group, thread, entry->work); } // The work is done, add it to the completed list: add_work(&info->completed, entry); if (should_continue == Thread_Continue_Status::STOP) { break; } } if (info->work_steal_indices.count) { if (group->should_exit) break; // Check for more work. If there's none, try to steal some before going to sleep. entry = get_work(&info->available); if (entry) { // Decrement the semaphore for the work we dequeued. wait_for(&info->available.semaphore); } else { // no work left, let's steal some for (s64 i = 0; i < info->work_steal_indices.count; i += 1) { entry = get_work(&group->worker_info[i].available); if (entry) { break; } } } } else { entry = nullptr; } } return 0; } // Thread_Group API: void thread_group_init (Thread_Group* group, s32 group_thread_count, Thread_Group_Proc group_proc, bool enable_work_stealing = false) { // Set allocator if not already set: if (!group->allocator.proc) { group->allocator = context_allocator(); } push_allocator(group->allocator); group->worker_info = ArrayView(group_thread_count); group->proc = group_proc; for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; thread_init(&info->thread, thread_group_run); info->thread.worker_info = info; init(&info->available); init(&info->completed); info->group = group; info->worker_index = (s32)i; if (enable_work_stealing) { // Make an array that contains all worker indices except for ours. // This gets shuffled for work stealing. Why? Because we want to // search through 2 threads for work, but if they are mostly empty, // we might have to scan through the whole array anyway. Maybe this // is not the best way to do it. info->work_steal_indices = ArrayView(group_thread_count-1, false); s32 cursor = 0; for (s32 j = 0; j < group_thread_count; j += 1) { if (j == i) continue; info->work_steal_indices[cursor] = j; cursor += 1; } } } group->initialized = true; } void start (Thread_Group* group) { Assert(group->worker_info.count > 0); for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; thread_start(&info->thread); } group->started = true; } bool shutdown (Thread_Group* group, s32 timeout_milliseconds = -1) { Assert(group->initialized); group->should_exit = true; bool all_threads_done = true; if (group->started) { for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; signal(&info->available.semaphore); } f64 start = 0; if (timeout_milliseconds > 0) { start = GetUnixTimestamp(); } s64 remaining_timeout_ms = (s64)timeout_milliseconds; for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; if (remaining_timeout_ms > 0) { s64 time_elapsed_ms = (s64)((GetUnixTimestamp() - start) * 1000.0); remaining_timeout_ms = (timeout_milliseconds - time_elapsed_ms); if (remaining_timeout_ms < 0) remaining_timeout_ms = 0; } bool is_done = thread_is_done(&info->thread, (s32)remaining_timeout_ms); if (!is_done) all_threads_done = false; } } if (!all_threads_done) return false; for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; thread_deinit(&info->thread, false); destroy(&info->available); destroy(&info->completed); array_free(info->work_steal_indices); } array_free(group->worker_info); return true; } // Should have a shutdown_and_reset option too (see how I did it in prototyping-main) void add_work (Thread_Group* group, void* work) { Assert(group->worker_info.count > 0); push_allocator(group->allocator); // Make a work entry, a linked list node that lets us queue and unqueue Work_Entry* entry = New(); entry->work = work; entry->issue_time = GetUnixTimestamp(); // Choose which thread will run this work. s32 thread_index = group->next_worker_index; group->next_worker_index += 1; if (group->next_worker_index >= group->worker_info.count) { group->next_worker_index = 0; } entry->work_list_index = thread_index; // Add this node to the linked list of available work for that thread: Work_List* list = &group->worker_info[thread_index].available; add_work(list, entry); } ArrayView get_completed_work (Thread_Group* group) { Array results = Array(); results.allocator = temp(); push_allocator(group->allocator); // We iterate over every worker thread to see if anything has completed. // Note that if this Thread_Group is idle most of the time, and you call // get_completed_work once per frame, most of the time this loop will // just be waste. Hopefully it's not very much waste compared to everything else // your program is doing, but in the future we may add an early-out: // if all work was completed before, and we never added new work, obviously // we don't need to do anything, as there can't be any new work. for (s64 i = 0; i < group->worker_info.count; i += 1) { Worker_Info* info = &group->worker_info[i]; Work_List* list = &info->completed; Work_Entry* completed = nullptr; s32 new_count = 0; { lock(&list->mutex); new_count = list->count; completed = list->first; if (list->first) { list->first = nullptr; list->last = nullptr; list->count = 0; } unlock(&list->mutex); } if (!completed) continue; // #TODO: #Thread_Group #array_reserve - try to do this in two passes: // Note that we are maybe adding small numbers of results over a larger // number of cores. Really, if we want to be efficient here, we can build // a larger linked list out of the mini-lists we gather, and accumulate // the counts, then do the reserve all in one batch when we are done // looking at the threads. For simplicity this has not yet been done, // but it may not be much more complicated, actually. array_reserve(results, results.count + new_count); s64 old_count = results.count; while (completed) { array_add(results, completed->work); Work_Entry* next = completed->next; internal_free(completed); completed = next; } Assert(results.count == old_count + new_count); } return ArrayView(results); }