- Introduction
- Rules and Restrictions
- System.Classes.TThread
- System.Threading.pas
- TInterlocked class
- Freeze when using TThread.Synchronize with TParallel or TTask.WaitForAll
- Using TThread.Synchronize with TTask.WaitForAll
- Passing dynamic resources to parallel tasks
- Running a number of tasks in parallel with limited resources
- Collect exceptions raised in threaded tasks
- Information from Future
- Cancel running threaded task if exception raised
- Remove completed ITask instance reference from task list
- Reference
Introduction
A classical approach to implementing multi-threading in an application is to use TThread.
The Parallel Programming Library, introduced in RAD Studio XE7, adds the TParallel.For and TTask classes.
Rules and Restrictions
Keep the following rules and restrictions in mind when programming a multi-threaded application:
- Avoid sharing resources (variables or objects) among threads if they may change during threaded operations; doing so causes unpredictable errors. Use TInterlocked or TThread.Synchronize when sharing is necessary.
- The VCL and FMX libraries are not thread-safe. Most GUI updates performed from a thread should go through TThread.Synchronize or TThread.Queue.
System.Classes.TThread
In most situations, the operation to be performed in a thread is defined in a TThread descendant that overrides the TThread.Execute method.
TThread.CreateAnonymousThread
TThread.CreateAnonymousThread is a class method that creates a simple task, embedded in an anonymous method, to run in a thread without the need to define a TThread descendant.
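Below is a minimal console sketch, assuming a recent Delphi version. CreateAnonymousThread returns a thread that is created suspended, with FreeOnTerminate set to True. Here FreeOnTerminate is switched off before Start, so the main thread can safely call WaitFor and then free the object itself. The program name and the WorkerResult variable are illustrative only.

```pascal
program AnonymousThreadDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes;

var
  Worker: TThread;
  WorkerResult: Integer;

begin
  WorkerResult := 0;
  // The anonymous method becomes the body of the thread
  Worker := TThread.CreateAnonymousThread(
    procedure
    var
      i: Integer;
    begin
      for i := 1 to 100 do
        Inc(WorkerResult, i);   // only the worker writes WorkerResult
    end);
  // Keep the object alive after it finishes so it can be waited on and freed here
  Worker.FreeOnTerminate := False;
  try
    Worker.Start;
    Worker.WaitFor;             // blocks until the anonymous method returns
    Writeln('Sum computed in the worker thread: ', WorkerResult);
  finally
    Worker.Free;
  end;
end.
```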
TThread.ProcessorCount
TThread.ProcessorCount is a class property that returns the number of logical processors (virtual CPU cores) available to the process. It may serve as a baseline for deciding how many threads an application should run simultaneously.
TThread.Synchronize
TThread.Synchronize executes code in the main thread when thread safety is a concern. The calling thread is blocked until the code has finished executing in the main thread.
TThread.Queue
TThread.Queue also executes code in the main thread in a thread-safe manner, but without blocking. The calling thread continues immediately while the queued code waits for the main thread to run it.
System.Threading.pas
The Parallel Programming Library introduced in RAD Studio is defined in the unit System.Threading.pas.
TParallel
The TParallel.&For method runs an iteration procedure in parallel, once for each index in the inclusive range between a low and a high bound.
TTask
In addition to TParallel, the TTask class can be used for more diverse jobs. Each call to TTask.Run starts one job that runs in parallel. The thread pool schedules these jobs on its worker threads, so the system takes care of resource allocation even when a large number of TTask.Run calls are made.
Each TTask.Run call returns an ITask reference. If later operations depend on the tasks having finished, use TTask.WaitForAll or TTask.WaitForAny to wait for them first.
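The following console sketch shows TTask.Run together with TTask.WaitForAny and TTask.WaitForAll. MakeWork is a hypothetical helper. Delphi anonymous methods capture variables by reference, so passing the delay as a parameter gives each task its own copy of the value. Which task finishes first depends on scheduling, so no particular output is claimed.

```pascal
program TaskRunDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.Threading;

// Hypothetical helper: each call returns a new anonymous method with its own Delay
function MakeWork(Delay: Integer): TProc;
begin
  Result :=
    procedure
    begin
      Sleep(Delay);
    end;
end;

var
  Tasks: array[0..2] of ITask;
  First: Integer;

begin
  Tasks[0] := TTask.Run(MakeWork(300));
  Tasks[1] := TTask.Run(MakeWork(100));
  Tasks[2] := TTask.Run(MakeWork(200));

  // Returns the index of the first task that completes
  First := TTask.WaitForAny(Tasks);
  Writeln('First finished: task ', First);

  // Blocks until every task has completed
  TTask.WaitForAll(Tasks);
  Writeln('All tasks finished');
end.
```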
TThreadPool
The number of thread tasks executing at the same time is determined by the number of available virtual CPU cores (TThread.ProcessorCount). This behaviour can be altered by creating a new TThreadPool instance with different MaxWorkerThreads and MinWorkerThreads values and passing it to TTask or TParallel methods.
By default, MaxWorkerThreads and MinWorkerThreads have these values:
FMinLimitWorkerThreadCount := TThread.ProcessorCount;
FMaxLimitWorkerThreadCount := TThread.ProcessorCount * MaxThreadsPerCPU;
Use the TThreadPool.SetMaxWorkerThreads and TThreadPool.SetMinWorkerThreads methods to adjust both values. Call TThreadPool.SetMaxWorkerThreads before TThreadPool.SetMinWorkerThreads, because SetMinWorkerThreads rejects a minimum that is greater than the current maximum:
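var Pool: TThreadPool;
begin
  Pool := TThreadPool.Create;
  try
    Pool.SetMaxWorkerThreads(500); // raise the maximum first
    Pool.SetMinWorkerThreads(200); // then the minimum, which must not exceed it
    ...
  finally
    Pool.Free;
  end;
end;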
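This sketch assumes Delphi XE7 or later. It prints the limits of the shared TThreadPool.Default instance, then runs a TParallel.&For loop on a private pool passed as the last argument. The values 64 and 16 are arbitrary. Both setters return False when they reject a value, and the code reports this instead of ignoring it.

```pascal
program ThreadPoolDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Threading;

var
  Pool: TThreadPool;
  Hits: Integer;

begin
  // Inspect the limits of the shared default pool
  Writeln('Processors:          ', TThread.ProcessorCount);
  Writeln('Default min workers: ', TThreadPool.Default.MinWorkerThreads);
  Writeln('Default max workers: ', TThreadPool.Default.MaxWorkerThreads);

  Hits := 0;
  Pool := TThreadPool.Create;
  try
    // Raise the maximum first; a minimum above the maximum is rejected
    if not Pool.SetMaxWorkerThreads(64) then
      Writeln('SetMaxWorkerThreads rejected the value');
    if not Pool.SetMinWorkerThreads(16) then
      Writeln('SetMinWorkerThreads rejected the value');

    // The private pool is passed as the last argument of TParallel.&For
    TParallel.&For(1, 1000,
      procedure(Index: Integer)
      begin
        TInterlocked.Increment(Hits);
      end,
      Pool);

    Writeln('Iterations executed: ', Hits);
  finally
    Pool.Free;
  end;
end.
```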
TInterlocked class
TInterlocked implements various common atomic operations for the purpose of ensuring “thread” or “multi-core” safety when modifying variables that could be accessed from multiple threads simultaneously. The TInterlocked class is not intended to be instantiated or derived from. All the methods are “class static” and are merely defined in a class as a way to group their like functionality.
TInterlocked provides class methods that change variables of simple data types (e.g. Integer or Int64) from a thread in a thread-safe manner:
var iCount: Integer;
begin
  iCount := 0;
  TParallel.&For(1, 10,
    procedure (Current: Integer)
    begin
      TInterlocked.Add(iCount, Current);
    end
  );
end;
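The code above accumulates the values 1 to 10 into iCount from multiple threads in a thread-safe manner, so iCount ends up as 55. Each TInterlocked.Add call performs its read-modify-write as one atomic operation, so the updates of different threads cannot interleave and no addition is lost.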
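To see why the atomic call matters, the console sketch below (variable names are made up) increments one counter with a plain Inc and another with TInterlocked.Increment from the same TParallel.&For loop. The plain counter can lose updates when two threads read the same old value. The TInterlocked counter always ends at 100000. A second loop repeats the 1 to 10 TInterlocked.Add sum from the example above.

```pascal
program InterlockedDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Threading;

var
  Unsafe, Safe, Sum: Integer;

begin
  Unsafe := 0;
  Safe := 0;
  Sum := 0;

  TParallel.&For(1, 100000,
    procedure(Current: Integer)
    begin
      Inc(Unsafe);                  // read-modify-write, not atomic: updates can be lost
      TInterlocked.Increment(Safe); // atomic increment
    end);

  TParallel.&For(1, 10,
    procedure(Current: Integer)
    begin
      TInterlocked.Add(Sum, Current); // atomic addition
    end);

  Writeln('Plain Inc:              ', Unsafe, ' (may be less than 100000)');
  Writeln('TInterlocked.Increment: ', Safe);
  Writeln('TInterlocked.Add 1..10: ', Sum);
end.
```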
Freeze when using TThread.Synchronize with TParallel or TTask.WaitForAll
It is common practice for a running thread to update GUI controls periodically with its status by calling TThread.Synchronize, because the GUI controls are not thread-safe (e.g. VCL or FMX controls).
Both TParallel.&For and TTask.WaitForAll block the calling main thread while they wait for a list of tasks to finish. TThread.Synchronize also blocks, until the main thread runs the method. If a task calls it in this situation, neither side can proceed and the process freezes forever. For example:
TParallel.&For(1, 1,
  procedure (Current: Integer)
  begin
    TThread.Synchronize(nil,
      procedure
      begin
        Application.MainForm.Caption := Current.ToString;
      end
    );
  end
);
var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Synchronize(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  TTask.WaitForAll(TaskList.ToArray);
  ...
end;
Use TThread.Queue
instead to avoid the blocking:
TParallel.&For(1, 1,
  procedure (Current: Integer)
  begin
    TThread.Queue(nil,
      procedure
      begin
        Application.MainForm.Caption := Current.ToString;
      end
    );
  end
);
var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Queue(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  TTask.WaitForAll(TaskList.ToArray);
  ...
end;
The following answer explains the mechanism behind the freeze:
When you call TThread.Synchronize the thread and method pointer are added to a global SyncList: TList in Classes.pas. In the main exe’s TApplication.Idle routine calls CheckSynchronize, which checks the SyncList, … End result, your synchronized methods are never called.
Using TThread.Synchronize with TTask.WaitForAll
If a blocking TThread.Synchronize call in a thread is necessary (e.g. to wait for a response from the end user), calling TTask.WaitForAll will freeze the application. Instead, call TTask.WaitForAll with a timeout inside a loop, and call CheckSynchronize() on each pass to process pending TThread.Synchronize requests:
var TaskList: TList<ITask>;
    i: Integer;
    T: ITask;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 10 do begin
    T := TTask.Run(
      procedure
      begin
        TThread.Synchronize(nil,
          procedure
          begin
            Application.MainForm.Caption := GetTickCount.ToString;
          end
        );
      end
    );
    TaskList.Add(T);
  end;
  while not TTask.WaitForAll(TaskList.ToArray, 500 (* Timeout in ms *)) do begin
    // process any pending TThread.Synchronize() and TThread.Queue() requests
    CheckSynchronize(0);
    // process any pending UI paint requests, but not other messages
    Application.MainForm.Update;
    // or make it more responsive
    Application.ProcessMessages;
  end;
  ...
end;
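A console program has no Application object and no message loop. Nothing services TThread.Synchronize unless the main thread calls CheckSynchronize itself. The sketch below applies the same timeout-loop idea without the VCL/FMX calls. MakeWork is a hypothetical helper. Reported is only modified inside Synchronize, so it only changes on the main thread.

```pascal
program SynchronizeWaitDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.Threading;

var
  Reported: Integer;   // only touched inside Synchronize, i.e. on the main thread

// Hypothetical helper: builds the work for one task
function MakeWork(Id: Integer): TProc;
begin
  Result :=
    procedure
    begin
      Sleep(100);
      // Blocks this worker until the main thread calls CheckSynchronize
      TThread.Synchronize(nil,
        procedure
        begin
          Writeln('Task ', Id, ' reported on the main thread');
          Inc(Reported);
        end);
    end;
end;

var
  Tasks: TArray<ITask>;
  i: Integer;

begin
  Reported := 0;
  SetLength(Tasks, 5);
  for i := 0 to High(Tasks) do
    Tasks[i] := TTask.Run(MakeWork(i + 1));

  // Wait in 100 ms slices; between slices, run pending Synchronize/Queue calls
  while not TTask.WaitForAll(Tasks, 100) do
    CheckSynchronize(0);

  Writeln('Reported: ', Reported);
end.
```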
Passing dynamic resources to parallel tasks
Dynamic resources are variables, class instances, records or other data whose values are only decided at runtime.
Because of the way TParallel.&For and TTask.Run are designed, it is almost impossible to pass complex resources to a task for parallel execution. TParallel.&For partially addresses this by passing an Integer index to the task, which is defined as a TProc<Integer>. However, this is not enough for complex problems where the lower and upper bounds are difficult to determine. For example, consider executing a task in parallel for each row of a uni-directional TDataSet with an unknown record count:
var D: TDataSet; // D is a uni-directional dataset
begin
  TParallel.&For(1, D.RecordCount,
    procedure (Index: Integer)
    begin
      D.RecNo := Index;
      (* Perform some task *)
    end
  );
end;
The code above has two problems when run with TParallel.&For:
- A uni-directional dataset cannot be relied on to report D.RecordCount.
- Changing D.RecNo in one thread conflicts with, and confuses, the execution of the other threads.
A simple solution is to introduce a queue (more precisely, a thread-safe queue such as TThreadedQueue<T>). Enqueue the resource each task requires, and dequeue it inside the task:
var D: TDataSet; // D is a uni-directional dataset
    Q: TThreadedQueue<TData>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount (* Queue depth *));
  TaskList := TList<ITask>.Create;
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
      begin
        A := Q.PopItem;
        (* Perform some task on A *)
        ...
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
Tips
TThreadedQueue<T>.PopItem blocks while the queue is empty. This removes any worry about calling PopItem from a thread, because the thread simply waits until an item is available. Likewise, PushItem blocks while the queue is full (its depth is set in the constructor).
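As a variation on the code above (not the author's approach), a fixed number of consumer tasks can loop on PopItem instead of starting one task per item. This keeps the number of ITask instances small. The sketch assumes a sentinel value, StopItem (arbitrarily -1), that tells a consumer to exit. One sentinel is pushed for each consumer.

```pascal
program QueueConsumersDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Threading,
  System.Generics.Collections;

const
  StopItem = -1;   // hypothetical sentinel telling a consumer to exit

var
  Q: TThreadedQueue<Integer>;
  Consumers: TArray<ITask>;
  Total, i: Integer;

begin
  Total := 0;
  // Queue depth 100 so the producer below normally never waits on a full queue
  Q := TThreadedQueue<Integer>.Create(100);
  try
    SetLength(Consumers, TThread.ProcessorCount);
    for i := 0 to High(Consumers) do
      Consumers[i] := TTask.Run(
        procedure
        var
          Item: Integer;
        begin
          repeat
            Item := Q.PopItem;        // blocks while the queue is empty
            if Item <> StopItem then
              TInterlocked.Add(Total, Item);
          until Item = StopItem;
        end);

    for i := 1 to 20 do
      Q.PushItem(i);                  // work items
    for i := 0 to High(Consumers) do
      Q.PushItem(StopItem);           // one sentinel per consumer

    TTask.WaitForAll(Consumers);
    Writeln('Total: ', Total);
  finally
    Q.Free;
  end;
end.
```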
Running a number of tasks in parallel with limited resources
Some resources are limited or expensive to create at runtime, for example database connections or remote connections.
Imagine a large number of tasks that should run in parallel, where each task requires its own resource to work with (e.g. a database connection). One might code it this way:
var D: TDataSet; // D is a uni-directional dataset
    Q: TThreadedQueue<TData>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount);
  TaskList := TList<ITask>.Create;
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
          C: TDatabaseConnection;
      begin
        C := TDatabaseConnection.Create(...); // one new connection per task
        try
          A := Q.PopItem;
          C.Execute(A); (* Perform some task on A *)
        finally
          C.Free;
        end;
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
The code attempts to create as many database connections as there are dataset records. This design is expensive and impractical.
There is a solution to the problem. The number of tasks actually running in parallel is limited by the worker threads of the thread pool (TThreadPool.MinWorkerThreads, which equals TThread.ProcessorCount by default). We can therefore define a resource pool that is large enough and consume it in a round-robin manner.
For example, suppose 100 tasks are to be executed in parallel, but no more than 4 of them execute at any one time because of the limited number of CPU cores. We may define 8 or more resources in a pool, and each task picks a resource for its execution:
Task 1 uses Resource 1
Task 2 uses Resource 2
Task 3 uses Resource 3
Task 4 uses Resource 4
Task 5 uses Resource 5
Task 6 uses Resource 6
Task 7 uses Resource 7
Task 8 uses Resource 8
Task 9 uses Resource 1
Task 10 uses Resource 2
...
This scenario assumes that the tasks ideally run smoothly and in sequence. That is not the case in a real runtime environment, because the operating system cannot guarantee that threaded tasks run or finish in the order they were queued.
Let’s design a simple workflow:
- Define a pool that works like a queue to hold the resources
- Enqueue the resources into the pool
- In each running task, dequeue a resource for consumption
- When the task is done, enqueue the resource back into the pool for the next consumer
var D: TDataSet; // D is a uni-directional dataset
    Q: TThreadedQueue<TData>;
    Pool: TThreadedQueue<TDatabaseConnection>;
    T: ITask;
    TaskList: TList<ITask>;
    Data: TData;
    i: Integer;
begin
  Q := TThreadedQueue<TData>.Create(TThread.ProcessorCount * 2);
  TaskList := TList<ITask>.Create;
  Pool := TThreadedQueue<TDatabaseConnection>.Create(TThread.ProcessorCount);
  for i := 1 to TThread.ProcessorCount do
    Pool.PushItem(TDatabaseConnection.Create());
  D := Create_Uni_Directional_DataSet;
  D.Open;
  while not D.Eof do begin
    Data := CreateDataFromDataSet(D);
    Q.PushItem(Data);
    T := TTask.Run(
      procedure
      var A: TData;
          C: TDatabaseConnection;
      begin
        // Get a resource from the pool (blocks until one is free)
        C := Pool.PopItem;
        try
          // Perform the job using the resource
          A := Q.PopItem;
          C.Execute(A); (* Perform some task on A *)
        finally
          // Finished: push the resource back to the pool, even after an exception
          Pool.PushItem(C);
        end;
      end
    );
    TaskList.Add(T);
    D.Next;
  end;
  ...
end;
This design works as long as a resource is available in the pool for each running task at any given time. If none is available, PopItem blocks the task until another task returns a resource.
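The workflow above can be tried without a database by substituting a hypothetical TFakeConnection class for TDatabaseConnection. The sketch returns each connection to the pool in a finally block, so a failing job cannot leak a resource. A pool size of TThread.ProcessorCount is carried over from the code above; it is not a requirement.

```pascal
program ResourcePoolDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.Threading, System.Generics.Collections;

type
  // Hypothetical stand-in for an expensive resource such as a database connection
  TFakeConnection = class
  public
    UseCount: Integer;
    procedure Execute(Value: Integer);
  end;

procedure TFakeConnection.Execute(Value: Integer);
begin
  Inc(UseCount);        // safe: only the task currently holding the connection runs this
  Sleep(Value mod 5);   // simulate some work
end;

var
  Pool: TThreadedQueue<TFakeConnection>;
  Connections: TArray<TFakeConnection>;
  Tasks: TArray<ITask>;
  i, Used: Integer;

// Hypothetical helper: one job that borrows a connection from the pool
function MakeJob(Value: Integer): TProc;
begin
  Result :=
    procedure
    var
      C: TFakeConnection;
    begin
      C := Pool.PopItem;      // blocks while every connection is in use
      try
        C.Execute(Value);
      finally
        Pool.PushItem(C);     // always return the connection, even after an exception
      end;
    end;
end;

begin
  SetLength(Connections, TThread.ProcessorCount);
  Pool := TThreadedQueue<TFakeConnection>.Create(Length(Connections));
  try
    for i := 0 to High(Connections) do
    begin
      Connections[i] := TFakeConnection.Create;
      Pool.PushItem(Connections[i]);
    end;

    SetLength(Tasks, 100);
    for i := 0 to High(Tasks) do
      Tasks[i] := TTask.Run(MakeJob(i));
    TTask.WaitForAll(Tasks);

    Used := 0;
    for i := 0 to High(Connections) do
      Inc(Used, Connections[i].UseCount);
    Writeln('Jobs executed: ', Used, ' with ', Length(Connections), ' connections');
  finally
    for i := 0 to High(Connections) do
      Connections[i].Free;
    Pool.Free;
  end;
end.
```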
Collect exceptions raised in threaded tasks
If an exception is raised in threaded tasks:
var i: Integer;
    TaskList: TList<ITask>;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 50 do begin
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          if Random(10) mod 5 = 0 then
            raise Exception.Create('Error Message');
        end
      )
    );
  end;
  TTask.WaitForAll(TaskList.ToArray);
  TaskList.Free;
end;
a message dialog shows the message "one or more errors occurred" without much detailed information.
The exception raised is an instance of the class EAggregateException. It is easy to capture with a simple try..except..end construct, and its inner exceptions can be enumerated:
var X: Exception;
    i: Integer;
    TaskList: TList<ITask>;
begin
  TaskList := TList<ITask>.Create;
  try
    for i := 1 to 50 do begin
      TaskList.Add(
        TTask.Run(
          procedure
          begin
            if Random(10) mod 5 = 0 then
              raise Exception.Create('Error Message');
          end
        )
      );
    end;
    try
      TTask.WaitForAll(TaskList.ToArray);
    except
      on E: EAggregateException do begin
        // Each inner exception comes from one failed task
        for X in E do begin
          OutputDebugString(PChar(X.Message));
        end;
      end;
    end;
  finally
    TaskList.Free;
  end;
end;
There is a significant difference between how TParallel.&For and TTask.Run handle exceptions.
If an exception happens in the middle of TParallel.&For, it stops immediately without queuing more threaded tasks. It works this way because TParallel.&For blocks during execution.
TTask.Run, on the other hand, doesn't block during execution. An exception in a particular ITask instance doesn't stop the other ITask instances. The best place to capture exceptions from ITask references is TTask.WaitForAll(...).
Information from Future
TTask.Future<T> defines a threaded task that returns a generic IFuture<T> reference. Once the threaded task completes execution, its result is available through IFuture<T>.Value:
var Msg: IFuture<string>;
begin
  Msg := TTask.Future<string>(
    function: string
    begin
      Sleep(2000);
      Result := 'Message from future';
    end
  );
  ShowMessage(Msg.Value);
end;
In the example, ShowMessage(Msg.Value) blocks until the Msg task completes its execution.
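Several futures can run at the same time, and each Value call waits only for its own task. In this sketch, SumRange is a hypothetical helper. It splits the sum of 1 to 1000 into two halves that are computed in parallel.

```pascal
program FutureDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Threading;

// Hypothetical helper: a future that sums the integers From..ToValue
function SumRange(From, ToValue: Integer): IFuture<Int64>;
begin
  Result := TTask.Future<Int64>(
    function: Int64
    var
      i: Integer;
    begin
      Result := 0;
      for i := From to ToValue do
        Inc(Result, i);
    end);
end;

var
  FirstHalf, SecondHalf: IFuture<Int64>;
  Total: Int64;

begin
  // Both futures start running immediately and in parallel
  FirstHalf := SumRange(1, 500);
  SecondHalf := SumRange(501, 1000);
  // Each Value read blocks only until that particular future has finished
  Total := FirstHalf.Value + SecondHalf.Value;
  Writeln('Sum of 1..1000 = ', Total);
end.
```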
Cancel running threaded task if exception raised
Cancelling running threaded tasks when an exception is raised applies only to TTask.Run. It doesn't apply to TParallel.&For, which blocks during execution.
TTask.Run does not block during execution. Because of this, it is not easy to cancel the other running ITask instances when an exception happens in one of them. The solution I can think of so far is:
- Keep references to all ITask instances in a list
- If an exception happens during a particular ITask execution, trigger an exhaustive check of the ITask instances in the list
- For each ITask instance, cancel the ITask if it is still running
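Here is a sketch of the three steps above, under these assumptions:
- Every ITask is created with TTask.Create and stored before any of them is started, so the list is complete when a task scans it.
- CancelOthers and MakeWork are hypothetical helpers.
- Cancellation is cooperative. ITask.Cancel only takes effect when the running task calls TTask.CurrentTask.CheckCanceled, which raises EOperationCancelled.

The exact exception WaitForAll raises for a mix of failed and cancelled tasks is not assumed, so it is simply printed.

```pascal
program CancelOnErrorDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.Threading;

var
  Tasks: TArray<ITask>;

// Hypothetical helper: request cancellation of every task except Skip
procedure CancelOthers(Skip: Integer);
var
  i: Integer;
begin
  for i := 0 to High(Tasks) do
    if (i <> Skip) and
       (Tasks[i].Status in [TTaskStatus.WaitingToRun, TTaskStatus.Running]) then
      Tasks[i].Cancel;
end;

// Hypothetical helper: the work for one task; task 2 fails part-way through
function MakeWork(Index: Integer): TProc;
begin
  Result :=
    procedure
    var
      Step: Integer;
    begin
      try
        for Step := 1 to 50 do
        begin
          // Raises EOperationCancelled once Cancel has been called on this task
          TTask.CurrentTask.CheckCanceled;
          if (Index = 2) and (Step = 5) then
            raise Exception.CreateFmt('Task %d failed', [Index]);
          Sleep(20);
        end;
      except
        on EOperationCancelled do
          raise;                 // normal cancellation, nothing to notify
        on Exception do
        begin
          CancelOthers(Index);   // exhaustive check over the task list
          raise;                 // keep the error so WaitForAll reports it
        end;
      end;
    end;
end;

var
  i: Integer;

begin
  SetLength(Tasks, 6);
  // Create every task first so the list is complete before any of them runs
  for i := 0 to High(Tasks) do
    Tasks[i] := TTask.Create(MakeWork(i));
  for i := 0 to High(Tasks) do
    Tasks[i].Start;

  try
    TTask.WaitForAll(Tasks);
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;

  for i := 0 to High(Tasks) do
    Writeln('Task ', i, ' cancelled: ', (Tasks[i].Status = TTaskStatus.Canceled));
end.
```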
Remove completed ITask instance reference from task list
A straightforward solution is to call WaitForAll for all tasks and clear them from the task list afterwards:
var TaskList: TList<ITask>;
    i: Integer;
begin
  TaskList := TList<ITask>.Create;
  for i := 1 to 1000 do begin
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          ...
        end
      )
    );
  end;
  TTask.WaitForAll(TaskList.ToArray);
  TaskList.Clear;
  ...
  TaskList.Free;
end;
However, if the number of ITask references grows very large (e.g. 50,000,000 stored in TaskList), the system will raise an Out of Memory exception.
One solution is to clean up the ITask references in batches (e.g. perform WaitForAll for every 100,000 ITask references).
First, define a method for TThreadedQueue<TArray<ITask>> (through a class helper) that performs WaitForAll for a batch of tasks in another thread:
type
  TThreadedTasksQueue_Helper = class helper for TThreadedQueue<TArray<ITask>>
  public
    function WaitForAll(const aTasks: TArray<ITask>): IFuture<Integer>;
  end;
function TThreadedTasksQueue_Helper.WaitForAll(const aTasks: TArray<ITask>): IFuture<Integer>;
begin
  PushItem(aTasks);
  Result := TTask.Future<Integer>(
    function: Integer
    var A: TArray<ITask>;
    begin
      A := PopItem;
      TTask.WaitForAll(A);
      Result := Length(A);
    end
  );
end;
Each WaitForAll batch of 100,000 tasks (an IFuture<Integer>) is first kept in a list (Batches: TList<IFuture<Integer>>). After the lengthy for loop, some remaining tasks in TaskList will not fill a complete batch of 100,000. Remember to submit them as a final batch.
Finally, query each batch’s value (Batch.Value) to make sure all tasks ended properly.
var i: Integer;
    Total: Int64;
    Q: TThreadedQueue<Integer>;
    Batches: TList<IFuture<Integer>>;
    Batch: IFuture<Integer>;
    WaitForQ: TThreadedQueue<TArray<ITask>>;
    TaskList: TList<ITask>;
begin
  Q := TThreadedQueue<Integer>.Create;
  WaitForQ := TThreadedQueue<TArray<ITask>>.Create;
  Batches := TList<IFuture<Integer>>.Create;
  TaskList := TList<ITask>.Create;
  for i := 1 to 50000000 do begin
    Q.PushItem(i);
    TaskList.Add(
      TTask.Run(
        procedure
        begin
          Q.PopItem;
        end
      )
    );
    if i mod 100000 = 0 then begin
      // Hand this batch to a background WaitForAll and start a new one
      Batches.Add(WaitForQ.WaitForAll(TaskList.ToArray));
      TaskList.Clear;
    end;
  end;
  // Remaining tasks that did not fill a complete batch
  if TaskList.Count > 0 then
    Batches.Add(WaitForQ.WaitForAll(TaskList.ToArray));
  TaskList.Clear;
  // Reading Value blocks until each batch has finished
  Total := 0;
  for Batch in Batches do
    Total := Total + Batch.Value;
  Q.Free;
  WaitForQ.Free;
  TaskList.Free;
  Batches.Free;
end;