IOCP线程池的开发-(2)

2016-08-22 10:21:16来源:http://fxh7622.blog.51cto.com/63841/482723作者:fxh7622人点击


下来我们看一下如何使用Delphi编写一个IOCP的线程池。
创建一个IOCP线程池,至少需要2个基本类。首先:我们定义一个管理工作线程的线程池类(TTheadPool),这个类用来将接收到的数据转发给相应的工作线程类,同时将工作线程类返回的数据进行排队处理。其次:我们定义真正处理数据的工作线程类,此类用来对线程池类发来的数据进行相应的处理,同时将处理返回的数据通过线程池类返回。
现在我们分别来看一下这两个类如何定义和编写。TTheadPool的作用是用来管理工作线程类,那么它至少需要 创建工作线程函数和释放工作线程函数。//IOCP管理m_hCompPort: THandle;m_iWorkThreads: Word;m_pWorkThreads: array of TWorkThread;function CreateWorkThreads: Boolean;function DestroyWorkThreads: Boolean;procedure CreateCompPort;procedure CloseCompPort;
m_hCompPort是完成端口
m_iWorkThreads是用来记录创建工作线程的数量
m_pWorkThreads是工作线程的动态数组
CreateWorkThreads函数用来创建工作线程
DestroyWorkThreads函数用来销毁工作线程
CreateCompPort函数用来创建完成端口
CloseCompPort函数用来关闭完成端口
下来看一下以上函数的实现function TTheadPool.CreateWorkThreads: Boolean;var I: Integer;begin if m_pWorkThreads <> nil then begin Result:=True; Exit; end;
m_iWorkThreads:=0;
if m_WorkCount = 0 then SetLength(m_pWorkThreads, getNumberOfProcessor * 2 + 2) else SetLength(m_pWorkThreads, m_WorkCount);
for I:=Low(m_pWorkThreads) to High(m_pWorkThreads) do begin m_pWorkThreads[i]:=TWorkThread.Create(Self); Inc(m_iWorkThreads); end;
if m_iWorkThreads = 0 then begin Result:=False; Exit;end; Result:=True;end;
function TTheadPool.DestroyWorkThreads: Boolean;var I: Integer;begin if m_pWorkThreads = nil then begin Result:=True; Exit; end;
for i:=0 to m_iWorkThreads - 1 do begin PostQueuedCompletionStatus(m_hCompPort,0, 0, Pointer(SHUTDOWN_FLAG)); end;
Sleep(0);
for I:=Low(m_pWorkThreads) to High(m_pWorkThreads) do begin m_pWorkThreads[i].Destroy; end;
Result:=True;end;
procedure TTheadPool.CreateCompPort;begin if m_hCompPort = 0 then begin m_hCompPort:=CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); end;end;
procedure TTheadPool.CloseCompPort;begin if m_hCompPort <> 0 then beginCloseHandle(m_hCompPort);m_hCompPort:=0;end;end;
//数据合并与管理QueueCS: TRTLCriticalSection;First, Last: PMsgBuffer;procedure Reponse;procedure FreeQueue;这段代码是用来加载消息队列,
Reponse函数对消息队列的消息进行处理
FreeQueue函数是用来当程序结束时,销毁未处理的队列消息
procedure TTheadPool.Reponse;var pWork, pNext: PMsgBuffer;begin pWork:=nil; pNext:=nil; EnterCriticalSection(QueueCS); try if not Assigned(pWork) then begin pWork:=First; First:=nil; Last:=nil; end; finally LeaveCriticalSection(QueueCS); end;
while Assigned(pWork) do begin pNext:=pWork.Next; PostQueuedCompletionStatus(m_hCompPort, 0, 0, Pointer(pWork)); pWork:=pNext; end;end;
procedure TTheadPool.FreeQueue;var pNext: PMsgBuffer;begin while Assigned(First) do begin pNext:=First.Next; if Assigned(First.Buffer) then begin FreeMem(First.Buffer); end; Dispose(First); First:=pNext; end; First:=nil; Last:=nil;end;
以上就是线程池管理类(TTheadPool)的处理过程和主要函数。
对于工作线程类来说处理起来就比较简单了。TWorkThread = class(TThread) protected procedure Execute; override; public procedure Reponse(pMsg: PMsgBuffer); procedure TestWork(pBuf: Pchar; iBufLen, iSocketHandle: Integer); public Parent: TTheadPool; Constructor Create(TheadPool: TTheadPool); Destructor Destroy; Override; end; Reponse函数是用来对于线程池管理类发送过来的消息进行分发的函数。TestWork函数是对于线程池管理类发送过来的数据进行处理的实际的函数。
procedure TWorkThread.Execute;var bRet: BOOL; iRecvlen: Cardinal; pMsg: PCRCMsgBuffer; iBufferlen: Cardinal;begin while not Terminated do begin try pMsg:=nil; bRet:=GetQueuedCompletionStatus(Parent.m_hCompPort, iBufferlen, iRecvlen, POverlapped(pMsg), INFINITE);
//主动断开 if (Cardinal(pMsg) = SHUTDOWN_FLAG) then begin Terminate; end;
//关闭 if Terminated then begin Break; end;
if bRet and Assigned(pMsg) then begin Reponse(pMsg); end; except Continue; end; end;end;
procedure TCRCWorkThread.Reponse(pMsg: PCRCMsgBuffer);begin case pMsg.MainType of USER_TEST: //用户发送来的相关协议 TestWork(pMsg.pData, pMsg.iDatalen, pMsg.SocketHandle); else g_MainThread.AddLog(Format('未知的信息:%d',[pMsg.MainType])); end;
if Assigned(pMsg.pData) then begin FreeMem(pMsg.pData); end; Dispose(pMsg);end;
这样我们就将一个IOCP的线程池编写完成,其中需要注意的2点。1:SHUTDOWN_FLAG是自己定义的一个常量,当线程池类需要关闭的时候,线程池类可以通过PostQueuedCompletionStatus向每一个工作线程发送消息,通知工作线程关闭2:在线程池类中申请的内存PMsgBuffer 是在工作线程中进行释放的。

以上就是IOCP线程池的相关内容,希望通过我的BLOG,可以让大家能够熟知IOCP线程池其实没有那么的神秘。


最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台