搭建服务器处理系统(基于netty)
搭建服务器处理系统(基于netty)
推荐:
google rest 一个不错的http测试应用,google浏览器可以使用。做接口什么的,很有帮助。亲,还不快了解一下。
扯淡:
现在我算是进入一个能带着你向前走的团队,但是产品设计太扯淡,互联网应用,开发周期异常的短,也高估了开发的能力,赶进度的开发bug很多啊。
如果开发只是完成任务的行动,是不会感到痛苦的。所以说:想要做好产品的开发,痛苦才刚刚开始。原因就是开发无法左右产品的设计.....
主题:
时刻关注排行的朋友注意啦,你们都得了排行强迫症啦,赶快找个护士就医吧。
一个排行.....(我需要护士)
关于排名的详细: 摸我
好吧,据说 netty排在第一,那就学习一下吧!
更具公司很久以前的一个服务器框架代码,不断删减,然后得到一个很简单的服务器框架,分享一下。
自己画的流程图,流程比较简单,这方面比较弱,不太会用图表达:
1,启用netty
我们需要监听端口,这样就可以处理连接上来的tcp消息了。这一步netty用java 的NIO和OIO都封装了,我们自然选择NIO啦。
一下是启动服务器的代码:对于这个启动,你只要学习一下netty的手册例子,就马上明白了,它手册的例子也很好,建议大家看看。
public class Start { public static void main(String[] args) { // ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!=============" ); initNetty(); } private static final int tcpSendBufferSize = 32768 ; private static final int tcpReceiveBufferSize = 32768 ; // 初始化端口的监听 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989); // 需要监听的端口,即tcp连接建立的端口 // Executors.newCachedThreadPool()的解释: // 缓冲线程执行器,产生一个大小可变的线程池。 // 当线程池的线程多于执行任务所需要的线程的时候,对空闲线程(即60s没有任务执行)进行回收; // 当执行任务的线程数不足的时候,自动拓展线程数量。因此线程数量是JVM可创建线程的最大数目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); // It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup" ); ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作为一个ChannelPipelineFactory产生的工厂类,我们可以把需要执行的Handler进行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 服务器新连接建立的时候,新的ChannelPipeline会通过我们定义的ChannelPipelineFactory产生,其实是调用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1 ) { bootstrap.setOption( "child.receiveBufferSize" , tcpReceiveBufferSize); } if (tcpSendBufferSize != -1 ) { bootstrap.setOption( "child.sendBufferSize" , tcpSendBufferSize); } bootstrap.setOption( "reuseAddress", true ); bootstrap.setOption( "child.reuseAddress", true ); bootstrap.setOption( "child.keepAlive", false ); bootstrap.setOption( "child.tcpNoDelay", true ); System.out.println( " ===================netty started=====================" ); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }
PushServerPipelineFactory 其实就是配置了一下 Handler ,他叫 pushServerCommandHandler ,他的作用就是把接受到的信息放进 叫 receivedQueen的队列 去就好了,其实就是调用了 MessageManager 的 addSocketMessage 方法。
我们看一下他的 messageReceived 方法就明白了,netty是事件机制的, messageReceived 是重写的方法,只要是受到一个连接的消息,就会触发这个方法。
public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3 ) { Channel ch = channelHandlerContext.getChannel(); ch.write( "---------message received-------------" ); // 向消息队列里插消息包,通过handleMessage这个方法, // 插入的MessagePack其实已经更具消息的不同被选择成不同的子类 // 我觉得这是很关键的设计,我们的业务逻辑就可以分成不同的MessagePack子类,然后实现它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } // 重点方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null ; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 调用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break ; case 26: // 调用不同的业务逻辑 messagePack = new TestCategoryMsg(msg, e.getChannel()); break ; default : // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }
PushServerPipelineFactory 除了配置好 Handler,还 把MessageManager启动起来了, MessageManager是spring的配置文件中配置的。注意他的 init-method ,就是实例化这个bean的时候会执行它的start方法,这个比较重要,因为 MessageManager 就是处理消息队列的模块,所以他需要在服务器启动时启动线程池去处理消息队列。 MessageManager 提供的方法就是用来维护一个叫 receivedQueen的队列。
< bean id ="messageManager" class ="netty.gate.message.MessageManager" init-method ="start" > </ bean >
PushServerPipelineFactory :
public class PushServerPipelineFactory implements ChannelPipelineFactory { private DefaultChannelGroup channelGroup; private final PushServerCommandHandler pushServerCommandHandler; private final PushServerEncoder pushServerEncoder = new PushServerEncoder(); public PushServerPipelineFactory(DefaultChannelGroup channelGroup) { this .channelGroup = channelGroup; this .pushServerCommandHandler = new PushServerCommandHandler( this .channelGroup); pushServerCommandHandler.setMessageManager((MessageManager) TaskBeanFactory.getBean( "messageManager" )); } public final ChannelPipeline getPipeline() throws Exception { return Channels.pipeline( new PushServerCommandDecoder(), pushServerCommandHandler, pushServerEncoder); } protected ChannelPipelineFactory createPushServerPipelineFactory(DefaultChannelGroup allChannels) { return new PushServerPipelineFactory(allChannels); } }
很关键的 MessageManager: 维护的是receivedQueen队列
public class MessageManager { // MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的 private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>(512 ); private ExecutorService pool; private int reStartThreadCount = 0 ; public void start() { this .pool = Executors.newCachedThreadPool(); pool.submit( new PushRecvThread()); } private class PushRecvThread implements Runnable { public void run() { while ( true ) { MessagePack message = waitForProcessMessage(); if (message != null ) { // 利用多态执行继承MessagePack的子类方法 message.onHandler(TaskBeanFactory.getContextInstance()); } } } } public MessagePack waitForProcessMessage() { MessagePack message = null ; while (message == null ) { try { // 从队列中取继承MessagePack的实例 message = receivedQueen.poll(10 , TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO log } } return message; } public void addSocketMessage(MessagePack message) { if (message != null ) { try { boolean success = receivedQueen.offer(message, 15 , TimeUnit.SECONDS); if ( false == success) { // maybe PushRecvThread is break,restart the thread again if (reStartThreadCount < 10 ) { pool.submit( new PushRecvThread()); reStartThreadCount ++ ; } } else { } } catch (InterruptedException e) { // TODO log } } return ; }
正真的处理逻辑的代码是写在这些继承 MessagePack 的 抽象类 里的,里面的一个 onHandler 方法是必须实现的,所以使用了 抽象类 。
下面代码的 onHandler 中,就可以写那些调用service层的,处理数据库,发邮件,调用接口啊等各种需求操作了。
public class TestCategoryMsg extends MessagePack { private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名称 public TestCategoryMsg(String msg, Channel channel) { super (msg, channel); } @Override public void onHandler(ApplicationContext ctx) { channel.write( "---------------i dont know why--------------" ); } public String getName() { return MESSAGE_NAME; } }
到此基本上一个服务器从接受数据,到回应数据的流程已经走完了。
我想上源码,可是看不到附件... 我表示无能为力啦 !
让我们继续前行
----------------------------------------------------------------------
努力不一定成功,但不努力肯定不会成功。
共勉。
分类: Java
作者: Leo_wl
出处: http://www.cnblogs.com/Leo_wl/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
版权信息查看更多关于搭建服务器处理系统(基于netty)的详细内容...