好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

搭建服务器处理系统(基于netty)

搭建服务器处理系统(基于netty)

搭建服务器处理系统(基于netty)

推荐:

  google rest 一个不错的http测试应用,google浏览器可以使用。做接口什么的,很有帮助。亲,还不快了解一下。

扯淡:

   现在我算是进入一个能带着你向前走的团队,但是产品设计太扯淡,互联网应用,开发周期异常的短,也高估了开发的能力,赶进度的开发bug很多啊。

  如果开发只是完成任务的行动,是不会感到痛苦的。所以说:想要做好产品的开发,痛苦才刚刚开始。原因就是开发无法左右产品的设计.....

 

主题:

  时刻关注排行的朋友注意啦,你们都得了排行强迫症啦,赶快找个护士就医吧。

  一个排行.....(我需要护士)

关于排名的详细: 摸我

好吧,据说 netty排在第一,那就学习一下吧!

更具公司很久以前的一个服务器框架代码,不断删减,然后得到一个很简单的服务器框架,分享一下。

自己画的流程图,流程比较简单,这方面比较弱,不太会用图表达:

1,启用netty

我们需要监听端口,这样就可以处理连接上来的tcp消息了。这一步netty用java 的NIO和OIO都封装了,我们自然选择NIO啦。

一下是启动服务器的代码:对于这个启动,你只要学习一下netty的手册例子,就马上明白了,它手册的例子也很好,建议大家看看。

 public   class   Start {

      public   static   void   main(String[] args) {
          //  ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); 
        System.out.println("=============show time!=============" );
        initNetty();
    }
      private   static   final   int  tcpSendBufferSize = 32768 ;
      private   static   final   int  tcpReceiveBufferSize = 32768 ;
    
      //   初始化端口的监听 
     public   static   void   initNetty(){
        InetSocketAddress addr  =  new  InetSocketAddress(8989); //  需要监听的端口,即tcp连接建立的端口
          //  Executors.newCachedThreadPool()的解释:
          //  缓冲线程执行器,产生一个大小可变的线程池。
          //  当线程池的线程多于执行任务所需要的线程的时候,对空闲线程(即60s没有任务执行)进行回收;
          //  当执行任务的线程数不足的时候,自动拓展线程数量。因此线程数量是JVM可创建线程的最大数目。 
        ServerSocketChannelFactory channelFactory =  new   NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());  //  It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. 
          //    Creates a new group with a generated name. 
        DefaultChannelGroup allChannels =  new  DefaultChannelGroup("pushServerChannelGroup" );
        
        ServerBootstrap bootstrap  =  new   ServerBootstrap(channelFactory);
        
          //   PushServerPipelineFactory作为一个ChannelPipelineFactory产生的工厂类,我们可以把需要执行的Handler进行配置 
        ChannelPipelineFactory pipelineFactory =  new   PushServerPipelineFactory(allChannels);
          //   Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory.
          //   服务器新连接建立的时候,新的ChannelPipeline会通过我们定义的ChannelPipelineFactory产生,其实是调用了getPipeline()方法。 
         bootstrap.setPipelineFactory(pipelineFactory);
        
          if  (tcpReceiveBufferSize != -1 ) {
            bootstrap.setOption( "child.receiveBufferSize" , tcpReceiveBufferSize);
        }
          if  (tcpSendBufferSize != -1 ) {
            bootstrap.setOption( "child.sendBufferSize" , tcpSendBufferSize);
        }
        
        bootstrap.setOption( "reuseAddress",  true  );
        bootstrap.setOption( "child.reuseAddress",  true  );
        bootstrap.setOption( "child.keepAlive",  false  );
        bootstrap.setOption( "child.tcpNoDelay",  true  );
        
        System.out.println( " ===================netty started=====================" );
        Channel serverChannel  =  bootstrap.bind(addr);
        allChannels.add(serverChannel);
    } 

PushServerPipelineFactory 其实就是配置了一下 Handler ,他叫 pushServerCommandHandler ,他的作用就是把接受到的信息放进 叫 receivedQueen的队列 去就好了,其实就是调用了 MessageManager 的 addSocketMessage 方法。

我们看一下他的 messageReceived 方法就明白了,netty是事件机制的, messageReceived 是重写的方法,只要是受到一个连接的消息,就会触发这个方法。

     public   void   messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent)
              throws   Exception {
        CommandMessage command  =  (CommandMessage) messageEvent.getMessage();
        
          if  (command.message.length() > 3 ) {
             Channel ch  =  channelHandlerContext.getChannel();
             ch.write( "---------message received-------------" );
               //   向消息队列里插消息包,通过handleMessage这个方法,
               //   插入的MessagePack其实已经更具消息的不同被选择成不同的子类
               //   我觉得这是很关键的设计,我们的业务逻辑就可以分成不同的MessagePack子类,然后实现它的onHandler方法 
              messageManager.addSocketMessage(handleMessage(command.message, messageEvent));
           
        }   else   {
             //   logger.warn("too short message."); 
         }
    }
      //  重点方法 
     public   MessagePack handleMessage(String msg, MessageEvent e) {
        MessagePack messagePack  =  null  ;

          int  fid =  SjsonUtil.getFIDFromMsg(msg);
 
          switch   (fid) {
          case  25:  //   调用TestCategoryMsg 
            messagePack =  new   ShowTimeMessage(msg, e.getChannel());
              break  ;
          case  26:  //   调用不同的业务逻辑 
            messagePack =  new   TestCategoryMsg(msg, e.getChannel());
              break  ;
          default  :
             //   logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); 
         }

          return   messagePack;
    }

PushServerPipelineFactory 除了配置好 Handler,还 把MessageManager启动起来了, MessageManager是spring的配置文件中配置的。注意他的 init-method ,就是实例化这个bean的时候会执行它的start方法,这个比较重要,因为 MessageManager 就是处理消息队列的模块,所以他需要在服务器启动时启动线程池去处理消息队列。 MessageManager 提供的方法就是用来维护一个叫 receivedQueen的队列。

   <  bean   id  ="messageManager"   class  ="netty.gate.message.MessageManager"   init-method  ="start"  > 
   </  bean  >  

PushServerPipelineFactory :

 public   class  PushServerPipelineFactory  implements   ChannelPipelineFactory {

      private   DefaultChannelGroup channelGroup;

      private   final   PushServerCommandHandler pushServerCommandHandler;

      private   final  PushServerEncoder pushServerEncoder =  new   PushServerEncoder();

      public   PushServerPipelineFactory(DefaultChannelGroup channelGroup) {
          this .channelGroup =  channelGroup;
        
          this .pushServerCommandHandler =  new  PushServerCommandHandler( this  .channelGroup);
        
        pushServerCommandHandler.setMessageManager((MessageManager) TaskBeanFactory.getBean( "messageManager" ));

    }

      public   final  ChannelPipeline getPipeline()  throws   Exception {
          return  Channels.pipeline( new   PushServerCommandDecoder(), pushServerCommandHandler, pushServerEncoder);
    }

      protected   ChannelPipelineFactory createPushServerPipelineFactory(DefaultChannelGroup allChannels) {
          return   new   PushServerPipelineFactory(allChannels);
    }

} 

很关键的 MessageManager: 维护的是receivedQueen队列

 public   class   MessageManager {

      //   MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的 
     private  LinkedBlockingQueue<MessagePack> receivedQueen =  new  LinkedBlockingQueue<MessagePack>(512 );

      private   ExecutorService pool;

      private   int  reStartThreadCount = 0 ;


      public   void   start() {
          this .pool =  Executors.newCachedThreadPool();
        pool.submit(  new   PushRecvThread());
    }

      private   class  PushRecvThread  implements   Runnable {

          public   void   run() {
              while  ( true  ) {
                MessagePack message  =  waitForProcessMessage();
                  if  (message !=  null  ) {
                      //   利用多态执行继承MessagePack的子类方法 
                     message.onHandler(TaskBeanFactory.getContextInstance());
                }
            }
        }
    }
    
      public   MessagePack waitForProcessMessage() {
        MessagePack message  =  null  ;
          while  (message ==  null  ) {
              try   {
                  //   从队列中取继承MessagePack的实例 
                message = receivedQueen.poll(10 , TimeUnit.SECONDS);
            }   catch   (InterruptedException e) {
                  //   TODO log 
             }
        }
          return   message;
    }
    
      public   void   addSocketMessage(MessagePack message) {
          if  (message !=  null  ) {
              try   {
                  boolean  success = receivedQueen.offer(message, 15 , TimeUnit.SECONDS);
                  if  ( false  ==  success) {
                      //   maybe PushRecvThread is break,restart the thread again 
                     if  (reStartThreadCount < 10 ) {
                        pool.submit(  new   PushRecvThread());
                        reStartThreadCount ++ ;
                    }
                }   else   {
                }
            }   catch   (InterruptedException e) {
                  //   TODO log 
             }
        }
          return  ;
    } 

正真的处理逻辑的代码是写在这些继承 MessagePack 的 抽象类 里的,里面的一个 onHandler 方法是必须实现的,所以使用了 抽象类 。

下面代码的 onHandler 中,就可以写那些调用service层的,处理数据库,发邮件,调用接口啊等各种需求操作了。

 public   class  TestCategoryMsg  extends   MessagePack {

      private   static   final  String MESSAGE_NAME = "TEST_MESSAGE";  //   消息名称 

     public   TestCategoryMsg(String msg, Channel channel) {
          super  (msg, channel);
    }

    @Override
      public   void   onHandler(ApplicationContext ctx) {
        channel.write( "---------------i dont know why--------------" );
    }

      public   String getName() {
          return   MESSAGE_NAME;
    }

} 


到此基本上一个服务器从接受数据,到回应数据的流程已经走完了。

我想上源码,可是看不到附件... 我表示无能为力啦 !

让我们继续前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不会成功。
共勉。

 

 

分类:  Java

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于搭建服务器处理系统(基于netty)的详细内容...

  阅读:33次