好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Akka2使用探索5(TypedActors)

Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。 有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。

Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。

有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API ( public接口的实例 ) 来将方法调用异步地委托给其实现类的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.

有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意

和普通Akka actor一样,有类型actor也一次处理一个消息。

什么时候使用有类型的Actor

有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。

工具箱

返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem

判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);

返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);

返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor 实现的方法中有效 // method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();

返回当前有类型actor的外部代理//Returns the external proxy of the current Typed Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor. self();


返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());

具体例子及说明

 package  practise.akka.typedactors

 import  akka.dispatch.Future
 import  akka.japi.Option

 /**
 * 这个就是对外的接口,各函数就是Typed Actor的接口方法
 */ 
 public   interface  Squarer {
     void  squareDontCare( int  i);  //fire-forget 

    Future  square( int  i);  //non-blocking send-request-reply 

    Option  squareNowPlease( int  i); //blocking send-request-reply 

     int  squareNow( int  i);  //blocking send-request-reply 
} 


 package  practise.akka.typedactors

 import  akka.dispatch.Future
 import  akka.dispatch.Futures
 import  akka.actor.TypedActor
 import  akka.japi.Option
 import  akka.actor.ActorContext
 import  groovy.util.logging.Log4j
 import  akka.actor.ActorRef

 /**
 * 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。)
 */ 
@Log4j
 class  SquarerImpl  implements  Squarer, akka.actor.TypedActor.Receiver {
     private  String name;

     public  SquarerImpl() {
         this .name = " default ";
    }

     public  SquarerImpl(String name) {
         this .name = name;
    }

     public   void  squareDontCare( int  i) {
        log.debug(" squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致---- " + i)    //可以从线程号看出是异步处理的 
         int  sq = i * i;  //Nobody cares :( 

         //返回当前的ActorContext, 
         // 此方法仅在一个TypedActor 实现的方法中有效 
        ActorContext context = TypedActor.context();
        println " context ----  " + context

         //返回当前有类型actor的外部代理, 
         // 此方法仅在一个TypedActor 实现的方法中有效 
        Squarer mysq = TypedActor.  self();
        println " --self -- " + mysq

    }

     public  Future  square( int  i) {
        log.debug(" square send-request-reply Future---- " + i)    //可以从线程号看出是异步处理的 
         return  Futures.successful(i * i, TypedActor.dispatcher());
    }

     public  Option  squareNowPlease( int  i) {
        log.debug(" squareNowPlease send-request-reply Option---- " + i)    //可以从线程号看出是异步处理的 
         return  Option.some(i * i);
    }

     public   int  squareNow( int  i) {
        log.debug(" squareNow send-request-reply result---- " + i)    //可以从线程号看出是异步处理的 
         return  i * i;
    }

    @Override
     void  onReceive(Object o, ActorRef actorRef) {
        log.debug(" TypedActor收到消息----${o}---from:${actorRef} ")
    }
} 


 package  practise.akka.typedactors

 import  akka.actor.ActorSystem
 import  akka.actor.TypedActor
 import  akka.actor.TypedProps
 import  com.typesafe.config.ConfigFactory
 import  akka.japi.Creator
 import  groovy.util.logging.Log4j
 import  akka.actor.ActorContext

 /**
 * 这里创建Typed Actor.
 */ 
@Log4j
 class  TypedActorsFactory {

    ActorSystem system

     private   final  String config = """akka {
    loglevel = " ${log?.debugEnabled ?  "DEBUG"  :  "INFO" } "
    actor.provider = " akka.remote.RemoteActorRefProvider "
    remote.netty.hostname = " 127.0.0.1 "
    remote.netty.port = 2552
    remote.log-received-messages = on
    remote.log-sent-messages = on
}"""

    TypedActorsFactory(String sysName) {
         this .system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
    }


    Squarer getTypedActorDefault() {
        Squarer mySquarer =
            TypedActor.get(system).typedActorOf( new  TypedProps (Squarer. class , SquarerImpl. class ));
         //这里创建的是代理类型 
         return  mySquarer
    }

    Squarer getTypedActor(String name) {
        Squarer otherSquarer =
            TypedActor.get(system).typedActorOf( new  TypedProps (Squarer. class ,
                     new  Creator () {
                         public  SquarerImpl create() {  return   new  SquarerImpl(name); }   //这里创建的是具体的实现类型 
                    }),
                    name);   //这个name是actor的name:akka//sys@host:port/user/name 
         return  otherSquarer
    }

} 


下面用几个测试用例实验一下

 package  practise.akka.typedactors

 import  akka.actor.ActorRef
 import  akka.actor.TypedActor
 import  akka.actor.UntypedActorContext
 import  akka.dispatch.Future
 import  com.baoxian.akka.AkkaClientNoReply
 import  com.baoxian.akka.AkkaServerApp

 class  TestTypedActors  extends  GroovyTestCase {

    def testTypeActor() {
        println(" ---- ")
        TypedActorsFactory factory =  new  TypedActorsFactory(" typedServer ")
 //        Squarer squarer = factory?.getTypedActorDefault()   //创建代理 
        Squarer squarer = factory?.getTypedActor(" serv ")       //具体实现 
        squarer?.squareDontCare(10)
        Future future = squarer?.square(10)
        AkkaServerApp app =  new  AkkaServerApp(" tmp ", " 127.0.0.1 ", 6666, " result ")    //这是我自己构建的接收器 
        app.messageProcessor = {msg, UntypedActorContext context ->
            log.info("  
结果为 " + msg) } app.startup() akka.pattern.Patterns.pipe(future).to(app.serverActor) //Future的返回结果pipe到接收器中了,在log中能看到结果 println " ---- " + squarer?.squareNowPlease(10)?.get() println " ---- " + squarer?.squareNow(10) //返回有类型actor扩展 TypedActor.get(factory.system) //返回一个外部有类型actor代理所代表的Akka actor ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer); actor.tell(" 消息 ") //这个消息将会在SquarerImpl的onReceive方法中接收到 sleep(1000 * 60 * 10) // TypedActor.get(factory.system).stop(squarer); //这将会尽快地异步终止与指定的代理关联的有类型Actor TypedActor.get(factory.system).poisonPill(squarer); //这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它 } def testRemoteTypedActor() { AkkaClientNoReply client = new AkkaClientNoReply(" akka://typedServer@127.0.0.1:2552/user/serv ") client.send(" 远程消息 ") //这将会在SquarerImpl的onReceive方法中接收到 sleep(1000) client.shutdown() } }

查看更多关于Akka2使用探索5(TypedActors)的详细内容...

  阅读:45次