An Analysis of the Actor Model and Message Passing in Java

2017-12-30 11:32:05 | Source: oschina | Author: xiaomin0322

Blog category: java

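The actor model is a message-passing concurrency model, commonly implemented on top of coroutines or other lightweight threads, and it performs well for parallel computation and concurrent message passing. A typical actor framework provides ultra-lightweight threads and the tools for fast, safe, zero-copy message passing between them. Languages such as Erlang, Ruby, and Lua support coroutines (lightweight processes, in Erlang's case) directly at the VM level, so the VM saves and restores the execution context for you. Java has no built-in actor implementation, but several open-source frameworks emulate the model.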



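Actor-based systems make parallel processing easier to code by implementing a message-passing pattern. In this pattern, every actor in the system can receive a message, perform the operation the message represents, and then send messages to other actors (including itself) to carry out complex sequences of operations. All messages between actors are asynchronous, which means the sender continues processing before any reply arrives. An actor may therefore spend its whole life in an endless loop of receiving and processing messages.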


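When several actors are used, independent activities are easily spread across multiple threads (and therefore multiple processors) that process messages in parallel. In general, each actor processes its messages on a separate thread. Some actor systems assign threads to actors statically; others, such as the system described in this article, assign them dynamically.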
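To make "dynamic assignment" concrete, here is a minimal sketch, not taken from μJavaActors. It assumes a hypothetical `PooledActor` that borrows a thread from a shared `ExecutorService` only while its mailbox is non-empty. All class and method names are illustrative.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical actor that occupies a pool thread only while it has mail.
class PooledActor {
    private final Queue<String> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final ExecutorService pool;
    final AtomicInteger handled = new AtomicInteger();

    PooledActor(ExecutorService pool) { this.pool = pool; }

    // Asynchronous send: enqueue and return immediately.
    void tell(String message) {
        mailbox.add(message);
        trySchedule();
    }

    private void trySchedule() {
        // At most one drain task per actor is in flight, so one actor's
        // messages are never processed concurrently.
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            try {
                pool.execute(this::drain);
            } catch (RejectedExecutionException e) {
                scheduled.set(false); // pool was already shut down
            }
        }
    }

    private void drain() {
        try {
            while (mailbox.poll() != null) {
                handled.incrementAndGet(); // stand-in for real message handling
            }
        } finally {
            scheduled.set(false);
            trySchedule(); // pick up messages that raced with the reset
        }
    }
}

class PooledActorDemo {
    // Sends messagesPerActor messages to each actor; returns the total handled.
    static int run(int actorCount, int messagesPerActor) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // 2 threads serve all actors
        PooledActor[] actors = new PooledActor[actorCount];
        for (int i = 0; i < actorCount; i++) actors[i] = new PooledActor(pool);
        for (int n = 0; n < messagesPerActor; n++) {
            for (PooledActor a : actors) a.tell("msg" + n);
        }
        int expected = actorCount * messagesPerActor;
        int total = 0;
        try {
            // Poll until every message has been handled (bounded to about 5 s).
            for (int tries = 0; tries < 5000 && total < expected; tries++) {
                total = 0;
                for (PooledActor a : actors) total += a.handled.get();
                if (total < expected) Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 100));
    }
}
```

In this sketch five actors share two threads. A static design would instead dedicate one thread to each actor for its whole lifetime.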




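Below we analyze the implementation of one actor framework for Java.

First, though, let us look at how the actor model is expressed in Erlang, using it to introduce a simple actor: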


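1. First create an actor. In Erlang you start a process (an Erlang VM process, which is not the same thing as an OS process). That process is the actor, and it can receive and send messages.

    Pid = spawn(Mod, func, Args)   % start a process

2. Handle the messages it receives:

    func() ->
        receive
            {From, Msg} ->   % a message arrived
                %% do something
                func()
        end.

3. Sending a message to the actor is just as simple:

    Pid ! {From, Msg}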
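For comparison, the same three steps can be approximated in plain Java. This is a minimal sketch under stated assumptions: `EchoActor`, its `STOP` sentinel, and the `replies` queue are hypothetical names. Unlike an Erlang process, each Java thread here is a full OS thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical Java counterpart of the three Erlang steps above.
class EchoActor implements Runnable {
    static final String STOP = "stop"; // sentinel that ends the receive loop
    private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> replies;

    EchoActor(BlockingQueue<String> replies) { this.replies = replies; }

    // Step 3, like "Pid ! Msg": enqueue and return without waiting.
    void send(String msg) { mailbox.add(msg); }

    // Step 2, like the receive loop in func().
    @Override
    public void run() {
        try {
            while (true) {
                String msg = mailbox.take(); // blocks until a message arrives
                if (STOP.equals(msg)) return;
                replies.add("echo:" + msg);  // the "do something" part
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class EchoDemo {
    static List<String> run() {
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        EchoActor actor = new EchoActor(replies);
        Thread t = new Thread(actor, "echo-actor"); // Step 1, like spawn
        t.start();
        actor.send("hello");
        actor.send("world");
        actor.send(EchoActor.STOP);
        try {
            t.join(); // wait until the actor has drained its mailbox
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(replies);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [echo:hello, echo:world]
    }
}
```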




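The μJavaActors framework:

The following introduction is excerpted from IBM developerWorks.

μJavaActors is a simple Java implementation of an actor system. At only 1,200 lines of code, μJavaActors is small but powerful. In the exercises that follow, you will learn how to use μJavaActors to create and manage actors dynamically and to deliver messages to them.

μJavaActors is built around three core interfaces:

- Message is a message sent between actors. A Message is a container for three (optional) values and some behavior:
  - source is the sending actor.
  - subject is a string that defines the meaning of the message (also called a command).
  - data is any parameter data for the message, often a map, list, or array. Parameters can be data to process and/or other actors to interact with.
  - subjectMatches() checks whether the message subject matches a string or a regular expression.
  The default message class of the μJavaActors package is DefaultMessage.
- ActorManager is an actor manager. It is responsible for allocating threads (and thus processors) to actors so they can process messages. ActorManager has these key behaviors or characteristics:
  - createActor() creates an actor and associates it with this manager.
  - startActor() starts an actor.
  - detachActor() stops an actor and disassociates it from this manager.
  - send()/broadcast() sends a message to one actor, a set of actors, any actor in a category, or all actors.
  Most programs have a single ActorManager, but you can have several if you want to manage multiple thread and/or actor pools. The default implementation of this interface is DefaultActorManager.
- Actor is a unit of execution that processes one message at a time. An Actor has these key behaviors or characteristics:
  - Each actor has a name, which must be unique within its ActorManager.
  - Each actor belongs to a category; categories are a way to send a message to one member of a group of actors. An actor can belong to only one category at a time.
  - receive() is called whenever the ActorManager can provide a thread to run the actor on. For best efficiency, an actor should process messages quickly and not enter long waits (such as waiting for human input).
  - willReceive() lets the actor filter potential message subjects.
  - peek() (peekNext() in the listing below) lets the actor and others see whether there are pending messages, possibly for selected subjects.
  - remove() lets the actor and others remove or cancel any message not yet processed.
  - getMessageCount() lets the actor and others get the number of pending messages.
  - getMaxMessageCount() lets the actor limit how many pending messages it supports; this can be used to prevent runaway sends.

Most programs have many actors, often of different types. Actors can be created when the program starts, or created (and destroyed) while it runs. The actor package in this article includes an abstract class named AbstractActor on which actor implementations are based.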
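As an illustration of the Message contract just described (a subject, optional data, and subject matching by string or regular expression), here is a minimal self-contained sketch. `SimpleMessage` is a hypothetical stand-in, not the framework's DefaultMessage, and its method signatures are assumptions.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for a message; not the μJavaActors DefaultMessage class.
class SimpleMessage {
    final String subject; // what the message means (the "command")
    final Object data;    // optional parameter data

    SimpleMessage(String subject, Object data) {
        this.subject = subject;
        this.data = data;
    }

    // Exact match against a literal subject.
    boolean subjectMatches(String s) {
        return subject != null && subject.equals(s);
    }

    // Whole-subject match against a regular expression.
    boolean subjectMatches(Pattern p) {
        return subject != null && p.matcher(subject).matches();
    }
}

class SimpleMessageDemo {
    public static void main(String[] args) {
        SimpleMessage m = new SimpleMessage("repeat", 3);
        System.out.println(m.subjectMatches("repeat"));                // true
        System.out.println(m.subjectMatches(Pattern.compile("re.*"))); // true
        System.out.println(m.subjectMatches("init"));                  // false
    }
}
```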






public abstract class AbstractActor extends Utils implements Actor {
    public static final int DEFAULT_MAX_MESSAGES = 100;

    protected DefaultActorManager manager;

    public ActorManager getManager() {
        return manager;
    }

    public void setManager(DefaultActorManager manager) {
        if (this.manager != null && manager != null) {
            throw new IllegalStateException(
                    "cannot change manager of attached actor");
        }
        this.manager = manager;
    }

    protected String name;

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void setName(String name) {
        if (manager != null) {
            throw new IllegalStateException("cannot change name if manager set");
        }
        this.name = name;
    }

    protected String category = DEFAULT_CATEGORY;

    @Override
    public String getCategory() {
        return category;
    }

    @Override
    public void setCategory(String category) {
        this.category = category;
    }

    /**
     * Process a message conditionally. If testMessage() returns null no message
     * will be consumed.
     *
     * @see AbstractActor#testMessage()
     */
    @Override
    public boolean receive() {
        Message m = testMessage();
        boolean res = m != null;
        if (res) {
            boolean f = remove(m);
            if (!f) {
                logger.warning("receive message not removed: %s", m);
            }
            DefaultMessage dm = (DefaultMessage) m;
            try {
                dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.DELIVERED));
                // logger.trace("receive %s processing %s", this.getName(), m);
                loopBody(m);
                dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.COMPLETED));
            } catch (Exception e) {
                dm.fireMessageListeners(new MessageEvent(this, dm, MessageEvent.MessageStatus.FAILED));
                logger.error("loop exception", e);
            }
        }
        manager.awaitMessage(this);
        return res;
    }

    /**
     * Test to see if a message should be processed. Subclasses should override
     */
    @Override
    public boolean willReceive(String subject) {
        return !isEmpty(subject); // default receive all subjects
    }

    /** Test the current message. Default action is to accept all. */
    protected Message testMessage() {
        return getMatch(null, false);
    }

    /** Process the accepted subject. */
    abstract protected void loopBody(Message m);

    /** Test a message against a defined subject pattern. */
    protected DefaultMessage getMatch(String subject, boolean isRegExpr) {
        DefaultMessage res = null;
        synchronized (messages) {
            res = (DefaultMessage) peekNext(subject, isRegExpr);
        }
        return res;
    }

    // Generic type parameters were lost in the scraped listing and are restored here.
    protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>();

    public DefaultMessage[] getMessages() {
        return messages.toArray(new DefaultMessage[messages.size()]);
    }

    @Override
    public int getMessageCount() {
        synchronized (messages) {
            return messages.size();
        }
    }

    /**
     * Limit the number of messages that can be received. Subclasses should override.
     */
    @Override
    public int getMaxMessageCount() {
        return DEFAULT_MAX_MESSAGES;
    }

    /** Queue a message to be processed later. */
    public void addMessage(DefaultMessage message) {
        if (message != null) {
            synchronized (messages) {
                // The comparison was truncated in the scraped listing; reconstructed from context.
                if (messages.size() < getMaxMessageCount()) {
                    messages.add(message);
                    // messages.notifyAll();
                } else {
                    throw new IllegalStateException("too many messages, cannot add");
                }
            }
        }
    }

    @Override
    public Message peekNext() {
        return peekNext(null);
    }

    @Override
    public Message peekNext(String subject) {
        return peekNext(subject, false);
    }

    /**
     * See if a message exists that meets the selection criteria.
     **/
    @Override
    public Message peekNext(String subject, boolean isRegExpr) {
        Message res = null;
        if (isActive) {
            Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject)
                    : null) : null;
            long now = new Date().getTime();
            synchronized (messages) {
                for (DefaultMessage m : messages) {
                    if (m.getDelayUntil() <= now) {
                        boolean match = subject == null
                                || (isRegExpr ? m.subjectMatches(p) : m
                                        .subjectMatches(subject));
                        if (match) {
                            res = m;
                            break;
                        }
                    }
                }
            }
        }
        // logger.trace("peekNext %s, %b: %s", subject, isRegExpr, res);
        return res;
    }

    @Override
    public boolean remove(Message message) {
        synchronized (messages) {
            return messages.remove(message);
        }
    }

    protected boolean isActive;

    public boolean isActive() {
        return isActive;
    }

    @Override
    public void activate() {
        isActive = true;
    }

    @Override
    public void deactivate() {
        isActive = false;
    }

    /** Do startup processing. */
    protected void runBody() {
        DefaultMessage m = new DefaultMessage("init");
        getManager().send(m, null, this);
    }

    @Override
    public void run() {
        runBody();
        ((DefaultActorManager) getManager()).awaitMessage(this);
    }

    protected boolean hasThread;

    public boolean getHasThread() {
        return hasThread;
    }

    protected void setHasThread(boolean hasThread) {
        this.hasThread = hasThread;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "[" + bodyString() + "]";
    }

    protected String bodyString() {
        return "name=" + name + ", category=" + category + ", messages="
                + messages.size();
    }

    volatile protected boolean shutdown;

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public void shutdown() {
        shutdown = true;
    }

    volatile protected boolean suspended;

    @Override
    public void setSuspended(boolean f) {
        suspended = f;
    }

    @Override
    public boolean isSuspended() {
        return suspended;
    }
}



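To write an actor, we extend the abstract class AbstractActor and implement loopBody to process the message content.

The receive method takes one message and processes it. It first picks a message and removes it from the message list, then fires the MessageEvent.MessageStatus.DELIVERED event before processing (we can implement the MessageListener interface to listen for these message-processing events), then hands the message to the abstract method loopBody, and finally fires the MessageEvent.MessageStatus.COMPLETED event. If loopBody throws an exception, a FAILED event is fired instead.

peekNext only looks up the next matching message; it does not remove it from the list.

As for the flags: isActive indicates whether the actor has been activated, hasThread whether a thread is currently assigned to it and running it, shutdown whether it has been shut down, and suspended whether it has been suspended.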
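The order of events around loopBody can be sketched in isolation. The following is a simplified model, not the framework's code: `LifecycleSketch`, `StatusListener`, and the use of `Consumer<String>` as the message handler are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LifecycleSketch {
    enum Status { DELIVERED, COMPLETED, FAILED }

    // Hypothetical callback; the framework's MessageListener has its own signature.
    interface StatusListener {
        void onStatus(String message, Status status);
    }

    // Same shape as receive(): notify, process, notify again (or report failure).
    static void process(String message, Consumer<String> loopBody, StatusListener listener) {
        try {
            listener.onStatus(message, Status.DELIVERED);
            loopBody.accept(message);
            listener.onStatus(message, Status.COMPLETED);
        } catch (Exception e) {
            listener.onStatus(message, Status.FAILED);
        }
    }

    // Records the events fired while one message is processed.
    static List<String> trace(String message, Consumer<String> loopBody) {
        List<String> events = new ArrayList<>();
        process(message, loopBody, (m, s) -> events.add(m + ":" + s));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace("ok", m -> { }));
        // prints [ok:DELIVERED, ok:COMPLETED]
        System.out.println(trace("bad", m -> { throw new IllegalStateException("boom"); }));
        // prints [bad:DELIVERED, bad:FAILED]
    }
}
```

A listener therefore always sees DELIVERED first and then exactly one of COMPLETED or FAILED for each processed message.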



Now let us look at the DefaultMessage class:





public class DefaultMessage extends Utils implements Message {
    ......
    protected Actor source;
    protected String subject;
    protected Object data;

    protected List<MessageListener> listeners = new LinkedList<MessageListener>();

    public void addMessageListener(MessageListener l) {
        if (!listeners.contains(l)) {
            listeners.add(l);
        }
    }

    public void removeMessageListener(MessageListener l) {
        listeners.remove(l);
    }

    public void fireMessageListeners(MessageEvent e) {
        for (MessageListener l : listeners) {
            l.onMessage(e);
        }
    }
    ......
}


Listeners can be attached to each individual message.



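Next comes the most important part, the implementation of DefaultActorManager.

Initialization:

Initialization reads the configured thread count from ActorManager.properties, then creates a thread group and getThreadCount() threads, each running an ActorRunnable (these threads execute the actors queued in runnables and waiters).

Finally it starts a Counter thread, which wakes once per scheduling interval (1 second) to update the send and dispatch statistics, that is, how many actor executions took place in that interval.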






/**
 * Get the default instance. Uses ActorManager.properties for configuration.
 *
 * @return shared instance
 */
public static DefaultActorManager getDefaultInstance() {
    if (instance == null) {
        instance = new DefaultActorManager();
        Map<String, Object> options = null;
        // ConfigUtils configUtils = new ConfigUtils();
        // Properties p = configUtils
        //         .loadProperties("ActorManager.properties");
        Properties p = new Properties();
        try {
            p.load(new FileInputStream("ActorManager.properties"));
        } catch (IOException e) {
            try {
                p.load(new FileInputStream("/resource/ActorManager.properties"));
            } catch (IOException e1) {
                logger.warning("DefaultActorManager: no configutration: " + e);
            }
        }
        if (!isEmpty(p)) {
            options = new HashMap<String, Object>();
            for (Object key : p.keySet()) {
                String skey = (String) key;
                options.put(skey, p.getProperty(skey));
            }
        }
        instance.initialize(options);
    }
    return instance;
}

protected ThreadGroup threadGroup;
protected static int groupCount;
protected List<Thread> threads = new LinkedList<Thread>(); // the threads of the thread pool

/**
 * Initialize this manager. Call only once.
 *
 * @param options
 *            map of options
 */
@Override
public void initialize(Map<String, Object> options) {
    if (!initialized) {
        initialized = true;
        int count = getThreadCount(options);
        ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++);
        threadGroup = tg;
        // The loop header was truncated in the scraped listing; reconstructed from context.
        for (int i = 0; i < count; i++) {
            createThread(i);
        }
        running = true;
        for (Thread t : threads) {
            // logger.trace("procesNextActor starting %s", t);
            t.start();
        }

        Thread Counter = new Thread(new Runnable() {
            @Override
            public void run() {
                while (running) {
                    try {
                        trendValue = sendCount - dispatchCount;
                        // logger.trace("Counter thread: sc=%d, dc=%d, t=%d",
                        //         sendCount, dispatchCount, trendValue);
                        lastSendCount = sendCount;
                        sendCount = 0;
                        updateLastDispatchCount();
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                sendCount = lastSendCount = 0;
                clearDispatchCount();
            }
        });
        Counter.setDaemon(true);
        lastDispatchTime = lastSendTime = new Date().getTime();
        Counter.start();
    }
}

protected void createThread(int i) {
    addThread("actor" + i);
}

/**
 * Add a dynamic thread.
 *
 * @param name
 * @return
 */
public Thread addThread(String name) {
    Thread t = null;
    synchronized (actors) {
        if (trunnables.containsKey(name)) {
            throw new IllegalStateException("already exists: " + name);
        }
        ActorRunnable r = new ActorRunnable();
        trunnables.put(name, r);
        t = new Thread(threadGroup, r, name);
        threads.add(t);
        // System.out.printf("addThread: %s", name);
    }
    t.setDaemon(true);
    t.setPriority(getThreadPriority());
    return t;
}








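/** Configuration key for thread count. */
public static final String ACTOR_THREAD_COUNT = "threadCount";

protected Map<String, AbstractActor> actors = new LinkedHashMap<String, AbstractActor>();
protected Map<String, AbstractActor> runnables = new LinkedHashMap<String, AbstractActor>();
protected Map<String, AbstractActor> waiters = new LinkedHashMap<String, AbstractActor>();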



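actors holds every started actor. runnables holds actors that have been started but whose run() method has not yet executed (this is where an actor acts as a sender and posts its initial message). waiters holds actors that are waiting to receive messages.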




Creating and starting an actor instance:





/**
 * Create an actor and associate it with this manager.
 *
 * @param clazz
 *            the actor class
 * @param name
 *            actor name; must be unique
 * @param options
 *            actor options
 */
@Override
public Actor createActor(Class<? extends Actor> clazz, String name, Map<String, Object> options) {
    AbstractActor a = null;
    synchronized (actors) {
        if (!actors.containsKey(name)) {
            try {
                a = (AbstractActor) clazz.newInstance();
                a.setName(name);
                a.setManager(this);
            } catch (Exception e) {
                throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(
                        "mapped exception: " + e, e);
            }
        } else {
            throw new IllegalArgumentException("name already in use: " + name);
        }
    }
    return a;
}

/**
 * Start an actor. Must have been created by this manager.
 *
 * @param actor
 *            the actor
 */
@Override
public void startActor(Actor actor) {
    if (((AbstractActor) actor).getManager() != this) {
        throw new IllegalStateException("actor not owned by this manager");
    }
    String name = actor.getName();
    synchronized (actors) {
        if (actors.containsKey(name)) {
            throw new IllegalStateException("already started");
        }
        ((AbstractActor) actor).shutdown = false;
        actors.put(name, (AbstractActor) actor);
        runnables.put(name, (AbstractActor) actor);
    }
    actor.activate();
}

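Starting an actor puts it into both the actors map and the runnables queue (where it waits for its first execution, in which it sends its initial message), and then calls the actor's activate() method to initialize it.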




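Now let us see how the ActorRunnable threads of the pool execute actors.

ActorRunnable is an inner class of DefaultActorManager, so it can access the manager's actor collections.

procesNextActor removes the first actor (FIFO order) from the runnables list. If there is one, it calls that actor's run method, which sends the actor's initial message (the Actor interface extends Runnable). If runnables is empty, it removes an actor from the waiters list and calls its receive method. When a message was actually received, dispatchCount is incremented accordingly; it counts the messages consumed so far. If procesNextActor returns false because nothing was processed, the worker thread waits up to 100 ms on the actors monitor; otherwise it loops again immediately.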
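The selection order just described (first-run actors before waiting actors, oldest first) can be modeled with two insertion-ordered maps. This is a simplified sketch with hypothetical names, using plain strings in place of actors. The framework's loop removes the first key inside a for-each and breaks immediately; the sketch uses `Iterator.remove()` for the same effect.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class NextActorSketch {
    // Remove and return the oldest entry's value, or null when the map is empty.
    static <V> V pollFirst(Map<String, V> map) {
        Iterator<Map.Entry<String, V>> it = map.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        V value = it.next().getValue();
        it.remove();
        return value;
    }

    // Prefer actors that still need their first run; otherwise take a waiter.
    static String next(Map<String, String> runnables, Map<String, String> waiters) {
        String actor = pollFirst(runnables);
        if (actor != null) {
            return "run:" + actor;
        }
        actor = pollFirst(waiters);
        return actor != null ? "receive:" + actor : null;
    }

    public static void main(String[] args) {
        Map<String, String> runnables = new LinkedHashMap<>();
        Map<String, String> waiters = new LinkedHashMap<>();
        runnables.put("a", "actorA");
        waiters.put("b", "actorB");
        waiters.put("c", "actorC");
        System.out.println(next(runnables, waiters)); // run:actorA
        System.out.println(next(runnables, waiters)); // receive:actorB
        System.out.println(next(runnables, waiters)); // receive:actorC
        System.out.println(next(runnables, waiters)); // null
    }
}
```

In the real manager every such access happens inside `synchronized (actors)`, because several pool threads compete for the same two maps.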






/** public intended only for "friend" access. */
public class ActorRunnable implements Runnable {
    public boolean hasThread;
    public AbstractActor actor;

    public void run() {
        // logger.trace("procesNextActor starting");
        int delay = 1;
        while (running) {
            try {
                if (!procesNextActor()) {
                    // logger.trace("procesNextActor waiting on actor");
                    // sleep(delay * 1000);
                    synchronized (actors) {
                        // TODO: adjust this delay; possible parameter
                        // we want to minimize overhead (make bigger);
                        // but it has a big impact on message processing
                        // rate (make smaller)
                        // actors.wait(delay * 1000);
                        actors.wait(100);
                    }
                    delay = Math.max(5, delay + 1);
                } else {
                    delay = 1;
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                logger.error("procesNextActor exception", e);
            }
        }
        // logger.trace("procesNextActor ended");
    }

    protected boolean procesNextActor() {
        boolean run = false, wait = false, res = false;
        actor = null;
        synchronized (actors) {
            // take the first (oldest) runnable actor, if any
            for (String key : runnables.keySet()) {
                actor = runnables.remove(key);
                break;
            }
        }
        if (actor != null) {
            // first run never started
            run = true;
            actor.setHasThread(true);
            hasThread = true;
            try {
                actor.run();
            } finally {
                actor.setHasThread(false);
                hasThread = false;
            }
        } else {
            synchronized (actors) {
                for (String key : waiters.keySet()) {
                    actor = waiters.remove(key);
                    break;
                }
            }
            if (actor != null) {
                // then waiting for responses
                wait = true;
                actor.setHasThread(true);
                hasThread = true;
                try {
                    res = actor.receive();
                    if (res) {
                        incDispatchCount();
                    }
                } finally {
                    actor.setHasThread(false);
                    hasThread = false;
                }
            }
        }
        // if (!(!run && wait && !res) && a != null) {
        //     logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a);
        // }
        return run || res;
    }
}



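Next comes the message-sending API. send delivers message from the actor from to the actor to. When recordSentMessages is enabled, sentMessages keeps a list (capped at 100 entries) of the messages sent to each actor. There are also overloads that send to several actors at once. send(Message message, Actor from, String category) sends the message to one actor in the given category, namely the member with the fewest pending messages. broadcast sends the message to all actors.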
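The category rule (skip full mailboxes, then choose the member with the fewest pending messages) is easy to check in isolation. The sketch below is illustrative only: `Target` is a hypothetical value class that stands in for an actor's name, category, and message counts.

```java
import java.util.Arrays;
import java.util.List;

class CategoryPickSketch {
    // Hypothetical minimal view of an actor.
    static final class Target {
        final String name;
        final String category;
        final int pending; // messages currently queued
        final int max;     // mailbox capacity

        Target(String name, String category, int pending, int max) {
            this.name = name;
            this.category = category;
            this.pending = pending;
            this.max = max;
        }
    }

    // Returns the name of the least-loaded, non-full member of the category, or null.
    static String pick(List<Target> actors, String category) {
        Target best = null;
        for (Target t : actors) {
            boolean eligible = category.equals(t.category) && t.pending < t.max;
            // Strict comparison: on a tie the earlier actor wins.
            if (eligible && (best == null || t.pending < best.pending)) {
                best = t;
            }
        }
        return best == null ? null : best.name;
    }

    public static void main(String[] args) {
        List<Target> actors = Arrays.asList(
                new Target("w1", "worker", 3, 100),
                new Target("w2", "worker", 1, 100),
                new Target("w3", "worker", 100, 100), // full, never chosen
                new Target("l1", "logger", 0, 100));
        System.out.println(pick(actors, "worker")); // w2
        System.out.println(pick(actors, "logger")); // l1
        System.out.println(pick(actors, "none"));   // null
    }
}
```

Routing by load in this way gives a simple form of load balancing among the actors of one category.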






/**
 * Send a message.
 *
 * @param message
 *            message to
 * @param from
 *            source actor
 * @param to
 *            target actor
 * @return number of receiving actors
 */
@Override
public int send(Message message, Actor from, Actor to) {
    int count = 0;
    if (message != null) {
        AbstractActor aa = (AbstractActor) to;
        if (aa != null) {
            if (!aa.isShutdown() && !aa.isSuspended() && aa.willReceive(message.getSubject())) {
                DefaultMessage xmessage = (DefaultMessage) ((DefaultMessage) message).assignSender(from);
                // logger.trace("%s to %s", xmessage, to);
                aa.addMessage(xmessage);
                xmessage.fireMessageListeners(new MessageEvent(aa, xmessage, MessageEvent.MessageStatus.SENT));
                sendCount++;
                lastSendTime = new Date().getTime();
                if (recordSentMessages) {
                    synchronized (sentMessages) {
                        String aname = aa.getName();
                        List<Message> l = sentMessages.get(aname);
                        if (l == null) {
                            l = new LinkedList<Message>();
                            sentMessages.put(aname, l);
                        }
                        // keep from getting too big
                        if (l.size() < 100) {
                            l.add(xmessage);
                        }
                    }
                }
                count++;
                synchronized (actors) {
                    actors.notifyAll();
                }
            }
        }
    }
    return count;
}

/**
 * Send a message.
 *
 * @param message
 *            message to
 * @param from
 *            source actor
 * @param to
 *            target actors
 * @return number of receiving actors
 */
@Override
public int send(Message message, Actor from, Actor[] to) {
    int count = 0;
    for (Actor a : to) {
        count += send(message, from, a);
    }
    return count;
}

/**
 * Send a message.
 *
 * @param message
 *            message to
 * @param from
 *            source actor
 * @param to
 *            target actors
 * @return number of receiving actors
 */
@Override
public int send(Message message, Actor from, Collection<Actor> to) {
    int count = 0;
    for (Actor a : to) {
        count += send(message, from, a);
    }
    return count;
}

/**
 * Send a message.
 *
 * @param message
 *            message to
 * @param from
 *            source actor
 * @param category
 *            target actor category
 * @return number of receiving actors
 */
@Override
public int send(Message message, Actor from, String category) {
    int count = 0;
    Map<String, AbstractActor> xactors = cloneActors();
    List<Actor> catMembers = new LinkedList<Actor>();
    for (String key : xactors.keySet()) {
        Actor to = xactors.get(key);
        // The comparison was truncated in the scraped listing; reconstructed from context.
        if (category.equals(to.getCategory()) && (to.getMessageCount() < to.getMaxMessageCount())) {
            catMembers.add(to);
        }
    }
    // find an actor with lowest message count
    int min = Integer.MAX_VALUE;
    Actor amin = null;
    for (Actor a : catMembers) {
        int mcount = a.getMessageCount();
        if (mcount < min) {
            min = mcount;
            amin = a;
        }
    }
    if (amin != null) {
        count += send(message, from, amin);
        // } else {
        //     throw new
        //     IllegalStateException("no capable actors for category: " +
        //     category);
    }
    return count;
}

/**
 * Send a message to all actors.
 *
 * @param message
 *            message to
 * @param from
 *            source actor
 * @return number of receiving actors
 */
@Override
public int broadcast(Message message, Actor from) {
    int count = 0;
    Map<String, AbstractActor> xactors = cloneActors();
    for (String key : xactors.keySet()) {
        Actor to = xactors.get(key);
        count += send(message, from, to);
    }
    return count;
}


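Whenever the full set of actors is needed, the current actors map is cloned under the lock and the copy is iterated without it. Holding the lock for the whole iteration would reduce concurrency.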






protected Map<String, AbstractActor> cloneActors() {
    Map<String, AbstractActor> xactors;
    synchronized (actors) {
        xactors = new HashMap<String, AbstractActor>(actors);
    }
    return xactors;
}


There is, of course, also code to terminate all the threads:






/**
 * Terminate processing and wait for all threads to stop.
 */
@Override
public void terminateAndWait() {
    logger.trace("terminateAndWait waiting on termination of %d threads", threads.size());
    terminate();
    waitForThreads();
}

/**
 * Wait for all threads to stop. Must have issued terminate.
 */
public void waitForThreads() {
    if (!terminated) {
        throw new IllegalStateException("not terminated");
    }
    for (Thread t : threads) {
        try {
            // logger.info("terminateAndWait waiting for %s...", t);
            t.join();
        } catch (InterruptedException e) {
            // logger.info("terminateAndWait interrupt");
        }
    }
}

boolean running, terminated;

/**
 * Terminate processing.
 */
@Override
public void terminate() {
    terminated = true;
    running = false;
    for (Thread t : threads) {
        t.interrupt();
    }
    synchronized (actors) {
        for (String key : actors.keySet()) {
            actors.get(key).deactivate();
        }
    }
    sentMessages.clear();
    sendCount = lastSendCount = 0;
    clearDispatchCount();
}


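An actor can also be parked until a message arrives; this is done by adding the actor to the waiters list. Note that after send has delivered a message, it calls actors.notifyAll() to wake up the waiting threads.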
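The wake-up handshake between send and an idle pool thread follows the usual monitor pattern: wait with a timeout, then re-check for work. The sketch below isolates that pattern. `WakeupSketch` and its methods are hypothetical, and unlike the framework it keeps the queue and the monitor together for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class WakeupSketch {
    private final Object lock = new Object();
    private final Queue<String> pending = new ArrayDeque<>();

    // Like send(): queue the work, then wake any idle worker.
    void offer(String item) {
        synchronized (lock) {
            pending.add(item);
            lock.notifyAll();
        }
    }

    // Like the idle branch of ActorRunnable.run(): wait at most timeoutMs
    // (must be > 0), then re-check. Returns null if nothing arrived.
    String poll(long timeoutMs) {
        synchronized (lock) {
            if (pending.isEmpty()) {
                try {
                    lock.wait(timeoutMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return pending.poll();
        }
    }

    public static void main(String[] args) {
        WakeupSketch w = new WakeupSketch();
        w.offer("job");
        System.out.println(w.poll(100)); // job
    }
}
```

The timeout is a trade-off that the listing's own comment also mentions: a larger value reduces polling overhead, while a smaller one lets delayed messages be picked up sooner.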






/**
 * Suspend an actor until it has a read message.
 *
 * @param actor
 *            receiving actor
 */
public void awaitMessage(AbstractActor actor) {
    synchronized (actors) {
        waiters.put(actor.getName(), actor);
        // actors.notifyAll();
        // logger.trace("awaitMessage waiters=%d: %s", waiters.size(), a);
    }
}


An actor is added to the runnables queue only by startActor.

Sample code:






import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;

import com.ibm.actor.Actor;
import com.ibm.actor.DefaultActorManager;
import com.ibm.actor.DefaultMessage;
import com.ibm.actor.logging.DefaultLogger;
import com.ibm.actor.utils.Utils;

/**
 * A set of runtime services for testing actors and a test case driver.
 *
 * @author BFEIGENB
 *
 */
public class DefaultActorTest extends Utils {

    public static final int MAX_IDLE_SECONDS = 10;

    // public static final int STEP_COUNT = 3 * 60;
    public static final int TEST_VALUE_COUNT = 1000; // TODO: make bigger

    public DefaultActorTest() {
        super();
    }

    private Map<String, Actor> testActors = new ConcurrentHashMap<String, Actor>();

    static Random rand = new Random();

    public static int nextInt(int limit) {
        return rand.nextInt(limit);
    }

    protected DefaultActorManager getManager() {
        DefaultActorManager am = actorManager != null ? actorManager : new DefaultActorManager();
        return am;
    }

    protected int stepCount = 120;

    public void setStepCount(int stepCount) {
        this.stepCount = stepCount;
    }

    public int getStepCount() {
        return stepCount;
    }

    protected int threadCount = 10;

    public int getThreadCount() {
        return threadCount;
    }

    public void setThreadCount(int threadCount) {
        this.threadCount = threadCount;
    }

    public void setTestActors(Map<String, Actor> testActors) {
        this.testActors = testActors;
    }

    public Map<String, Actor> getTestActors() {
        return testActors;
    }

    public static final int COMMON_ACTOR_COUNT = 10;
    public static final int TEST_ACTOR_COUNT = 25;
    public static final int PRODUCER_ACTOR_COUNT = 25;

    public static void sleeper(int seconds) {
        int millis = seconds * 1000 + -50 + nextInt(100); // a little variation
        // logger.trace("sleep: %dms", millis);
        sleep(millis);
    }

    public static void dumpMessages(List<DefaultMessage> messages) {
        synchronized (messages) {
            if (messages.size() > 0) {
                for (DefaultMessage m : messages) {
                    logger.info("%s", m);
                }
            }
        }
    }

    protected List<ChangeListener> listeners = new LinkedList<ChangeListener>();

    public void addChangeListener(ChangeListener l) {
        if (!listeners.contains(l)) {
            listeners.add(l);
        }
    }

    public void removeChangeListener(ChangeListener l) {
        listeners.remove(l);
    }

    protected void fireChangeListeners(ChangeEvent e) {
        for (ChangeListener l : listeners) {
            l.stateChanged(e);
        }
    }

    protected static String[] types = new String[] { "widget", "framit", "frizzle", "gothca", "splat" };

    public static String[] getItemTypes() {
        return types;
    }

    public static void main(String[] args) {
        DefaultActorTest at = new DefaultActorTest();
        at.run(args);
        logger.trace("Done");
    }

    protected String title;

    public String getTitle() {
        return title;
    }

    volatile protected boolean done;

    public void terminateRun() {
        done = true;
    }

    public static String[] getTestNames() {
        return new String[] { "Countdown", "Producer Consumer", /* "Quicksort", */ "MapReduce", "Virus Scan", "All" };
    }

    DefaultActorManager actorManager;

    public DefaultActorManager getActorManager() {
        return actorManager;
    }

    public void setActorManager(DefaultActorManager actorManager) {
        this.actorManager = actorManager;
    }

    public void run(String[] args) {
        done = false;
        // DefaultLogger.getDefaultInstance().setIncludeDate(false);
        DefaultLogger.getDefaultInstance().setIncludeContext(false);
        DefaultLogger.getDefaultInstance().setIncludeCaller(false);
        // DefaultLogger.getDefaultInstance().setIncludeThread(false);
        DefaultLogger.getDefaultInstance().setLogToFile(false);
        DefaultLogger.getDefaultInstance().setThreadFieldWidth(10);

        int sc = stepCount;
        int tc = threadCount;
        boolean doTest = false;
        title = "";

        if (!doTest) {
            doTest = true;
        }
        if (doTest) {
            if (title.length() > 0) {
                title += " ";
            }
            title += "(Countdown Test) ";
        }

        DefaultActorManager am = getManager();
        try {
            Map<String, Object> options = new HashMap<String, Object>();
            options.put(DefaultActorManager.ACTOR_THREAD_COUNT, tc);
            am.initialize(options);
            if (doTest) {
                // Loop bounds were truncated in the scraped listing; reconstructed
                // from the constants above and the actor names in the output.
                for (int i = 0; i < COMMON_ACTOR_COUNT; i++) {
                    Actor a = am.createActor(TestActor.class, String.format("common%02d", i));
                    if (a instanceof TestableActor) {
                        TestableActor ta = (TestableActor) a;
                        ta.setActorTest(this);
                    }
                    a.setCategory(TestActor.class.getSimpleName());
                    getTestActors().put(a.getName(), a);
                    // logger.trace("created: %s", a);
                }
                for (int i = 0; i < TEST_ACTOR_COUNT; i++) {
                    Actor a = am.createActor(TestActor.class, String.format("actor%02d", i));
                    if (a instanceof TestableActor) {
                        TestableActor ta = (TestableActor) a;
                        ta.setActorTest(this);
                    }
                    getTestActors().put(a.getName(), a);
                    // logger.trace("created: %s", a);
                }
            }

            for (String key : getTestActors().keySet()) {
                am.startActor(getTestActors().get(key));
            }

            for (int i = sc; i > 0; i--) {
                if (done) {
                    break;
                }
                // see if idle a while
                long now = new Date().getTime();
                if (am.getActiveRunnableCount() == 0) {
                    if (now - am.getLastDispatchTime() > MAX_IDLE_SECONDS * 1000
                            && now - am.getLastSendTime() > MAX_IDLE_SECONDS * 1000) {
                        break;
                    }
                }
                setStepCount(i);
                fireChangeListeners(new ChangeEvent(this));
                if (i < 10 || i % 10 == 0) {
                    logger.trace("main waiting: %d...", i);
                }
                sleeper(1);
            }
            setStepCount(0);
            fireChangeListeners(new ChangeEvent(this));

            // logger.trace("main terminating");
            am.terminateAndWait();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}




/**
 * An actor that sends messages while counting down a send count.
 *
 * @author BFEIGENB
 *
 */
public class TestActor extends TestableActor {

    @Override
    public void activate() {
        logger.trace("TestActor activate: %s", this);
        super.activate();
    }

    @Override
    public void deactivate() {
        logger.trace("TestActor deactivate: %s", this);
        super.deactivate();
    }

    @Override
    protected void runBody() {
        // logger.trace("TestActor:%s runBody: %s", getName(), this);
        DefaultActorTest.sleeper(1);
        DefaultMessage m = new DefaultMessage("init", 8);
        getManager().send(m, null, this);
    }

    @Override
    protected void loopBody(Message m) {
        // logger.trace("TestActor:%s loopBody %s: %s", getName(), m, this);
        DefaultActorTest.sleeper(1);
        String subject = m.getSubject();
        if ("repeat".equals(subject)) {
            int count = (Integer) m.getData();
            logger.trace("TestActor:%s repeat(%d) %s: %s", getName(), count, m,
                    this);
            if (count > 0) {
                m = new DefaultMessage("repeat", count - 1);
                // logger.trace("TestActor loopBody send %s: %s", m, this);
                String toName = "actor"
                        + DefaultActorTest
                                .nextInt(DefaultActorTest.TEST_ACTOR_COUNT);
                Actor to = actorTest.getTestActors().get(toName);
                if (to != null) {
                    getManager().send(m, this, to);
                } else {
                    logger.warning("repeat:%s to is null: %s", getName(),
                            toName);
                }
            }
        } else if ("init".equals(subject)) {
            int count = (Integer) m.getData();
            count = DefaultActorTest.nextInt(count) + 1;
            logger.trace("TestActor:%s init(%d): %s", getName(), count, this);
            // The loop bound was truncated in the scraped listing; reconstructed from context.
            for (int i = 0; i < count; i++) {
                DefaultActorTest.sleeper(1);
                m = new DefaultMessage("repeat", count);
                // logger.trace("TestActor runBody send %s: %s", m, this);
                String toName = "actor"
                        + DefaultActorTest
                                .nextInt(DefaultActorTest.TEST_ACTOR_COUNT);
                Actor to = actorTest.getTestActors().get(toName);
                if (to != null) {
                    getManager().send(m, this, to);
                } else {
                    logger.warning("init:%s to is null: %s", getName(), toName);
                }
                DefaultMessage dm = new DefaultMessage("repeat", count);
                dm.setDelayUntil(new Date().getTime()
                        + (DefaultActorTest.nextInt(5) + 1) * 1000);
                getManager().send(dm, this, this.getClass().getSimpleName());
            }
        } else {
            logger.warning("TestActor:%s loopBody unknown subject: %s",
                    getName(), subject);
        }
    }
}



Output:






12:59:38.883 T [main] - TestActor activate: TestActor[name=common06, category=TestActor, messages=0]
12:59:38.886 T [main] - TestActor activate: TestActor[name=common05, category=TestActor, messages=0]
12:59:38.887 T [main] - TestActor activate: TestActor[name=common08, category=TestActor, messages=0]
12:59:38.889 T [main] - TestActor activate: TestActor[name=common07, category=TestActor, messages=0]
12:59:38.890 T [main] - TestActor activate: TestActor[name=common09, category=TestActor, messages=0]
12:59:38.891 T [main] - TestActor activate: TestActor[name=common00, category=TestActor, messages=0]
12:59:38.892 T [main] - TestActor activate: TestActor[name=common01, category=TestActor, messages=0]
12:59:38.893 T [main] - TestActor activate: TestActor[name=common02, category=TestActor, messages=0]
12:59:38.895 T [main] - TestActor activate: TestActor[name=common03, category=TestActor, messages=0]
12:59:38.896 T [main] - TestActor activate: TestActor[name=common04, category=TestActor, messages=0]
12:59:38.897 T [main] - TestActor activate: TestActor[name=actor24, category=default, messages=0]
12:59:38.899 T [main] - TestActor activate: TestActor[name=actor11, category=default, messages=0]
12:59:38.904 T [main] - TestActor activate: TestActor[name=actor23, category=default, messages=0]
12:59:38.905 T [main] - TestActor activate: TestActor[name=actor10, category=default, messages=0]
12:59:38.906 T [main] - TestActor activate: TestActor[name=actor22, category=default, messages=0]
12:59:38.907 T [main] - TestActor activate: TestActor[name=actor13, category=default, messages=0]
12:59:38.908 T [main] - TestActor activate: TestActor[name=actor21, category=default, messages=0]
12:59:38.909 T [main] - TestActor activate: TestActor[name=actor12, category=default, messages=0]
12:59:38.910 T [main] - TestActor activate: TestActor[name=actor20, category=default, messages=0]
12:59:38.911 T [main] - TestActor activate: TestActor[name=actor02, category=default, messages=0]
12:59:38.912 T [main] - TestActor activate: TestActor[name=actor01, category=default, messages=0]
12:59:38.914 T [main] - TestActor activate: TestActor[name=actor00, category=default, messages=0]
12:59:38.915 T [main] - TestActor activate: TestActor[name=actor19, category=default, messages=0]
12:59:38.916 T [main] - TestActor activate: TestActor[name=actor06, category=default, messages=0]
12:59:38.917 T [main] - TestActor activate: TestActor[name=actor18, category=default, messages=0]
12:59:38.917 T [main] - TestActor activate: TestActor[name=actor05, category=default, messages=0]
12:59:38.918 T [main] - TestActor activate: TestActor[name=actor04, category=default, messages=0]
12:59:38.919 T [main] - TestActor activate: TestActor[name=actor03, category=default, messages=0]
12:59:38.920 T [main] - TestActor activate: TestActor[name=actor15, category=default, messages=0]
12:59:38.921 T [main] - TestActor activate: TestActor[name=actor14, category=default, messages=0]
12:59:38.922 T [main] - TestActor activate: TestActor[name=actor09, category=default, messages=0]
12:59:38.923 T [main] - TestActor activate: TestActor[name=actor17, category=default, messages=0]
12:59:38.923 T [main] - TestActor activate: TestActor[name=actor08, category=default, messages=0]
12:59:38.924 T [main] - TestActor activate: TestActor[name=actor16, category=default, messages=0]
12:59:38.925 T [main] - TestActor activate: TestActor[name=actor07, category=default, messages=0]
12:59:38.926 T [main] - main waiting: 120...
12:59:42.970 T [actor3] - TestActor:common06 init(4): TestActor[name=common06, category=TestActor, messages=0]
12:59:43.028 T [actor2] - TestActor:common03 init(8): TestActor[name=common03, category=TestActor, messages=0]
12:59:43.048 T [actor1] - TestActor:common01 init(6): TestActor[name=common01, category=TestActor, messages=0]
12:59:43.054 T [actor8] - TestActor:common05 init(4): TestActor[name=common05, category=TestActor, messages=0]
12:59:43.064 T [actor6] - TestActor:common09 init(8): TestActor[name=common09, category=TestActor, messages=0]
12:59:43.873 T [actor0] - TestActor:common02 init(7): TestActor[name=common02, category=TestActor, messages=0]
12:59:43.898 T [actor9] - TestActor:common08 init(1): TestActor[name=common08, category=TestActor, messages=0]
12:59:43.993 T [actor7] - TestActor:common04 init(2): TestActor[name=common04, category=TestActor, messages=1]
12:59:43.994 T [actor5] - TestActor:common00 init(1): TestActor[name=common00, category=TestActor, messages=0]
12:59:43.996 W [actor2] - init:common03 to is null: actor5
12:59:44.039 W [actor8] - init:common05 to is null: actor9
12:59:44.052 W [actor1] - init:common01 to is null: actor9
12:59:44.060 T [actor4] - TestActor:common07 init(5): TestActor[name=common07, category=TestActor, messages=0]
12:59:44.912 W [actor0] - init:common02 to is null: actor6
12:59:44.968 W [actor5] - init:common00 to is null: actor9
12:59:44.986 W [actor3] - init:common06 to is null: actor1
12:59:44.986 W [actor7] - init:common04 to is null: actor9
12:59:45.108 W [actor4] - init:common07 to is null: actor8
12:59:45.947 T [actor9] - TestActor:actor13 init(7): TestActor[name=actor13, category=default, messages=0]
12:59:45.951 T [actor5] - TestActor:actor22 init(1): TestActor[name=actor22, category=default, messages=1]
12:59:45.976 W [actor3] - init:common06 to is null: actor1
12:59:46.016 W [actor2] - init:common03 to is null: actor6
12:59:46.052 W [actor1] - init:common01 to is null: actor6
12:59:46.115 W [actor4] - init:common07 to is null: actor9
12:59:46.932 W [actor3] - init:common06 to is null: actor5
12:59:46.940 W [actor5] - init:actor22 to is null: actor3
12:59:47.039 T [actor7] - TestActor:actor24 init(8): TestActor[name=actor24, category=default, messages=1]
12:59:47.042 W [actor1] - init:common01 to is null: actor6
12:59:47.059 W [actor2] - init:common03 to is null: actor0
12:59:47.904 T [actor8] - TestActor:actor21 init(7): TestActor[name=actor21, category=default, messages=1]
12:59:47.964 T [actor3] - TestActor:actor10 init(4): TestActor[name=actor10, category=default, messages=0]
12:59:47.988 T [actor5] - TestActor:actor23 init(7): TestActor[name=actor23, category=default, messages=2]
12:59:48.003 W [actor6] - init:common09 to is null: actor3
12:59:48.042 W [actor1] - init:common01 to is null: actor6
12:59:48.072 W [actor2] - init:common03 to is null: actor8
12:59:48.101 W [actor4] - init:common07 to is null: actor6
12:59:48.880 W [actor8] - init:actor21 to is null: actor8
12:59:48.903 W [actor9] - init:actor13 to is null: actor3
12:59:48.952 T [main] - main waiting: 110...
12:59:49.024 W [actor2] - init:common03 to is null: actor3
12:59:49.037 W [actor6] - init:common09 to is null: actor9
12:59:49.085 W [actor4] - init:common07 to is null: actor6
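Every "to is null" warning in this excerpt names a single-digit target (actor0 through actor9). That is consistent with a naming mismatch in the sample: the test registers actors under zero-padded names built with "actor%02d", while TestActor looks them up with "actor" followed by an unpadded number. The short sketch below reproduces only that string difference; the class and method names are hypothetical.

```java
class NameLookupSketch {
    // How the test driver names actors when it creates them.
    static String registeredName(int i) {
        return String.format("actor%02d", i);
    }

    // How TestActor builds the name it looks up.
    static String lookupName(int i) {
        return "actor" + i;
    }

    static boolean matches(int i) {
        return registeredName(i).equals(lookupName(i));
    }

    public static void main(String[] args) {
        System.out.println(registeredName(5) + " vs " + lookupName(5));   // actor05 vs actor5
        System.out.println(matches(5));                                   // false
        System.out.println(registeredName(13) + " vs " + lookupName(13)); // actor13 vs actor13
        System.out.println(matches(13));                                  // true
    }
}
```

Under this reading, lookups for indices 10 through 24 succeed and lookups for 0 through 9 return null. A lookup built with the same "actor%02d" format would presumably remove the warnings.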

http://zhwj184.iteye.com/blog/1613351



http://www.ibm.com/developerworks/cn/java/j-javaactors/
