Introduction
The classical approach to implementing multithreading in an application is to use TThread.
Recent versions of RAD Studio introduced the Parallel Programming Library, which adds TParallel.For and the TTask class.
Rules and Restrictions
Keep the following rules and restrictions in mind when programming a multithreaded application:
- Avoid sharing resources (variables or objects) that may change while threads are running. Unsynchronized shared state causes unexpected errors. Use TInterlocked or TThread.Synchronize when sharing is necessary.
- The VCL and FMX libraries are not thread-safe. Most GUI updates performed by a thread must go through TThread.Synchronize or TThread.Queue.
System.Classes.TThread
In most situations, the operation to run in a thread is defined in a TThread descendant that overrides the TThread.Execute method.
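A minimal sketch of such a descendant follows. The class TSumThread and the helper SumInThread are hypothetical names chosen for illustration; they are not part of the RTL or of the original text. The sketch assumes a console application.

```delphi
program ThreadDescendantDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

type
  // Hypothetical worker: sums the integers 1..FLimit on a background thread.
  TSumThread = class(TThread)
  private
    FLimit: Integer;
    FTotal: Int64;
  protected
    procedure Execute; override;
  public
    constructor Create(aLimit: Integer);
    property Total: Int64 read FTotal;
  end;

constructor TSumThread.Create(aLimit: Integer);
begin
  inherited Create(True);   // create suspended; Start is called by the owner
  FreeOnTerminate := False; // the owner reads Total and frees the thread
  FLimit := aLimit;
end;

procedure TSumThread.Execute;
var
  i: Integer;
begin
  FTotal := 0;
  for i := 1 to FLimit do begin
    if Terminated then Exit; // cooperate with Terminate requests
    Inc(FTotal, i);
  end;
end;

// Runs the worker, waits for it and returns its result.
function SumInThread(aLimit: Integer): Int64;
var
  T: TSumThread;
begin
  T := TSumThread.Create(aLimit);
  try
    T.Start;
    T.WaitFor;
    Result := T.Total;
  finally
    T.Free;
  end;
end;

begin
  Writeln(SumInThread(100)); // prints 5050
end.
```

Waiting with WaitFor keeps the example short; a GUI application would normally let the thread report back through TThread.Queue or an OnTerminate handler instead of blocking the main thread.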
TThread.CreateAnonymousThread
TThread.CreateAnonymousThread is a class method that runs a simple task, embedded in an anonymous method, in a thread without the hassle of defining a TThread descendant.
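As a rough illustration, the sketch below runs a small computation in an anonymous thread and waits for it. The function name AnswerFromAnonymousThread is hypothetical. The thread returned by CreateAnonymousThread is created suspended with FreeOnTerminate set to True, so the sketch turns that off before starting in order to call WaitFor safely.

```delphi
program AnonymousThreadDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

// Hypothetical helper: computes a value in an anonymous thread.
function AnswerFromAnonymousThread: Integer;
var
  T: TThread;
  Answer: Integer;
begin
  Answer := 0;
  T := TThread.CreateAnonymousThread(
    procedure
    begin
      Answer := 6 * 7; // the captured local is written by the worker thread
    end
  );
  T.FreeOnTerminate := False; // keep the object alive so WaitFor is valid
  try
    T.Start;
    T.WaitFor;
  finally
    T.Free;
  end;
  Result := Answer;
end;

begin
  Writeln(AnswerFromAnonymousThread); // prints 42
end.
```

For fire-and-forget work, leave FreeOnTerminate at its default and simply call Start; in that case the thread reference must not be used afterwards.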
TThread.ProcessorCount
TThread.ProcessorCount is a property that returns the number of virtual CPU cores available to the running process. It may serve as a baseline for deciding how many threads an application runs simultaneously.
TThread.Synchronize
TThread.Synchronize executes code in the main thread when thread safety is a concern. The calling thread is blocked until the main thread has finished executing that code.
TThread.Queue
TThread.Queue works like TThread.Synchronize and also executes code in the main thread, but it does not block the calling thread.
System.Threading.pas
The Parallel Programming Library introduced in RAD Studio is defined in the unit System.Threading.pas.
TParallel
The TParallel.&For method runs an iteration method in parallel for every index in a range given by a low and a high bound.
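A small sketch of the idea, assuming a hypothetical function SquaresInParallel and a count of at least 1: each iteration writes only to its own array slot, so no locking is needed.

```delphi
program ParallelForDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Threading;

// Hypothetical helper: fills an array with the squares of 0..aCount-1.
// Assumes aCount >= 1.
function SquaresInParallel(aCount: Integer): TArray<Integer>;
var
  Squares: TArray<Integer>;
begin
  SetLength(Squares, aCount);
  TParallel.&For(0, aCount - 1,
    procedure (Index: Integer)
    begin
      // Every iteration touches a different element, so there is no sharing.
      Squares[Index] := Index * Index;
    end
  );
  Result := Squares;
end;

begin
  Writeln(SquaresInParallel(5)[4]); // prints 16
end.
```

The iterations may run in any order and on any worker thread; only the fact that all of them have finished when TParallel.&For returns can be relied on.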
TTask
In addition to TParallel, the TTask class can be used for more diverse jobs. Each TTask.Run call starts one job to run in parallel. The system takes care of resource allocation when a large number of TTask.Run calls are made.
Each TTask.Run call returns an ITask reference. If later operations depend on the tasks having finished, use TTask.WaitForAll or TTask.WaitForAny to check the task status first.
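The sketch below starts several jobs with TTask.Run and waits for all of them before reading the result. The names Total, MakeJob and SumWithTasks are hypothetical. The tasks are created in a separate function so that each anonymous method captures its own copy of the value, rather than a shared loop variable.

```delphi
program TaskRunDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Threading;

var
  Total: Integer; // shared accumulator, only modified through TInterlocked

// Hypothetical helper: starts one job that adds aValue to Total.
function MakeJob(aValue: Integer): ITask;
begin
  Result := TTask.Run(
    procedure
    begin
      TInterlocked.Add(Total, aValue);
    end
  );
end;

// Starts aCount jobs adding 1..aCount and waits for all of them.
function SumWithTasks(aCount: Integer): Integer;
var
  Tasks: TArray<ITask>;
  i: Integer;
begin
  Total := 0;
  SetLength(Tasks, aCount);
  for i := 0 to aCount - 1 do
    Tasks[i] := MakeJob(i + 1);
  TTask.WaitForAll(Tasks); // Total is only meaningful after this returns
  Result := Total;
end;

begin
  Writeln(SumWithTasks(10)); // prints 55
end.
```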
TThreadPool
The number of simultaneously executing tasks is determined by the number of available virtual CPU cores (TThread.ProcessorCount). This behaviour can be changed by passing a new TThreadPool instance with different MaxWorkerThreads and MinWorkerThreads values to the TTask or TParallel methods.
By default, MaxWorkerThreads and MinWorkerThreads have these values:
FMinLimitWorkerThreadCount := TThread.ProcessorCount;
FMaxLimitWorkerThreadCount := TThread.ProcessorCount * MaxThreadsPerCPU;
Use the TThreadPool.SetMaxWorkerThreads and TThreadPool.SetMinWorkerThreads methods to adjust both values. Call TThreadPool.SetMaxWorkerThreads before TThreadPool.SetMinWorkerThreads to avoid the restriction enforced in the method:
var Pool: TThreadPool;
begin
  Pool := TThreadPool.Create;
  Pool.SetMaxWorkerThreads(500);
  Pool.SetMinWorkerThreads(200);
  ...
end;
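A possible way to use such a pool is sketched below, under the assumption that the TTask.Run overload taking a TThreadPool as its second argument is used. The function name CountOnPrivatePool and the chosen limits are illustrative only.

```delphi
program PrivatePoolDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Threading;

// Hypothetical helper: runs aJobs trivial jobs on a private thread pool
// and returns how many of them completed.
function CountOnPrivatePool(aJobs: Integer): Integer;
var
  Pool: TThreadPool;
  Tasks: TArray<ITask>;
  Done, i: Integer;
begin
  Done := 0;
  Pool := TThreadPool.Create;
  try
    // Raise the maximum first, then the minimum (illustrative values).
    Pool.SetMaxWorkerThreads(TThread.ProcessorCount * 4);
    Pool.SetMinWorkerThreads(TThread.ProcessorCount * 2);
    SetLength(Tasks, aJobs);
    for i := 0 to aJobs - 1 do
      Tasks[i] := TTask.Run(
        procedure
        begin
          TInterlocked.Increment(Done);
        end,
        Pool // run on the private pool instead of the default one
      );
    TTask.WaitForAll(Tasks);
  finally
    Pool.Free; // free the pool only after its tasks have finished
  end;
  Result := Done;
end;

begin
  Writeln(CountOnPrivatePool(20)); // prints 20
end.
```

Both setter methods return a Boolean, so production code may want to check the result rather than assume the new limits were accepted.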
TInterlocked class
TInterlocked implements various common atomic operations for the purpose of ensuring “thread” or “multi-core” safety when modifying variables that could be accessed from multiple threads simultaneously. The TInterlocked class is not intended to be instantiated nor derived from. All the methods are “class static” and are merely defined in a class as a way to group their like-functionality.
TInterlocked provides class methods that let a thread change a variable of a simple data type (e.g. Integer or Int64) in a thread-safe manner:
var iCount: Integer;
begin
  iCount := 0;
  TParallel.&For(1, 10,
    procedure (Current: Integer)
    begin
      TInterlocked.Add(iCount, Current);
    end
  );
end;
The above code accumulates iCount across threads in a thread-safe manner. Each TInterlocked call is atomic, so only one thread at a time modifies the variable iCount.
Freeze when using TThread.Synchronize with TParallel or TTask.WaitForAll
It is common practice to call TThread.Synchronize from a running thread to update GUI controls with its status periodically, because the GUI controls (e.g. VCL or FMX controls) are not thread-safe.
Both TParallel.&For and TTask.WaitForAll block the calling thread while they wait for a list of tasks to finish. If they are called from the main thread and a task invokes TThread.Synchronize, which itself blocks until the main thread runs the method, the process freezes forever. For example:
TParallel.&For(1, 1,
  procedure (Current: Integer)
  begin
    TThread.Synchronize(nil,
      procedure
      begin
        Application.MainForm.Caption := Current.ToString;
      end
    );
  end
);

var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Synchronize(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  TTask.WaitForAll(TaskList.ToArray);
  ...
end;
Use TThread.Queue instead to avoid the blocking:
TParallel.&For(1, 1,
  procedure (Current: Integer)
  begin
    TThread.Queue(nil,
      procedure
      begin
        Application.MainForm.Caption := Current.ToString;
      end
    );
  end
);

var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Queue(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  TTask.WaitForAll(TaskList.ToArray);
  ...
end;
The following answer explains the mechanism behind the freeze:
When you call TThread.Synchronize the thread and method pointer are added to a global SyncList: TList in Classes.pas. In the main exe’s TApplication.Idle routine calls CheckSynchronize, which checks the SyncList, … End result, your synchronized methods are never called.
Using TThread.Synchronize with TTask.WaitForAll
If a blocking TThread.Synchronize call is necessary in a thread (e.g. waiting for a response from the end user), a plain TTask.WaitForAll will freeze the application. Consider calling CheckSynchronize() in a TTask.WaitForAll loop with a timeout to process the pending TThread.Synchronize requests:
var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Synchronize(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  while not TTask.WaitForAll(TaskList.ToArray, 500) do begin
    CheckSynchronize(0);
    Application.MainForm.Update;
    Application.ProcessMessages;
  end;
  ...
end;
Passing dynamic resources to parallel tasks
Dynamic resources are variables, class instances, records or other items that are decided at runtime.
Because of the class design of TParallel.&For and TTask.Run, it is almost impossible to pass complex resources to a task for parallel execution. TParallel.&For partly addresses the problem with the Integer index passed to the task defined as TProc<Integer>. However, this is not enough for complex problem domains where the lower and upper bounds are difficult to determine. For example, executing a task in parallel for each row in a uni-directional TDataSet with an unknown record count:
var D: TDataSet;
begin
  TParallel.&For(1, D.RecordCount,
    procedure (Index: Integer)
    begin
      D.RecNo := Index;
    end
  );
end;
The above code does not work with TParallel.&For for two reasons:
- A uni-directional dataset fails on D.RecordCount.
- Changing D.RecNo in a thread conflicts with the other threads and confuses their execution.
A simple solution is to introduce a queue (more precisely, a thread-safe queue such as TThreadedQueue<T>): enqueue the resources a task requires, and dequeue them inside the task:
var D: TDataSet;
    Q: TThreadedQueue<TData>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount);
  TaskList := TList<ITask>.Create;
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
      begin
        A := Q.PopItem;
        ...
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
Tips
TThreadedQueue<T>.PopItem blocks when the queue is empty. This behaviour removes the worry about calling PopItem from a thread: the thread is blocked until an item is available in the queue.
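The blocking behaviour can be seen in a small producer/consumer sketch. The function name ConsumeSum is hypothetical. The consumer task is started before anything has been pushed, so its first PopItem call waits until the producer delivers an item.

```delphi
program BlockingPopDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections, System.Threading;

// Hypothetical helper: a consumer task pops aCount items and sums them
// while the calling thread pushes 1..aCount.
function ConsumeSum(aCount: Integer): Integer;
var
  Q: TThreadedQueue<Integer>;
  Consumer: ITask;
  Sum, i: Integer;
begin
  Sum := 0;
  Q := TThreadedQueue<Integer>.Create(4); // queue depth of 4 items
  try
    Consumer := TTask.Run(
      procedure
      var
        k, Item: Integer;
      begin
        for k := 1 to aCount do begin
          Item := Q.PopItem; // blocks while the queue is empty
          Inc(Sum, Item);    // only the consumer writes Sum
        end;
      end
    );
    for i := 1 to aCount do
      Q.PushItem(i);         // blocks while the queue is full
    Consumer.Wait;           // Sum is complete once the consumer has finished
  finally
    Q.Free;
  end;
  Result := Sum;
end;

begin
  Writeln(ConsumeSum(10)); // prints 55
end.
```

PushItem blocks in the same way when the queue is full, which is why the producer and the consumer here can run at different speeds without extra coordination.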
Running a number of tasks in parallel with limited resources
Some resources are limited or expensive to create at runtime, for example database connections or remote connections.
Imagine a large number of tasks that are to run in parallel, each requiring an independent resource to work with (e.g. a database connection). One may code it this way:
var D: TDataSet;
    Q: TThreadedQueue<TData>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount);
  TaskList := TList<ITask>.Create;
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
          C: TDatabaseConnection;
      begin
        C := TDatabaseConnection.Create(...);
        A := Q.PopItem;
        C.Execute(A);
        C.Free;
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
The code creates as many database connections as there are dataset records. This approach is expensive and impractical.
There is a solution to the problem. Since the number of tasks running in parallel is limited by TThreadPool.MinWorkerThreads, we can define a resource pool that is large enough and consume it in a round-robin manner.
For example, there are 100 tasks to be executed in parallel, but no more than 4 tasks execute at any one time because of the limited CPU cores. We may define 8 or more resources in a pool, and each task picks a resource for execution:
Task 1 use Resource 1
Task 2 use Resource 2
Task 3 use Resource 3
Task 4 use Resource 4
Task 5 use Resource 5
Task 6 use Resource 6
Task 7 use Resource 7
Task 8 use Resource 8
Task 9 use Resource 1
Task 10 use Resource 2
...
The scenario assumes that these tasks run smoothly in sequence. However, that is not the case in a real runtime environment: the operating system cannot guarantee that threaded tasks run or finish in the order in which they were queued.
Let’s try to design a simple workflow:
- Define a pool that works like a queue to hold resources
- Enqueue the resources to the queue pool
- In each running task, dequeue a resource for consumption
- In each running task, enqueue the resource back to the queue pool for the next consumer
var D: TDataSet;
    Q: TThreadedQueue<TData>;
    Pool: TThreadedQueue<TDatabaseConnection>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
    i: Integer;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount * 2);
  TaskList := TList<ITask>.Create;
  // Fill the pool with one connection per virtual CPU core
  Pool := TThreadedQueue<TDatabaseConnection>.Create(TThread.ProcessorCount);
  for i := 1 to TThread.ProcessorCount do
    Pool.PushItem(TDatabaseConnection.Create());
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
          C: TDatabaseConnection;
      begin
        C := Pool.PopItem;   // borrow a connection; blocks until one is free
        A := Q.PopItem;
        C.Execute(A);
        Pool.PushItem(C);    // return the connection for the next task
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
This design works as long as, at any time, a resource is available in the pool for each running task.
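The borrow-and-return workflow can be tried without a real database. The sketch below uses a hypothetical TFakeConnection class in place of TDatabaseConnection and a hypothetical function RunJobsWithPool. It also wraps the borrowed resource in try..finally so that a failing job still returns its connection to the pool; that is an addition to the workflow above, not part of it.

```delphi
program ResourcePoolDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Generics.Collections, System.Threading;

type
  // Hypothetical stand-in for an expensive resource such as a connection.
  TFakeConnection = class
  public
    UseCount: Integer;
  end;

// Runs aJobs tasks that share aPoolSize connections and returns the total
// number of times a connection was used.
function RunJobsWithPool(aJobs, aPoolSize: Integer): Integer;
var
  Pool: TThreadedQueue<TFakeConnection>;
  Tasks: TArray<ITask>;
  C: TFakeConnection;
  i: Integer;
begin
  Result := 0;
  Pool := TThreadedQueue<TFakeConnection>.Create(aPoolSize);
  try
    for i := 1 to aPoolSize do
      Pool.PushItem(TFakeConnection.Create);
    SetLength(Tasks, aJobs);
    for i := 0 to aJobs - 1 do
      Tasks[i] := TTask.Run(
        procedure
        var
          Conn: TFakeConnection;
        begin
          Conn := Pool.PopItem;  // blocks until a connection is free
          try
            Inc(Conn.UseCount);  // only the borrowing task touches Conn
          finally
            Pool.PushItem(Conn); // always hand the connection back
          end;
        end
      );
    TTask.WaitForAll(Tasks);
    // All connections are back in the pool; drain it and release them.
    for i := 1 to aPoolSize do begin
      C := Pool.PopItem;
      Inc(Result, C.UseCount);
      C.Free;
    end;
  finally
    Pool.Free;
  end;
end;

begin
  Writeln(RunJobsWithPool(100, 4)); // prints 100
end.
```

Because the queue depth equals the number of connections, returning a connection never blocks, and a task that finds the pool empty simply waits in PopItem until another task is done.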
Collect exceptions raised in threaded tasks
Suppose exceptions are raised in threaded tasks:
var i: Integer;
    TaskList: TList<ITask>;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 50 do begin
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          if Random(10) mod 5 = 0 then
            raise Exception.Create('Error Message');
        end
      )
    );
  end;
  TTask.WaitForAll(TaskList.ToArray);
  TaskList.Free;
end;
A message dialog shows the message
one or more errors occurred
without much detail.
The exception raised is an instance of the class EAggregateException. The exception is easy to capture with a simple try..except..end construct:
var X: Exception;
    i: Integer;
    TaskList: TList<ITask>;
begin
  TaskList := TList<ITask>.Create;
  try
    for i := 1 to 50 do begin
      TaskList.Add(
        TTask.Run(
          procedure
          begin
            if Random(10) mod 5 = 0 then
              raise Exception.Create('Error Message');
          end
        )
      );
    end;
    try
      TTask.WaitForAll(TaskList.ToArray);
    except
      on E: EAggregateException do begin
        for X in E do begin
          OutputDebugString(PChar(X.Message));
        end;
      end;
    end;
  finally
    TaskList.Free;
  end;
end;
There is a significant difference between how TParallel.&For and TTask.Run handle exceptions.
If an exception happens in the middle of TParallel.&For, it stops immediately without queuing more threaded tasks. It works this way because TParallel.&For blocks during execution.
TTask.Run, on the other hand, doesn’t block during execution. An exception that occurs in a particular ITask instance doesn’t stop the other ITask instances. The best place to capture exceptions from ITask references is TTask.WaitForAll(...).
TTask.Future<T> defines a threaded task that returns a generic IFuture<T> reference. Once the threaded task completes execution, its return value is available via IFuture<T>.Value:
var Msg: IFuture<string>;
begin
  Msg := TTask.Future<string>(
    function: string
    begin
      Sleep(2000);
      Result := 'Message from future';
    end
  );
  ShowMessage(Msg.Value);
end;
In the example, ShowMessage(Msg.Value) is blocked until the Msg task completes its execution.
Cancel running threaded task if exception raised
Cancelling running threaded tasks when an exception is raised applies to TTask.Run usage only. It doesn’t apply to TParallel.&For, which blocks during execution.
TTask.Run does not block during execution, so it is not easy to cancel the other running ITask instances when an exception happens in any one of them. The solution I can think of so far is:
- Keep references to all ITask instances in a list
- If an exception happens in a particular ITask execution, trigger an exhaustive check of the ITask instances in the list
- For each ITask instance, cancel the ITask if it is still running
Remove completed ITask instance reference from task list
A straightforward solution is to perform WaitForAll for all tasks and remove them from the task list afterwards:
var TaskList: TList<ITask>;
    i: Integer;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 1000 do begin
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          ...
        end
      )
    );
  end;
  TTask.WaitForAll(TaskList.ToArray);
  TaskList.Clear;
  ...
  TaskList.Free;
end;
However, if the number of ITask references grows large (e.g. 50,000,000 stored in TaskList), the system raises an Out of Memory exception.
A solution is to clean up the ITask references in batches (e.g. perform WaitForAll for every 100,000 ITask references).
First, define a helper method for TThreadedQueue<TArray<ITask>> that performs a threaded WaitForAll for tasks:
type
  TThreadedTasksQueue_Helper = class helper for TThreadedQueue<TArray<ITask>>
  public
    function WaitForAll(const aTasks: TArray<ITask>): IFuture<Integer>;
  end;

function TThreadedTasksQueue_Helper.WaitForAll(const aTasks: TArray<ITask>): IFuture<Integer>;
begin
  PushItem(aTasks);
  Result := TTask.Future<Integer>(
    function: Integer
    var A: TArray<ITask>;
    begin
      A := PopItem;
      TTask.WaitForAll(A);
      Result := Length(A);
    end
  );
end;
Each WaitForAll batch of 100,000 tasks (an IFuture<Integer>) is kept in a list (Batches: TList<IFuture<Integer>>) first. Remember that after the lengthy for loop, TaskList may still hold remaining tasks that did not fill a batch of 100,000.
Finally, query each batch’s value (Batch.Value) to make sure all tasks ended properly.
var i: Integer;
    Q: TThreadedQueue<Integer>;
    Batches: TList<IFuture<Integer>>;
    Batch: IFuture<Integer>;
    WaitForQ: TThreadedQueue<TArray<ITask>>;
    TaskList: TList<ITask>;
begin
  Q := TThreadedQueue<Integer>.Create;
  WaitForQ := TThreadedQueue<TArray<ITask>>.Create;
  Batches := TList<IFuture<Integer>>.Create;
  TaskList := TList<ITask>.Create;
  for i := 1 to 50000000 do begin
    Q.PushItem(i);
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          Q.PopItem;
        end
      )
    );
    if i mod 100000 = 0 then begin
      Batches.Add(WaitForQ.WaitForAll(TaskList.ToArray));
      TaskList.Clear;
    end;
  end;
  Batches.Add(WaitForQ.WaitForAll(TaskList.ToArray));
  TaskList.Clear;
  for Batch in Batches do Batch.Value;
  Q.Free;
  WaitForQ.Free;
  TaskList.Free;
  Batches.Free;
end;
Reference
- stackoverflow.com: Synchronize() hangs up the thread
- stackoverflow.com: Delphi TTask WaitForAll vs. Synchronise
- Rob’s Technology Corner: PPL - TTask an example in how not to use.