好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

epoll + 多线程实现并发网络连接处理

epoll + 多线程实现并发网络连接处理

epoll + 多线程实现并发网络连接处理

简介

触发方式

  条件触发

  边沿触发

主要的应用接口

1 Epoll 的创建

  根据man手册介绍, epoll_create(int size) 用来创建一个epoll实例,向内核申请支持size个句柄的资源(存储)。Size的大小不代表epoll支持的最大句柄个数,而隐射了内核扩展句柄存储的尺寸,也就是说当后面需要再向epoll中添加句柄遇到存储不够的时候,内核会按照size追加分配。在2.6以后的内核中,该值失去了意义,但必须大于0。

epoll_create执行成功,返回一个非负的epoll描述句柄,用来指定该资源,否则返回-1。

例子:

         int epoll_fd = epoll_create(1);

2 Epoll 的控制

         Epoll的控制主要通过epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)完成。控制对象是用户申请的句柄,即fd;Epfd指定所控制的epoll资源;op指对fd的动作,包括向epoll中添加一个句柄EPOLL_CTL_ADD,删除一个句柄EPOLL_CTL_DEL,修改epoll对一个存在句柄的监控模式EPOLL_CTL_MOD;event指出需要让epoll对fd的监控模式(收、发、触发方式等)。epoll_ctl执行成功返回0, 否则返回-1。在介绍该接口之前,我们先看看内核对epoll的事件类型的定义

?

typedef   union   epoll_data {

 

           void      *ptr;

           int        fd;

           uint32_t u32;

           uint64_t u64;

 

} epoll_data_t;

 

struct   epoll_event {

 

          uint32_t   events;    /* Epoll events */

          epoll_data_t data;      /* User data variable */

 

};

该结构中我们主要看epoll_event。epoll_event->data涵盖了调用epoll_ctl增加或者修改某指定句柄时写入的信息,epoll_event->event,则包含了返回事件的位域。

例子:

2.1 向epoll 中增加句柄

2.1.1 增加新的常规句柄:

?

struct   epoll_event ev;

 

if (边沿触发)

          ev.events = EPOLIN | EPOLLOUT |EPOLLLET

else

 

条件触发(默认)

 

          ev.events = EPOLIN | EPOLLOUT

          ev.data = newfd;

 

          epoll_ctl(epoll_fd, EPOLL_CTL_ADD, newfd, &ev);

 2.1.2 增加网络监听句柄

?

Struct epoll_event ev;

 

if (边沿触发)

          ev.events = EPOLIN | EPOLLLET     (监听句柄只关心输入)

else

 

条件触发(默认)

 

          ev.events = EPOLIN;

          ev.data = newfd;

 

          epoll_ctl(epoll_fd, EPOLL_CTL_ADD, newfd, &ev);

 2.2   修改某个句柄的模式

?

struct   epoll_event newev;

 

newev.events = NEWMOD;(新的触发方式可通过该接口修改)

newev.data = oldfd;

 

epoll_ctl(epoll_fd, EPOLL_CTL_MOD, oldfd, &newev);

 2.3   删除某个句柄

?

struct   epoll_event newev;

 

epoll_ctl(epoll_fd,EPOLL_CTL_DEL,oldfd,& newev);

 

注:在2.6.9以前的内核中,当执行EPOLL_CTL_DEL时,event须非空,但在之后,event可空,一般为了兼容以前的内核版本,我们最好将event非空。

3 Epoll 的监控

         当向epoll中添加若干句柄后,就要进入监控状态,此时通过系统调用epoll_wait(int epfd, struct epoll_event *events,  int maxevents,  int timeout)完成。epoll_wait在执行的时候,在timeout内,将有动作的句柄的信息填充到event,event和maxevents决定了epoll监控句柄的上限。timeout的单位是微妙级别,当为-1时,除非内部句柄有动作,否则持续等待。epoll_wait执行成功返回有动作的句柄的总数,句柄信息在events中包含;如果在超时timeout内返回零,表示没有io请求的句柄;否则返回-1。

例程

下面是一个结合网上我做了修整的例子贴出来,简单总结一下 epoll 的用处。该例子是一个网络环回测试例程,服务器的地址默认,请求连接的端口号是 11111 。

?

#include "xs_epoll.h"   /* epoll releated include file */

/* create an epoll 实例 */

int   xs_epoll_create( int   __c)

{

     XSOCKET __efd__ = -1;

 

     __efd__ = epoll_create(__c);

     if (__efd__ < 0){

         xs_dump_error_str( errno );

         return   EXIT_ERROR;

     }

          

     return   (__efd__);

}

int   xs_epoll_ctl(XSOCKET __efd, int   __method, XSOCKET __fd, struct   epoll_event* __p)

{

     int   __result__ = -1;

 

     __result__ = (epoll_ctl(__efd, __method, __fd, __p) < 0);

     if (__result__ < 0){

         xs_dump_error_str( errno );

         return   EXIT_ERROR;

     }

      

     return   EXIT_OK;

}

int   xs_epoll_wait(XSOCKET __efd, struct   epoll_event* __ev, int   __c, int   __tw)

{

     int   __num__ = -1;

 

     __num__ = epoll_wait(__efd, __ev, __c, __tw);

     if (__num__ < 0)

         __num__ = EXIT_ERROR;

 

     return   (__num__);

}

int   xs_epoll_close(XSOCKET __f)

{

     return   xs_release_socket(__f);

}

 

/*

     Add fd to epollfd

     Register the target file descriptor fd on the epoll instance referred to by the file descriptor epfd

     and associate the event with the internal file linked to fd.

*/

int   xs_epoll_add( int   __epollfd, int   __fd, int   __mod)

{

     xs_epoll_event __xs_ev__;

      

     __xs_ev__.events = __mod;

     __xs_ev__.data.fd = __fd;

 

     if (xs_epoll_ctl(__epollfd, EPOLL_CTL_ADD, __fd, &__xs_ev__) < 0)

             return   EXIT_ERROR; 

 

     return   EXIT_OK;

}

 

/* 

     Remove  (deregister) the target file descriptor fd from the epoll instance referred to by epollfd. 

     The event is ignored and can be NULL.

     In kernel versions before 2.6.9, the EPOLL_CTL_DEL operation required a non-NULL pointer in event

     Since Linux 2.6.9, event can be specified as NULL when using EPOLL_CTL_DEL.

*/

int   xs_epoll_del( int   __epollfd, int   __fd)

{

     xs_epoll_event __xs_ev__;

 

     /* We should better set event not NULL */

     __xs_ev__.events = __xs_ev__.events;

     __xs_ev__.data.fd = __fd;

 

     if (xs_epoll_ctl(__epollfd, EPOLL_CTL_DEL, __fd, &__xs_ev__) < 0)

             return   EXIT_ERROR; 

 

     return   EXIT_OK;

}

 

/* Change the event associated with the target file descriptor __fd. */

int   xs_epoll_mode( int   __epollfd, int   __fd, int   __newmod)

{

     xs_epoll_event __xs_ev__;

 

     __xs_ev__.events = __newmod;

     __xs_ev__.data.fd = __fd;

 

     if (xs_epoll_ctl(__epollfd, EPOLL_CTL_MOD, __fd, &__xs_ev__) < 0)

             return   EXIT_ERROR; 

 

     return   EXIT_OK;

}

 

/*

     xs_epoll_init: creates  an epoll "instance", requesting the kernel to allocate an

     event backing store dimensioned for size descriptors.

     Since Linux 2.6.8, the size argument is unused, but must be greater than zero.

     (The kernel  dynamically  sizes  the  required  data

        structures without needing this initial hint.)

*/ <br> int   xs_epoll_init( int   size)

{

     int   epollfd = -1;

      

     if (size == 0)

         epollfd = xs_epoll_create(EPOLL_DEFAULT_SIZE);

     else   epollfd = xs_epoll_create(size);

      

 

     if (epollfd < 0) return   EXIT_ERROR;

 

     xs_logd( "epoll create success -> fd : %d" , epollfd);

 

     return   epollfd;

}

 

 

//#define __VECTOR__

#define xs_defult_thread_size   10

Typedef pthread_cond_t  xs_pthread_cond_t;

Typedef pthread_mutex_t xs_pthread_mutex_t;

Typedef int           XSOCKET;

#define EPOLL_DEFAULT_SIZE  10

typedef   struct   block_queue 

#ifdef __VECTOR__

     vector queue;

#else

     int   queue[xs_defult_thread_size];

     long   size;   

#endif

     xs_pthread_cond_t cond; 

     xs_pthread_mutex_t mutex; 

}block_queue_t; 

block_queue_t *bq; 

 

typedef   struct   block_queue_param 

     void * func; 

     void * queue;    /* Point to block queue structure */

}block_queue_param_t; 

block_queue_param_t bqp;

 

int   g_xs_thread_count = xs_defult_thread_size;

#define BUFFER_SIZE         1024

#include <sys/resource.h>

int   g_epoll_fd = -1;

xs_epoll_event xs_ev, xs_events[EPOLL_DEFAULT_SIZE];

int   g_epoll_size = EPOLL_DEFAULT_SIZE;

int   g_serv_fd = -1;

 

static   int   xs_queue_init( block_queue_t *__q)

{

     if (__q == NULL) return   (-1);

      

#ifdef __VECTOR__

     __q->queue = vector_init(xs_defult_thread_size);

#else

     __q->size = 0;

#endif

 

     xs_pthread_cond_init(&(__q->cond), NULL); 

     xs_pthread_mutex_init(&(__q->mutex), NULL); 

 

     return   0;

}

 

block_queue_t *xs_epoll_queue_create( void   )

{

     block_queue_t *__q;

 

     __q = xs_malloc( sizeof (block_queue_t));

 

     assert (__q);

 

     return   ((xs_queue_init(__q) == 0) ? __q : NULL);

}

static   inline   void   xs_network_epoll_loop( void * data)

{

      int   socket;

 

       socket = *( int   *)data;

 

     xs_logd( "%d !\n" , socket);

     char   buffer[BUFFER_SIZE];

     xs_pthread_t id = pthread_self();

     xs_logd( "thread id is: %ld" , id);

 

     /* We only send what recevied just now */

     int     length = xs_net_recv(socket, buffer, BUFFER_SIZE);

     if (length){

         xs_net_send(socket, buffer, strlen (buffer));

         memset (buffer, 0, BUFFER_SIZE);

     }

}

 

void   *xs_handle_queue( void   *param) 

     void (* func)( void * ); 

     int   fd;

 

     block_queue_t* bque = ((block_queue_param_t*)param)->queue; 

     func = ((block_queue_param_t*)param)->func;

 

     xs_pthread_cond_init(&bque->cond,  NULL);

     xs_pthread_mutex_init(&bque->mutex,  NULL);

      

     for (;;) 

     { 

         if (xs_pthread_mutex_lock(&bque->mutex) == EXIT_OK) {

              

             xs_pthread_cond_wait(&bque->cond, &bque->mutex); 

#ifdef __VECTOR__

             if (bque->queue->active == 0){

                 xs_pthread_mutex_unlock(&bque->mutex);

                 continue ;

             } else   {

                 fd = *( int   *)(vector_lookup(bque->queue, 0));

             }

#else

             if (bque->size==0)  { 

                 xs_pthread_mutex_unlock(&bque->mutex);

                 continue ;

             } else   {    

                  

                 int   i;

                 fd = bque->queue[0]; 

 

                 for (i = 0; i < bque->size - 1; ++i) 

                     bque->queue[i] = bque->queue[i + 1]; 

                  

                 bque->queue[bque->size-1] = 0; 

                 bque->size--; 

             } 

#endif

                 xs_pthread_mutex_unlock(&bque->mutex); 

         }

          

         func(( void   *)&fd); 

     } 

 

int   xs_init_threads( void   ) 

     int   i = 0, ret; 

     xs_pthread_t child_thread[g_xs_thread_count]; 

     xs_pthread_attr_t child_thread_attr[g_xs_thread_count]; 

 

     bqp.func = ( void *)xs_network_epoll_loop;

     bqp.queue = ( void   *)bq;

 

     for ( i = 0; i < g_xs_thread_count; ++i)  { 

         ret = xs_pthread_attr_init(&child_thread_attr[i]); 

         if (ret != 0) xs_logd( "error to init attr !\n" );

         pthread_attr_setdetachstate(&child_thread_attr[i], PTHREAD_CREATE_DETACHED); 

         if ( pthread_create(&child_thread[i],

             &child_thread_attr[i], xs_handle_queue, ( void   *)&bqp) < 0 ) { 

                 xs_logd( "pthread_create Failed : %s - %m\n" , strerror ( errno )); 

                 return   1; 

         } 

         } 

     return   0;

int   xs_init_server( const   char   *name, short   int   port)

{

     struct   rlimit rt; 

     int   server_socket = -1;

      

     server_socket = xs_create_server(name, port);

 

     rt.rlim_max = rt.rlim_cur = g_epoll_size; 

     if   (setrlimit(RLIMIT_NOFILE, &rt) == -1) { 

         xs_logd( "setrlimit - %m" ); 

         exit (1); 

     } 

 

     return   server_socket; 

}

 

static   void   xs_insert_queue(block_queue_t *bque, int   *fd) 

     xs_pthread_mutex_lock(&bque->mutex); 

      

#ifdef __VECTOR__

     vector_set(bque->queue, fd);

#else

     if (bque->size == g_xs_thread_count) 

         return ; 

 

     bque->queue[bque->size] = *fd; 

     bque->size++;

     if (bque->size > g_xs_thread_count)  { 

         fprintf (stderr, "Queue size over folow.%ld" , bque->size); 

         exit   (1); 

     } 

#endif

 

     xs_pthread_cond_signal(&bque->cond); 

     xs_pthread_mutex_unlock(&bque->mutex); 

 

}

 

static   inline   void   xs_handler( void * fd) 

     printf ( "handler:fd => %d\n" , *( int   *)(fd)); 

     xs_insert_queue(bq, fd);

 

int   xs_epoll_entry()

{

     int   nfds, n;

      

     g_serv_fd = xs_init_server(NULL, 11111);

     xs_logd( "server thread [FD:%d] is ready for ..." , g_serv_fd);

 

      

     bq = xs_epoll_queue_create();

     assert (bq);

      if (xs_init_threads() == 0) 

             xs_logd( "Threads ready to use !" ); 

 

     g_epoll_fd = xs_epoll_init(g_epoll_size);

     xs_epoll_add(g_epoll_fd, g_serv_fd, EPOLLIN | EPOLLET);

 

      for (;;) { 

             struct   sockaddr_in local; 

             socklen_t length = sizeof (local); 

             int   client = -1; 

                

             nfds = xs_epoll_wait(g_epoll_fd, xs_events,

EPOLL_DEFAULT_SIZE, epoll_wait_indefinite); 

 

             for (n = 0; n < nfds; ++n) { 

 

             if (xs_events[n].data.fd == g_serv_fd) { 

                 client = xs_net_accept(g_serv_fd, ( struct   sockaddr *)&local, &length); 

                 if (client < 0) { 

                     xs_logd( "%s" , strerror ( errno )); 

                     continue ; 

                 }  else   {

                     xs_logd( "add socket pool : %d" , client);

                     set_nonblocking(client); 

                     xs_epoll_add(g_epoll_fd, client, EPOLLIN | EPOLLOUT | EPOLLET);

                     client = -1;

                 }

             } 

             else    /* It's a client fd that needed to process */

                 xs_handler(( void   *)&xs_events[n].data.fd);

             } 

         } 

 

     xs_pthread_mutex_destroy(&bq->mutex);

     xs_pthread_cond_destroy(&bq->cond);

     xs_close_socket(g_serv_fd);

     xs_epoll_close(g_epoll_fd);

#ifdef __VECTOR__

     vector_free(bq->queue);

#endif

 

     xs_free(bq);

     return   0;

}

 

int   main( int   argc, char   *argv[])

{

     argc = argc;

     argv = argv;

 

     xs_epoll_entry();

 

     return   0;

}

【本博客 http://www.cnblogs.com/iTsihang 中原创之博文,版权属作者所有,欢迎转载。转载之时请保留本段内容,否则作者将保留追究版权之权利】

 

分类:  C语言编程 ,  嵌入式 ,  网络

标签:  epoll ,  句柄 ,  套接字 ,  网络连接

作者: Leo_wl

    

出处: http://www.cnblogs.com/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于epoll + 多线程实现并发网络连接处理的详细内容...

  阅读:51次