Originally Posted by
slacky
I would only be glad to receive a PR with some more proper implementation of a threading wrappers (or whatever else), I haven't had to use Windows' own methods for creating and ending threads before as most languages I use has their own implementations for just this, result is that I was not aware of the unsafe nature of TerminateThread, but it makes perfect sense that it will forcefully terminate the thread and this alone can have ugly side-effects. @
Brandon; @
the bank;
Edit: So I have updated the timer to not use TerminateThread and instead just use a "Enabled" flag, and rely on ending the thread by simply returning and WaitForSingleObject.
For regular threads I guess it's up to the user to manage it.
Perfect. That is how it should be done. Let the user handle the termination with booleans and locks OR even better with pools and queues. If for some reason they absolutely need to kill the thread because they've lost control of it, then terminate should still be an option.. albeit a last resort.
Good update!
Idea:
Something like the below is what users will have to implement themselves or you can include such functionality.. but the user will still have to make sure that they aren't running an infinite loop on the thread.. You can stop them by throwing an exception when the thread won't terminate gracefully:
Pascal Code:
$i atomic
$i windows
$i threads
type Task = record
stopped: boolean;
funcPtr: procedure(t: Task);
end;
function Task.Create(funcToExecute: Procedure(t: Task)): Task;
begin
self.funcPtr := funcToExecute;
result := self;
end;
procedure Task.Destroy();
begin
self.funcPtr := nil;
end;
procedure Task.run();
begin
if (self.funcPtr) then
begin
self.funcPtr(self);
end;
end;
procedure Task.stop();
begin
self.stopped := true;
end;
function Task.isStopped(): Boolean;
begin
result := self.stopped;
end;
type ThreadPool = record
stop: AtomicBool;
lock: Mutex;
tasks: Queue;
tasksInProgress: Queue;
cpu_cores: Integer;
workers: array of Thread;
end;
function ThreadPool.Create(): ThreadPool;
var
sysInfo: SYSTEM_INFO;
i: Integer;
threadID: DWORD;
begin
windows.GetSystemInfo(@sysInfo);
self.cpu_cores = sysInfo.dwNumberOfProcessors;
self.stop.setValue(false);
self.lock = windows.CreateMutex(nil, false, nil);
SetLength(self.workers, self.cpu_cores);
for i := 0 to high(self.workers) do
begin
self.workers[i] := windows.CreateThread(nil, 0, InternalThreadFunc, self, 0, @threadID);
end;
result := self;
end;
procedure ThreadPool.Destroy()
var
i, exitcode: Integer;
begin
for i := 0 to high(self.tasksInProgress) do
begin
self.tasksInProgress[i].stop();
end;
self.stop.setValue(true);
windows.WaitForMultipleObjects(self.max_cpu_cores, workers, true, 5 * 1000);
for i := 0 to high(self.workers) do
begin
windows.GetExitCodeThread(self.workers[i], @exitcode);
if (exitcode == STILL_ACTIVE) then
begin
//raiseException('Cannot Terminate Thread.. Gracefully.. Stop running infinite loops');
windows.TerminateThread(self.workers[i]);
end;
windows.CloseHandle(self.workers[i]);
end;
SetLength(self.workers, 0);
SetLength(self.tasks, 0);
SetLength(self.tasksInProgress, 0);
end;
procedure ThreadPool.AddTask(funcToExecute: procedure);
var
locked: DWORD;
begin
locked := windows.WaitForSingleObject(self.lock, INFINITE);
if (locked == WAIT_OBJECT_0) then
begin
self.tasks.enqueue(Task.create(funcToExecute));
windows.ReleaseMutex(self.lock);
end;
end;
procedure ThreadPool.ExecuteTask();
var
locked: DWORD;
tsk: Task;
begin
while (not self.stop.getValue()) do
begin
locked := windows.WaitForSingleObject(self.lock, INFINITE);
if (locked == WAIT_OBJECT_0) then
begin
if (self.tasks.count()) then
begin
tsk := self.tasks.dequeue();
self.tasksInProgress.enqueue(t);
end;
windows.ReleaseMutex(self.lock);
tsk.run();
locked := windows.WaitForSingleObject(self.lock, INFINITE);
if (locked == WAIT_OBJECT_0) then
begin
tsk := self.tasksInProgress.dequeue();
tsk.stop();
windows.ReleaseMutex(self.lock);
end;
end;
end;
end;
function __InternalThreadFunc(ptr: Pointer): DWORD; __stdcall;
begin
ThreadPool(ptr)^.ExecuteTask();
result := 0;
end;
C++ Code:
#include <atomic>
#include <windows.h>
class Task
{
private:
bool stopped;
void(*funcPtr)(Task* t);
public:
Task(void(*funcToExecute)(Task* t))
{
this->funcPtr = funcToExecute;
}
~Task()
{
this->stopped = true;
funcPtr = nullptr;
}
void run()
{
if (funcPtr)
{
funcPtr(this);
}
}
void stop()
{
this->stopped = true;
}
bool isStopped()
{
return this->stopped;
}
};
class ThreadPool
{
private:
atomic_bool stop = false;
HANDLE lock;
Queue<Task*> tasks = {};
Queue<Task*> tasksInProgress = {};
Handle* workers;
int max_cpu_cores;
static DWORD __stdcall InternalThreadFunc(void* ptr)
{
ThreadPool* pool = reinterpret_cast<ThreadPool*>(ptr);
pool->ExecuteTask();
return 0;
}
void ExecuteTask()
{
while(!this->stop) //Atomic :)
{
//Lock first before accessing tasks.
DWORD locked = WaitForSingleObject(this->lock, INFINITE);
if (locked == WAIT_OBJECT_0)
{
Task *t = nullptr;
if (tasks.count()) //if there is a task.
{
t = tasks.dequeue(); //get the task
this->tasksInProgress.enqueue(t); //Task is now in progress.
}
ReleaseMutex(this->lock); //Unlock.
t->run(); //run the task.
DWORD locked = WaitForSingleObject(this->lock, INFINITE);
if (locked == WAIT_OBJECT_0)
{
t = this->tasksInProgress.dequeue();
ReleaseMutex(this->lock); //Unlock.
t->stop();
delete t; //Free the task.
}
}
}
}
public:
ThreadPool()
{
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
this->max_cpu_cores = sysinfo.dwNumberOfProcessors; //Rule of thumb: Have as many threads as we have cores. Either 4 or 8.
//Create a lock for safety.
this->lock = CreateMutex(nullptr, false, nullptr);
this->workers = new HANDLE[this->max_cpu_cores]; //Allocate space for worker thread handles.
//Thread the worker threads.. they will each wait on tasks..
for (int i = 0; i < this->max_cpu_cores; ++i)
{
DWORD ThreadID = 0;
workers[i] = CreateThread(nullptr, 0, InternalThreadFunc, reinterpret_cast<void*>(this), 0, &ThreadID);
}
}
~ThreadPool()
{
for (Task* t : this->tasksInProgress)
{
t->stop(); //Ask the tasks to quit/break their loops or hurry up and finish..
}
this->stop = true; //Ask all threads to stop.. We've already asked the tasks to hurry up..
WaitForMultipleObjects(this->max_cpy_cores, workers, TRUE, 5 * 1000); //Wait 5 seconds for ALL threads and tasks to gracefully terminate..
//Check which threads are still running and forcefully stop them if necessary.. Otherwise just gracefully close the thread handles.
for (int i = 0; i < this->max_cpu_cores; ++i)
{
DWORD ExitCode = 0;
GetExitCodeThread(this->workers[i], &ExitCode);
if (ExitCode == STILL_ACTIVE)
{
TerminateThread(this->workers[i]); //Bad idea but we don't have many options here.. We can try again and ask it to stop again or wait longer as well..
}
CloseHandle(this->workers[i]); //Close the handle to the threads.
}
//De-allocate the space for the threads.
delete[] workers;
workers = nullptr;
//Delete all incomplete tasks..
for (Task* task : this->queue)
{
delete task;
}
//Delete all tasks in progress..
for (Task* task : this->tasksInProgress)
{
delete task;
}
this->queue.drain();
this->tasksInProgress.drain();
}
void AddTask(void(*SomeTaskFunction)())
{
//Aquire the lock.
DWORD locked = WaitForSingleObject(this->lock, INFINITE);
if (locked == WAIT_OBJECT_0)
{
tasks.enqueue(new Task(SomeTaskFunction)); //Create a new tasks and enqueue it. The workers will execute it whenever they can.
ReleaseMutex(this->lock); //Release the lock.
}
}
};
Usage:
Pascal Code:
procedure Foo(t: Task);
begin
while(not t.isStopped());
begin
//Run some intense code here..
end;
end;
procedure Bar(t: Task);
begin
while(not t.isStopped());
begin
//Run some intense code here..
end;
end;
var
pool: ThreadPool;
begin
pool.addTask(@Foo);
pool.addTask(@Bar);
end;
C++ Code:
void Foo(Task* t)
{
while(!t->isStopped())
{
//Run some intense code here..
}
}
void Bar(Task* t)
{
while(!t->isStopped())
{
//Run some intense code here..
}
}
int main()
{
ThreadPool pool;
pool.AddTask(&Foo);
pool.AddTask(&Bar);
}