// Delphi线程池 (Delphi thread pool)
unit?uThreadPool;
{ Usage: aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }
interface
uses
??Windows,
??Classes;
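// Logging switch: remove the space after "//" on the next line to define
// NOLOGS and compile the logging calls out.
// {$DEFINE NOLOGS}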
type
??TCriticalSection?=?class(TObject)
??protected
????FSection:?TRTLCriticalSection;
??public
????constructor?Create;
????destructor?Destroy;?override;
????//?進(jìn)入臨界區(qū)
????procedure?Enter;
????//?離開臨界區(qū)
????procedure?Leave;
????//?嘗試進(jìn)入
????function?TryEnter:?Boolean;
??end;
type
??//?儲(chǔ)存請(qǐng)求數(shù)據(jù)的基本類
??TWorkItem?=?class(TObject)
??public
????//?是否有重復(fù)任務(wù)
????function?IsTheSame(DataObj:?TWorkItem):?Boolean;?virtual;
????//?如果?NOLOGS?被定義,則禁用。
????function?TextForLog:?string;?virtual;
??end;
type
??TThreadsPool?=?class;
??//線程狀態(tài)
??TThreadState?=?(tcsInitializing,?tcsWaiting,?tcsGetting,?tcsProcessing,
????tcsProcessed,?tcsTerminating,?tcsCheckingDown);
??//?工作線程僅用于線程池內(nèi),?不要直接創(chuàng)建并調(diào)用它。
??TProcessorThread?=?class(TThread)
??private
????//?創(chuàng)建線程時(shí)臨時(shí)的Event對(duì)象,?阻塞線程直到初始化完成
????hInitFinished:?THandle;
????//?初始化出錯(cuò)信息
????sInitError:?string;
????//?記錄日志
????procedure?WriteLog(const?Str:?string;?Level:?Integer?=?0);
??protected
????//?線程臨界區(qū)同步對(duì)像
????csProcessingDataObject:?TCriticalSection;
????//?平均處理時(shí)間
????FAverageProcessing:?Integer;
????//?等待請(qǐng)求的平均時(shí)間
????FAverageWaitingTime:?Integer;
????//?本線程實(shí)例的運(yùn)行狀態(tài)
????FCurState:?TThreadState;
????//?本線程實(shí)例所附屬的線程池
????FPool:?TThreadsPool;
????//?當(dāng)前處理的數(shù)據(jù)對(duì)像。
????FProcessingDataObject:?TWorkItem;
????//?線程停止?Event,?TProcessorThread.Terminate?中開綠燈
????hThreadTerminated:?THandle;
????uProcessingStart:?DWORD;
????//?開始等待的時(shí)間,?通過(guò)?GetTickCount?取得。
????uWaitingStart:?DWORD;
????//?計(jì)算平均工作時(shí)間
????function?AverageProcessingTime:?DWORD;
????//?計(jì)算平均等待時(shí)間
????function?AverageWaitingTime:?DWORD;
????procedure?Execute;?override;
????function?IamCurrentlyProcess(DataObj:?TWorkItem):?Boolean;
????//?轉(zhuǎn)換枚舉類型的線程狀態(tài)為字串類型
????function?InfoText:?string;
????//?線程是否長(zhǎng)時(shí)間處理同一個(gè)請(qǐng)求?(已死掉?)
????function?IsDead:?Boolean;
????//?線程是否已完成當(dāng)成任務(wù)
????function?isFinished:?Boolean;
????//?線程是否處于空閑狀態(tài)
????function?isIdle:?Boolean;
????//?平均值校正計(jì)算。
????function?NewAverage(OldAvg,?NewVal:?Integer):?Integer;
??public
????Tag:?Integer;
????constructor?Create(APool:?TThreadsPool);
????destructor?Destroy;?override;
????procedure?Terminate;
??end;
??//?線程初始化時(shí)觸發(fā)的事件
??TProcessorThreadInitializing?=?procedure(Sender:?TThreadsPool;?aThread:
????TProcessorThread)?of?object;
??//?線程結(jié)束時(shí)觸發(fā)的事件
??TProcessorThreadFinalizing?=?procedure(Sender:?TThreadsPool;?aThread:
????TProcessorThread)?of?object;
??//?線程處理請(qǐng)求時(shí)觸發(fā)的事件
??TProcessRequest?=?procedure(Sender:?TThreadsPool;?WorkItem:?TWorkItem;
????aThread:?TProcessorThread)?of?object;
??TEmptyKind?=?(
????ekQueueEmpty,?//任務(wù)被取空后
????ekProcessingFinished?//?最后一個(gè)任務(wù)處理完畢后
????);
??//?任務(wù)隊(duì)列空時(shí)觸發(fā)的事件
??TQueueEmpty?=?procedure(Sender:?TThreadsPool;?EmptyKind:?TEmptyKind)?of
????object;
??TThreadsPool?=?class(TComponent)
??private
????csQueueManagment:?TCriticalSection;
????csThreadManagment:?TCriticalSection;
????FProcessRequest:?TProcessRequest;
????FQueue:?TList;
????FQueueEmpty:?TQueueEmpty;
????//?線程超時(shí)閥值
????FThreadDeadTimeout:?DWORD;
????FThreadFinalizing:?TProcessorThreadFinalizing;
????FThreadInitializing:?TProcessorThreadInitializing;
????//?工作中的線程
????FThreads:?TList;
????//?執(zhí)行了?terminat?發(fā)送退出指令,?正在結(jié)束的線程.
????FThreadsKilling:?TList;
????//?最少,?最大線程數(shù)
????FThreadsMax:?Integer;
????//?最少,?最大線程數(shù)
????FThreadsMin:?Integer;
????//?池平均等待時(shí)間
????function?PoolAverageWaitingTime:?Integer;
????procedure?WriteLog(const?Str:?string;?Level:?Integer?=?0);
??protected
????FLastGetPoint:?Integer;
????//?Semaphore,?統(tǒng)計(jì)任務(wù)隊(duì)列
????hSemRequestCount:?THandle;
????//?Waitable?timer.?每30觸發(fā)一次的時(shí)間量同步
????hTimCheckPoolDown:?THandle;
????//?線程池停機(jī)(檢查并清除空閑線程和死線程)
????procedure?CheckPoolDown;
????//?清除死線程,并補(bǔ)充不足的工作線程
????procedure?CheckThreadsForGrow;
????procedure?DoProcessed;
????procedure?DoProcessRequest(aDataObj:?TWorkItem;?aThread:?TProcessorThread);
??????virtual;
????procedure?DoQueueEmpty(EmptyKind:?TEmptyKind);?virtual;
????procedure?DoThreadFinalizing(aThread:?TProcessorThread);?virtual;
????//?執(zhí)行事件
????procedure?DoThreadInitializing(aThread:?TProcessorThread);?virtual;
????//?釋放?FThreadsKilling?列表中的線程
????procedure?FreeFinishedThreads;
????//?申請(qǐng)任務(wù)
????procedure?GetRequest(out?Request:?TWorkItem);
????//?清除死線程
????procedure?KillDeadThreads;
??public
????constructor?Create(AOwner:?TComponent);?override;
????destructor?Destroy;?override;
????//?就進(jìn)行任務(wù)是否重復(fù)的檢查,?檢查發(fā)現(xiàn)重復(fù)就返回?False
????function?AddRequest(aDataObject:?TWorkItem;?CheckForDoubles:?Boolean?=
??????False):?Boolean;?overload;
????//?轉(zhuǎn)換枚舉類型的線程狀態(tài)為字串類型
????function?InfoText:?string;
??published
????//?線程處理任務(wù)時(shí)觸發(fā)的事件
????property?OnProcessRequest:?TProcessRequest?read?FProcessRequest?write
??????FProcessRequest;
????//?任務(wù)列表為空時(shí)解發(fā)的事件
????property?OnQueueEmpty:?TQueueEmpty?read?FQueueEmpty?write?FQueueEmpty;
????//?線程結(jié)束時(shí)觸發(fā)的事件
????property?OnThreadFinalizing:?TProcessorThreadFinalizing?read
??????FThreadFinalizing?write?FThreadFinalizing;
????//?線程初始化時(shí)觸發(fā)的事件
????property?OnThreadInitializing:?TProcessorThreadInitializing?read
??????FThreadInitializing?write?FThreadInitializing;
????//?線程超時(shí)值(毫秒),?如果處理超時(shí),將視為死線程
????property?ThreadDeadTimeout:?DWORD?read?FThreadDeadTimeout?write
??????FThreadDeadTimeout?default?0;
????//?最大線程數(shù)
????property?ThreadsMax:?Integer?read?FThreadsMax?write?FThreadsMax?default?1;
????//?最小線程數(shù)
????property?ThreadsMin:?Integer?read?FThreadsMin?write?FThreadsMin?default?0;
??end;
type
??//日志記志函數(shù)
??TLogWriteProc?=?procedure(
????const?Str:?string;?//日志
????LogID:?Integer?=?0;
????Level:?Integer?=?0?//Level?=?0?-?跟蹤信息,?10?-?致命錯(cuò)誤
????);
var
??WriteLog:?TLogWriteProc;?//?如果存在實(shí)例就寫日志
implementation
uses
??SysUtils;
//?儲(chǔ)存請(qǐng)求數(shù)據(jù)的基本類
{
**********?TWorkItem?**********
}
// Default duplicate test: a base work item never matches another item, so a
// descendant has to override this for duplicate detection to have any effect.
function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;
begin
  Result := False;
end; { TWorkItem.IsTheSame }
// Default log text for a work item; descendants may return something more
// descriptive.
function TWorkItem.TextForLog: string;
begin
  Result := 'Request';
end; { TWorkItem.TextForLog }
{
**********?TThreadsPool?**********
}
// Creates an empty pool: no worker threads yet, ThreadsMin = 0,
// ThreadsMax = 1, dead-thread detection switched off.
constructor TThreadsPool.Create(AOwner: TComponent);
var
  FirstDue: Int64;
begin
{$IFNDEF NOLOGS}
  WriteLog('創(chuàng)建線程池', 5);
{$ENDIF}
  inherited;
  // Default settings.
  FThreadsMin := 0;
  FThreadsMax := 1;
  FThreadDeadTimeout := 0;
  FLastGetPoint := 0;
  // Request queue and thread lists, each with its own lock.
  csQueueManagment := TCriticalSection.Create;
  FQueue := TList.Create;
  csThreadManagment := TCriticalSection.Create;
  FThreads := TList.Create;
  FThreadsKilling := TList.Create;
  // Semaphore released once per queued request; starts at 0.
  hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);
  // Auto-reset waitable timer (only available on Windows NT4 or later).
  hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);
  if hTimCheckPoolDown <> 0 then
  begin
    // Negative due time = relative time; then repeat every 30000 ms.
    FirstDue := -1;
    SetWaitableTimer(hTimCheckPoolDown, FirstDue, 30000, nil, nil, False);
  end
  else
    // Not supported (Win9x): use an event that nothing in this unit signals,
    // so the number of threads is never decreased there.
    hTimCheckPoolDown := CreateEvent(nil, False, False, nil);
end; { TThreadsPool.Create }
// Shuts the pool down: tells every worker to terminate, waits for them,
// frees the threads, then frees every request still in the queue.
//
// Fixes over the previous version:
//  * the wait is skipped when there are no threads (it used to take
//    @Handles[0] of an empty array and wait on zero handles);
//  * handles are waited on in batches of MAXIMUM_WAIT_OBJECTS, because a
//    single WaitForMultipleObjects call fails for more handles than that;
//  * both critical sections are left before they are freed;
//  * finished threads still parked in FThreadsKilling are freed instead of
//    leaked. Unfinished ones are left alone, since freeing them would block.
destructor TThreadsPool.Destroy;
var
  Handles: array of THandle;
  HandleCount, Offset, Batch, i: Integer;
begin
{$IFNDEF NOLOGS}
  WriteLog('線程池銷毀', 5);
{$ENDIF}
  csThreadManagment.Enter;
  try
    SetLength(Handles, FThreads.Count);
    HandleCount := 0;
    for i := 0 to FThreads.Count - 1 do
      if FThreads[i] <> nil then
      begin
        Handles[HandleCount] := TProcessorThread(FThreads[i]).Handle;
        TProcessorThread(FThreads[i]).Terminate;
        Inc(HandleCount);
      end;
  finally
    // The lock must be released while waiting (added by lixiaoyu, 2009.1.6):
    // otherwise workers that are still running cannot finish -> deadlock.
    csThreadManagment.Leave;
  end;
  // Wait for the workers to end, at most 30 s per batch.
  Offset := 0;
  while Offset < HandleCount do
  begin
    Batch := HandleCount - Offset;
    if Batch > MAXIMUM_WAIT_OBJECTS then
      Batch := MAXIMUM_WAIT_OBJECTS;
    WaitForMultipleObjects(Batch, @Handles[Offset], True, 30000);
    Inc(Offset, Batch);
  end;
  // Lock again and release the thread objects.
  csThreadManagment.Enter;
  try
    for i := 0 to FThreads.Count - 1 do
      TProcessorThread(FThreads[i]).Free;
    FThreads.Free;
    for i := FThreadsKilling.Count - 1 downto 0 do
      if TProcessorThread(FThreadsKilling[i]).isFinished then
        TProcessorThread(FThreadsKilling[i]).Free;
    FThreadsKilling.Free;
  finally
    csThreadManagment.Leave;
  end;
  csThreadManagment.Free;
  // Requests never handed to a worker are still owned by the pool.
  csQueueManagment.Enter;
  try
    for i := FQueue.Count - 1 downto 0 do
      TObject(FQueue[i]).Free;
    FQueue.Free;
  finally
    csQueueManagment.Leave;
  end;
  csQueueManagment.Free;
  CloseHandle(hSemRequestCount);
  CloseHandle(hTimCheckPoolDown);
  inherited;
end; { TThreadsPool.Destroy }
// Queues aDataObject for processing and releases the request semaphore once.
//
// Parameters:
//   aDataObject     - the request. When the function returns True the pool
//                     owns it (a worker frees it after processing). When it
//                     returns False the object was not queued.
//   CheckForDoubles - when True, the request is first compared (IsTheSame)
//                     with every queued item and with the item each worker
//                     is processing; a match rejects the request.
// Returns True when the request was queued, False for a duplicate or nil.
//
// Fixes over the previous version:
//  * the log text is captured before the item is queued. The old code called
//    aDataObject.TextForLog after the queue lock was released, when a worker
//    may already have processed and freed the object;
//  * a nil request is rejected (nil marks a consumed slot in FQueue);
//  * empty slots in FThreads are skipped instead of dereferenced.
function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:
  Boolean = False): Boolean;
var
  i: Integer;
  Worker: TProcessorThread;
{$IFNDEF NOLOGS}
  ItemText: string;
{$ENDIF}
begin
  Result := False;
  if aDataObject = nil then
    Exit;
{$IFNDEF NOLOGS}
  ItemText := aDataObject.TextForLog;
  WriteLog('AddRequest(' + ItemText + ')', 2);
{$ENDIF}
  csQueueManagment.Enter;
  try
    // Duplicate check against the queued requests.
    if CheckForDoubles then
      for i := 0 to FQueue.Count - 1 do
        if (FQueue[i] <> nil)
          and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then
          Exit; // an identical request is already queued
    csThreadManagment.Enter;
    try
      // Remove dead threads and add workers if there are too few.
      CheckThreadsForGrow;
      // Duplicate check against the requests being processed right now.
      if CheckForDoubles then
        for i := 0 to FThreads.Count - 1 do
        begin
          Worker := TProcessorThread(FThreads[i]);
          if (Worker <> nil) and Worker.IamCurrentlyProcess(aDataObject) then
            Exit; // an identical request is being processed
        end;
    finally
      csThreadManagment.Leave;
    end;
    // Queue the request and wake one worker.
    FQueue.Add(aDataObject);
    ReleaseSemaphore(hSemRequestCount, 1, nil);
{$IFNDEF NOLOGS}
    WriteLog('釋放一個(gè)同步信號(hào)量)', 1);
{$ENDIF}
    Result := True;
  finally
    csQueueManagment.Leave;
  end;
{$IFNDEF NOLOGS}
  // Debug information; aDataObject itself must not be touched any more.
  WriteLog('增加一個(gè)任務(wù)(' + ItemText + ')', 1);
{$ENDIF}
end; { TThreadsPool.AddRequest }
{
Routine: TThreadsPool.CheckPoolDown
Purpose: pool scale-down pass (checks for and removes idle and dead threads)
Inputs:  none
Returns: none
Created: 2006.10.22 11:31
Changed: 2006.
Author:  Kook
Notes:
}
// Scale-down pass, run by a worker when the pool timer fires: replaces dead
// threads, frees finished ones and terminates at most one idle thread among
// those above the ThreadsMin floor.
//
// Fixes over the previous version: empty (nil) slots in FThreads are skipped
// instead of dereferenced, and a negative ThreadsMin no longer makes the
// loop index FThreads[-1].
procedure TThreadsPool.CheckPoolDown;
var
  i, KeepCount: Integer;
  Worker: TProcessorThread;
begin
{$IFNDEF NOLOGS}
  WriteLog('TThreadsPool.CheckPoolDown', 1);
{$ENDIF}
  csThreadManagment.Enter;
  try
{$IFNDEF NOLOGS}
    WriteLog(InfoText, 2);
{$ENDIF}
    // Replace dead threads.
    KillDeadThreads;
    // Free the finished threads held in FThreadsKilling.
    FreeFinishedThreads;
    KeepCount := FThreadsMin;
    if KeepCount < 0 then
      KeepCount := 0;
    // Terminate one idle thread, looking only above the minimum.
    for i := FThreads.Count - 1 downto KeepCount do
    begin
      Worker := TProcessorThread(FThreads[i]);
      if (Worker <> nil) and Worker.isIdle then
      begin
        // Tell it to stop,
        Worker.Terminate;
        // park it until it has finished,
        FThreadsKilling.Add(Worker);
        // and take it off the working list.
        FThreads.Delete(i);
        // Only one thread is retired per pass.
        Break;
      end;
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckPoolDown }
{
Routine: TThreadsPool.CheckThreadsForGrow
Purpose: removes dead threads and adds worker threads when there are too few
Inputs:  none
Returns: none
Created: 2006.10.22 11:31
Changed: 2006.
Author:  Kook
Notes:
}
// Replaces dead threads, then grows the pool when needed:
//   1. below ThreadsMin: create workers up to the minimum;
//   2. otherwise, below ThreadsMax and the pool's average waiting time is
//      under 100 ms (workers are busy): create one more worker.
// The original notes also listed "more queued requests than 4x the worker
// count" as a condition; the code has never implemented it.
// A failure to create a worker is logged (level 9) and otherwise ignored.
// NOTE(review): the '?' characters inside the string literals are kept
// byte-for-byte; they look like mangled spaces - confirm.
procedure TThreadsPool.CheckThreadsForGrow;
var
  MeanWait: Integer;
  Slot: Integer;

  // Creates one worker and appends it to FThreads; logs on failure.
  procedure SpawnWorker;
  begin
    try
      FThreads.Add(TProcessorThread.Create(Self));
    except
      on e: Exception do
        WriteLog(
          'TProcessorThread.Create?raise:?' + e.ClassName +
          #13#10#9'Message:?' + e.Message,
          9
          );
    end;
  end;

begin
  csThreadManagment.Enter;
  try
    KillDeadThreads;
    if FThreads.Count < FThreadsMin then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最小線程數(shù)', 4);
{$ENDIF}
      for Slot := FThreads.Count to FThreadsMin - 1 do
        SpawnWorker;
    end
    else if FThreads.Count < FThreadsMax then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作線程數(shù)小于最大線程數(shù)?and?線程池平均等待時(shí)間?<?100ms', 3);
{$ENDIF}
      MeanWait := PoolAverageWaitingTime;
{$IFNDEF NOLOGS}
      WriteLog(Format(
        'FThreads.Count?(%d)<FThreadsMax(%d),?AvgWait=%d',
        [FThreads.Count, FThreadsMax, MeanWait]),
        4
        );
{$ENDIF}
      if MeanWait < 100 then
        SpawnWorker;
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckThreadsForGrow }
// Called by a worker after it finished a request. Fires
// OnQueueEmpty(ekProcessingFinished) when no request is left in the queue
// and no worker is in the tcsProcessing state.
//
// Fixes over the previous version: the queue position is read under
// csQueueManagment (it used to be read unlocked while other threads modify
// it), and empty (nil) slots in FThreads are skipped instead of dereferenced.
procedure TThreadsPool.DoProcessed;
var
  i: Integer;
  Worker: TProcessorThread;
  QueueDrained: Boolean;
begin
  csQueueManagment.Enter;
  try
    QueueDrained := FLastGetPoint >= FQueue.Count;
  finally
    csQueueManagment.Leave;
  end;
  if not QueueDrained then
    Exit;
  csThreadManagment.Enter;
  try
    for i := 0 to FThreads.Count - 1 do
    begin
      Worker := TProcessorThread(FThreads[i]);
      if (Worker <> nil) and (Worker.FCurState = tcsProcessing) then
        Exit; // somebody is still working
    end;
  finally
    csThreadManagment.Leave;
  end;
  // The handler runs outside both locks.
  DoQueueEmpty(ekProcessingFinished);
end; { TThreadsPool.DoProcessed }
// Fires OnProcessRequest for aDataObj on behalf of aThread; does nothing
// when no handler is assigned.
procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:
  TProcessorThread);
begin
  if not Assigned(FProcessRequest) then
    Exit;
  FProcessRequest(Self, aDataObj, aThread);
end; { TThreadsPool.DoProcessRequest }
// Fires OnQueueEmpty with the given reason; does nothing when no handler is
// assigned.
procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);
begin
  if not Assigned(FQueueEmpty) then
    Exit;
  FQueueEmpty(Self, EmptyKind);
end; { TThreadsPool.DoQueueEmpty }
// Fires OnThreadFinalizing for aThread; does nothing when no handler is
// assigned.
procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);
begin
  if not Assigned(FThreadFinalizing) then
    Exit;
  FThreadFinalizing(Self, aThread);
end; { TThreadsPool.DoThreadFinalizing }
// Fires OnThreadInitializing for aThread; does nothing when no handler is
// assigned.
procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);
begin
  if not Assigned(FThreadInitializing) then
    Exit;
  FThreadInitializing(Self, aThread);
end; { TThreadsPool.DoThreadInitializing }
{
Routine: TThreadsPool.FreeFinishedThreads
Purpose: frees the threads held in the FThreadsKilling list
Inputs:  none
Returns: none
Created: 2006.10.22 11:34
Changed: 2006.
Author:  Kook
Notes:
}
// Frees every thread in FThreadsKilling that has finished running and takes
// it off the list. Does nothing when the thread-management lock cannot be
// entered immediately.
procedure TThreadsPool.FreeFinishedThreads;
var
  Idx: Integer;
  Victim: TProcessorThread;
begin
  if not csThreadManagment.TryEnter then
    Exit;
  try
    // Walk backwards so that deleting does not disturb the remaining indices.
    Idx := FThreadsKilling.Count - 1;
    while Idx >= 0 do
    begin
      Victim := TProcessorThread(FThreadsKilling[Idx]);
      if Victim.isFinished then
      begin
        Victim.Free;
        FThreadsKilling.Delete(Idx);
      end;
      Dec(Idx);
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.FreeFinishedThreads }
{
Routine: TThreadsPool.GetRequest
Purpose: takes the next request from the queue
Inputs:  out Request: TWorkItem
Returns: none
Created: 2006.10.22 11:34
Changed: 2006.
Author:  Kook
Notes:
}
// Hands the next queued request to the caller, which takes ownership of it.
// Consumed slots are set to nil and skipped; the list is packed once it is
// long and mostly consumed. After the last request has been taken the queue
// is reset and OnQueueEmpty(ekQueueEmpty) is fired.
//
// Fixes over the previous version:
//  * the queue is cleared BEFORE the event is fired. The handler runs on
//    this thread inside the (re-entrant) queue lock, so it may call
//    AddRequest; the old order then wiped the newly added requests with
//    FQueue.Clear, leaking them and leaving the semaphore count ahead of
//    the queue;
//  * Request is set to nil up front so it is defined if an exception is
//    raised.
procedure TThreadsPool.GetRequest(out Request: TWorkItem);
begin
  Request := nil;
{$IFNDEF NOLOGS}
  WriteLog('申請(qǐng)任務(wù)', 2);
{$ENDIF}
  csQueueManagment.Enter;
  try
    // Skip slots that were already consumed.
    while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do
      Inc(FLastGetPoint);
    // The semaphore was released once per request, so one must be there.
    Assert(FLastGetPoint < FQueue.Count);
    // Compact the list: drop the nil slots once 3/4 of a long list is used up.
    if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then
    begin
{$IFNDEF NOLOGS}
      WriteLog('FQueue.Pack', 1);
{$ENDIF}
      FQueue.Pack;
      FLastGetPoint := 0;
    end;
    Request := TWorkItem(FQueue[FLastGetPoint]);
    FQueue[FLastGetPoint] := nil;
    Inc(FLastGetPoint);
    if FLastGetPoint = FQueue.Count then // nothing left in the queue
    begin
      FQueue.Clear;
      FLastGetPoint := 0;
      DoQueueEmpty(ekQueueEmpty);
    end;
  finally
    csQueueManagment.Leave;
  end;
end; { TThreadsPool.GetRequest }
// Textual description of the pool. The implementation is disabled: the
// function always returns an empty string. The former body is kept below as
// comments.
// NOTE(review): the disabled body enters csQueueManagment and then
// csThreadManagment, while CheckPoolDown calls InfoText with
// csThreadManagment already held - check the lock order before re-enabling.
function TThreadsPool.InfoText: string;
begin
  SetLength(Result, 0);
  //end;
  //{$ELSE}
  //var
  //  i: Integer;
  //begin
  //  csQueueManagment.Enter;
  //  csThreadManagment.Enter;
  //  try
  //    if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and
  //      TProcessorThread(FThreadsKilling[0]).isFinished then
  //      FreeFinishedThreads;
  //
  //    Result := Format(
  //      'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,
  //      [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,
  //      FQueue.Count]
  //        );
  //    if FThreads.Count > 0 then
  //      Result := Result + 'Working threads:'#13#10;
  //    for i := 0 to FThreads.Count - 1 do
  //      Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;
  //    if FThreadsKilling.Count > 0 then
  //      Result := Result + 'Terminated threads:'#13#10;
  //    for i := 0 to FThreadsKilling.Count - 1 do
  //      Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;
  //  finally
  //    csThreadManagment.Leave;
  //    csQueueManagment.Leave;
  //  end;
  //end;
  //{$ENDIF}
end; { TThreadsPool.InfoText }
{
Routine: TThreadsPool.KillDeadThreads
Purpose: removes dead threads
Inputs:  none
Returns: none
Created: 2006.10.22 11:32
Changed: 2006.
Author:  Kook
Notes:
}
// Looks for dead workers (see TProcessorThread.IsDead). Each one is told to
// terminate, moved to FThreadsKilling and replaced in FThreads by a fresh
// thread. Does nothing when the thread-management lock cannot be entered
// immediately.
//
// Fix over the previous version: when the replacement thread cannot be
// created the slot is removed from FThreads. The old code stored nil there,
// which other routines of the pool dereference and which made the next pass
// of this routine call IsDead on nil. Empty slots left over from earlier
// are purged as well. The list is walked backwards so that deleting is safe.
// NOTE(review): the '?' characters inside the string literals are kept
// byte-for-byte; they look like mangled spaces - confirm.
procedure TThreadsPool.KillDeadThreads;
var
  i: Integer;
  Worker: TProcessorThread;
begin
  // Check for dead threads
  if csThreadManagment.TryEnter then
  try
    for i := FThreads.Count - 1 downto 0 do
    begin
      Worker := TProcessorThread(FThreads[i]);
      if Worker = nil then
      begin
        FThreads.Delete(i);
        Continue;
      end;
      if Worker.IsDead then
      begin
        // Dead thread moved to the other list.
        // New thread created to replace the dead one.
        Worker.Terminate;
        FThreadsKilling.Add(Worker);
        try
          FThreads[i] := TProcessorThread.Create(Self);
        except
          on e: Exception do
          begin
            // No replacement: drop the slot; CheckThreadsForGrow can refill.
            FThreads.Delete(i);
{$IFNDEF NOLOGS}
            WriteLog(
              'TProcessorThread.Create?raise:?' + e.ClassName +
              #13#10#9'Message:?' + e.Message,
              9
              );
{$ENDIF}
          end;
        end;
      end;
    end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.KillDeadThreads }
// Mean of AverageWaitingTime over the pool's worker threads. Returns 1 when
// the pool has no worker.
//
// Fix over the previous version: empty (nil) slots in FThreads are skipped
// and not counted, instead of being dereferenced.
function TThreadsPool.PoolAverageWaitingTime: Integer;
var
  i, LiveCount: Integer;
  Worker: TProcessorThread;
begin
  Result := 0;
  LiveCount := 0;
  for i := 0 to FThreads.Count - 1 do
  begin
    Worker := TProcessorThread(FThreads[i]);
    if Worker <> nil then
    begin
      Inc(Result, Worker.AverageWaitingTime);
      Inc(LiveCount);
    end;
  end;
  if LiveCount > 0 then
    Result := Result div LiveCount
  else
    Result := 1;
end; { TThreadsPool.PoolAverageWaitingTime }
// Sends Str to the unit-level log procedure with log id 0. Compiled to an
// empty routine when NOLOGS is defined.
//
// Fix over the previous version: the unit-level WriteLog variable is public
// and may have been set to nil by the application; it is now checked before
// the call instead of jumping through a nil pointer.
procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  if Assigned(uThreadPool.WriteLog) then
    uThreadPool.WriteLog(Str, 0, Level);
{$ENDIF}
end; { TThreadsPool.WriteLog }
//?工作線程僅用于線程池內(nèi),?不要直接創(chuàng)建并調(diào)用它。
{
**********?TProcessorThread?**********
}
// Creates a worker for APool, starts it and blocks until its Execute method
// has run the initialization step. Raises Exception with the initialization
// error message when that step failed.
constructor TProcessorThread.Create(APool: TThreadsPool);
begin
  WriteLog('創(chuàng)建工作線程', 5);
  // Created suspended; the thread is started explicitly below.
  inherited Create(True);
  sInitError := '';
  FAverageProcessing := 3000;
  FAverageWaitingTime := 1000;
  FPool := aPool;
  csProcessingDataObject := TCriticalSection.Create;
  // CreateEvent arguments: no security attributes, manual reset, initially
  // non-signaled, unnamed (so every call yields a new event object).
  hThreadTerminated := CreateEvent(nil, True, False, nil);
  hInitFinished := CreateEvent(nil, True, False, nil);
  try
    WriteLog('TProcessorThread.Create::Resume', 3);
    Resume;
    // Block until Execute signals that initialization is done.
    WaitForSingleObject(hInitFinished, INFINITE);
    if sInitError <> '' then
      raise Exception.Create(sInitError);
  finally
    // The event is only needed during construction.
    CloseHandle(hInitFinished);
  end;
  WriteLog('TProcessorThread.Create::Finished', 3);
end; { TProcessorThread.Create }
// Stops the thread and releases its resources.
//
// Fix over the previous version: the old code closed hThreadTerminated and
// freed csProcessingDataObject BEFORE the inherited destructor had waited
// for Execute to end, so a still-running thread could be waiting on a closed
// handle or use a freed critical section. Now the thread is first asked to
// stop (flag plus event), the inherited destructor waits for it, and only
// then are the handle and the critical section released.
// NOTE(review): this relies on TThread.Destroy waiting for an unfinished
// thread and on instance fields staying valid until the outermost destructor
// returns - confirm for the RTL version in use.
destructor TProcessorThread.Destroy;
begin
  WriteLog('工作線程銷毀', 5);
  // Ask Execute to leave its wait and its loop.
  inherited Terminate;
  if hThreadTerminated <> 0 then
    SetEvent(hThreadTerminated);
  // Waits for Execute to return when the thread is still running.
  inherited;
  // Execute is no longer running: nothing can use these any more.
  if hThreadTerminated <> 0 then
    CloseHandle(hThreadTerminated);
  csProcessingDataObject.Free;
end; { TProcessorThread.Destroy }
// Average processing time. While a request is being processed the time
// spent on it so far is folded in as if it were a finished sample.
function TProcessorThread.AverageProcessingTime: DWORD;
begin
  if FCurState = tcsProcessing then
    Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart)
  else
    Result := FAverageProcessing;
end; { TProcessorThread.AverageProcessingTime }
// Average waiting time. While the thread is waiting (or running the pool
// scale-down check) the wait in progress is folded in as if it were a
// finished sample.
function TProcessorThread.AverageWaitingTime: DWORD;
begin
  case FCurState of
    tcsWaiting, tcsCheckingDown:
      Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart);
  else
    Result := FAverageWaitingTime;
  end;
end; { TProcessorThread.AverageWaitingTime }
// Thread body. Runs the pool's initialization event, reports the outcome to
// the constructor through hInitFinished, then loops waiting on three
// handles: this thread's stop event, the pool's request semaphore and the
// pool's scale-down timer. Finally fires the pool's finalization event.
//
// Fixes over the previous version:
//  * a wait result that matches none of the three handles (WAIT_FAILED, for
//    instance when a handle is no longer valid) used to fall through the
//    case statement, so the loop re-entered the failing wait immediately
//    and spun at full CPU. It is now logged and ends the loop;
//  * when initialization failed the constructor raises and the object is
//    destroyed, so the work loop is skipped instead of entered.
// NOTE(review): the '?' characters inside the string literals are kept
// byte-for-byte; they look like mangled spaces - confirm.
procedure TProcessorThread.Execute;
type
  THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);
var
  WaitedTime: Integer;
  InitFailed: Boolean;
  WaitResult: DWORD;
  Handles: array[THandleID] of THandle;
begin
  WriteLog('工作線程進(jìn)常運(yùn)行', 3);
  // State: initializing.
  FCurState := tcsInitializing;
  try
    // Run the external initialization handler.
    FPool.DoThreadInitializing(Self);
  except
    on e: Exception do
      sInitError := e.Message;
  end;
  // Same test the constructor applies; taken before the constructor is let
  // go so the field is not read while it is being torn down.
  InitFailed := sInitError <> '';
  // Initialization done: release the constructor.
  SetEvent(hInitFinished);
  WriteLog('TProcessorThread.Execute::Initialized', 3);
  // Handles to wait on; the semaphore and the timer belong to the pool.
  Handles[hidTerminateThread] := hThreadTerminated;
  Handles[hidRequest] := FPool.hSemRequestCount;
  Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;
  // Time stamp of the start of the first wait.
  // todo (from the original author): GetTickCount may misbehave in a thread
  uWaitingStart := GetTickCount;
  // No request yet.
  FProcessingDataObject := nil;
  // Main loop.
  while not (InitFailed or Terminated) do
  begin
    // State: waiting.
    FCurState := tcsWaiting;
    // Sleep until one of the handles is signaled.
    WaitResult := WaitForMultipleObjects(Length(Handles), @Handles, False,
      INFINITE);
    case WaitResult of
      WAIT_OBJECT_0 + Ord(hidTerminateThread):
        begin
          WriteLog('TProcessorThread.Execute::?Terminate?event?signaled?', 5);
          // State: terminating.
          FCurState := tcsTerminating;
          // Leave the main loop (ends the thread).
          Break;
        end;
      WAIT_OBJECT_0 + Ord(hidRequest):
        begin
          WriteLog('TProcessorThread.Execute::?Request?semaphore?signaled?', 3);
          // How long this wait took.
          WaitedTime := GetTickCount - uWaitingStart;
          // Update the average waiting time.
          FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);
          // State: fetching a request.
          FCurState := tcsGetting;
          // A very short wait suggests there are too few workers.
          if WaitedTime < 5 then
            FPool.CheckThreadsForGrow;
          // Take a request from the pool's queue.
          FPool.GetRequest(FProcessingDataObject);
          // Time stamp of the start of processing.
          uProcessingStart := GetTickCount;
          // State: processing.
          FCurState := tcsProcessing;
          try
{$IFNDEF NOLOGS}
            WriteLog('Processing:?' + FProcessingDataObject.TextForLog, 2);
{$ENDIF}
            // Process the request.
            FPool.DoProcessRequest(FProcessingDataObject, Self);
          except
            on e: Exception do
              WriteLog(
                'OnProcessRequest?for?' + FProcessingDataObject.TextForLog +
                #13#10'raise?Exception:?' + e.Message,
                8
                );
          end;
          // Release the request object.
          csProcessingDataObject.Enter;
          try
            FProcessingDataObject.Free;
            FProcessingDataObject := nil;
          finally
            csProcessingDataObject.Leave;
          end;
          // Update the average processing time.
          FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -
            uProcessingStart);
          // State: request processed.
          FCurState := tcsProcessed;
          // Let the pool fire its "processing finished" event if due.
          FPool.DoProcessed;
          uWaitingStart := GetTickCount;
        end;
      WAIT_OBJECT_0 + Ord(hidCheckPoolDown):
        begin
          // !!! Never called under Win9x
          WriteLog('TProcessorThread.Execute::?CheckPoolDown?timer?signaled?',
            4);
          // State: running the pool scale-down check.
          FCurState := tcsCheckingDown;
          FPool.CheckPoolDown;
        end;
    else
      begin
        // The wait itself failed; retrying would only spin.
        WriteLog('TProcessorThread.Execute:: wait failed, leaving the loop', 8);
        Break;
      end;
    end;
  end;
  FCurState := tcsTerminating;
  FPool.DoThreadFinalizing(Self);
end; { TProcessorThread.Execute }
// True when this thread holds a request and DataObj reports itself equal to
// it. The check runs inside csProcessingDataObject, the lock Execute takes
// while it frees the current request.
function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
begin
  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject = nil then
      Result := False
    else
      Result := DataObj.IsTheSame(FProcessingDataObject);
  finally
    csProcessingDataObject.Leave;
  end;
end; { TProcessorThread.IamCurrentlyProcess }
// One-line description of the thread: id, state name, both running averages,
// the time spent so far in the current wait or request, and the log text of
// the request being processed.
//
// Fix over the previous version: Result is cleared first. With NOLOGS
// defined the whole body is compiled out and the function used to return an
// unassigned string.
// NOTE(review): the '?' characters inside the string literals are kept
// byte-for-byte; they look like mangled spaces - confirm.
function TProcessorThread.InfoText: string;
const
  ThreadStateNames: array[TThreadState] of string =
  (
    'tcsInitializing',
    'tcsWaiting',
    'tcsGetting',
    'tcsProcessing',
    'tcsProcessed',
    'tcsTerminating',
    'tcsCheckingDown'
    );
begin
  Result := '';
{$IFNDEF NOLOGS}
  Result := Format(
    '%5d:?%15s,?AverageWaitingTime=%6d,?AverageProcessingTime=%6d',
    [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,
    AverageProcessingTime]
      );
  case FCurState of
    tcsWaiting:
      Result := Result + ',?WaitingTime=' + IntToStr(GetTickCount -
        uWaitingStart);
    tcsProcessing:
      Result := Result + ',?ProcessingTime=' + IntToStr(GetTickCount -
        uProcessingStart);
  end;
  // The request may be freed by Execute at any time; read it under the lock.
  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject <> nil then
      Result := Result + '?' + FProcessingDataObject.TextForLog;
  finally
    csProcessingDataObject.Leave;
  end;
{$ENDIF}
end; { TProcessorThread.InfoText }
// A thread counts as dead when it has been told to terminate, or when the
// pool has a dead-thread timeout (> 0) and the thread has been processing
// its current request for longer than that. Logs when the answer is True.
// NOTE(review): the '?' inside the string literal is kept byte-for-byte; it
// looks like a mangled space - confirm.
function TProcessorThread.IsDead: Boolean;
begin
  Result := Terminated;
  if (not Result) and (FPool.ThreadDeadTimeout > 0) and
    (FCurState = tcsProcessing) then
    Result := GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout;
  if Result then
    WriteLog('Thread?dead', 5);
end; { TProcessorThread.IsDead }
// True when the thread handle is signaled, tested with a zero-timeout wait
// so the call never blocks.
function TProcessorThread.isFinished: Boolean;
var
  WaitState: DWORD;
begin
  WaitState := WaitForSingleObject(Handle, 0);
  Result := WaitState = WAIT_OBJECT_0;
end; { TProcessorThread.isFinished }
// The thread is considered idle when all of the following hold:
//   - its state is tcsWaiting or tcsCheckingDown,
//   - its average waiting time is above 100 ms,
//   - its average waiting time is more than 50% of its average
//     processing time.
// The conditions are tested in this order and testing stops at the first
// one that fails.
function TProcessorThread.isIdle: Boolean;
begin
  Result := False;
  if not (FCurState in [tcsWaiting, tcsCheckingDown]) then
    Exit;
  if not (AverageWaitingTime > 100) then
    Exit;
  Result := AverageWaitingTime * 2 > AverageProcessingTime;
end; { TProcessorThread.isIdle }
// Running-average update: the old average counts twice, the new sample
// once, and the sum is divided by three (integer division).
function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;
var
  Weighted: Integer;
begin
  Weighted := OldAvg * 2 + NewVal;
  Result := Weighted div 3;
end; { TProcessorThread.NewAverage }
// Asks the thread to stop: performs the inherited Terminate and then
// signals hThreadTerminated, which is one of the handles Execute waits on,
// so a sleeping thread wakes up.
procedure TProcessorThread.Terminate;
begin
  WriteLog('TProcessorThread.Terminate', 5);
  inherited Terminate;
  SetEvent(hThreadTerminated);
end; { TProcessorThread.Terminate }
// Sends Str to the unit-level log procedure, using this thread's ThreadID as
// the log id. Compiled to an empty routine when NOLOGS is defined.
//
// Fix over the previous version: the unit-level WriteLog variable is public
// and may have been set to nil by the application; it is now checked before
// the call instead of jumping through a nil pointer.
procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  if Assigned(uThreadPool.WriteLog) then
    uThreadPool.WriteLog(Str, ThreadID, Level);
{$ENDIF}
end; { TProcessorThread.WriteLog }
{
**********?TCriticalSection?**********
}
// Creates the wrapper and initializes the Win32 critical section.
// Change over the previous version: the inherited constructor is now called,
// as is customary, so the class keeps working if its ancestor changes.
constructor TCriticalSection.Create;
begin
  inherited Create;
  InitializeCriticalSection(FSection);
end; { TCriticalSection.Create }
// Deletes the Win32 critical section.
// Change over the previous version: the inherited destructor is now called,
// as every destructor override should.
destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
  inherited;
end; { TCriticalSection.Destroy }
// Enters the critical section via EnterCriticalSection.
procedure TCriticalSection.Enter;
begin
  EnterCriticalSection(FSection);
end; { TCriticalSection.Enter }
// Leaves the critical section via LeaveCriticalSection.
procedure TCriticalSection.Leave;
begin
  LeaveCriticalSection(FSection);
end; { TCriticalSection.Leave }
// Non-blocking attempt to enter the critical section; returns the result of
// TryEnterCriticalSection.
function TCriticalSection.TryEnter: Boolean;
begin
  Result := TryEnterCriticalSection(FSection);
end; { TCriticalSection.TryEnter }
// Default log procedure: discards every message. Installed into the
// unit-level WriteLog variable by the initialization section.
procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);
begin
  // intentionally empty
end;
initialization
  // Start with the do-nothing logger; an application that wants log output
  // assigns its own procedure to WriteLog.
  WriteLog := NoLogs;
end.
总结
- 上一篇: javascript DOM 遍历
- 下一篇: 作者:郑勇,山东省农业信息中心副主任、高