好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

PHP memcache实现消息队列实例 - php高级应用

PHP memcache实现消息队列实例

memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志,然后通过定时程序将内容落地到文件或者数据库.

php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列.

方便实现队列的轻量级队列服务器是:

starling支持memcache协议的轻量级持久化服务器

https://github测试数据/starling/starling

Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列

http://kr.github测试数据/beanstalkd/

php中也可以使用memcache/memcached来实现消息队列,代码如下:

<?php  /**   * Memcache 消息队列类   */     class  QMC {  const  PREFIX =  'ASDFASDFFWQKE' ;    /**   * 初始化mc   * @staticvar string $mc   * @return Memcache   */   static   private   function  mc_init() {  static   $mc  = null;  if  ( is_null ( $mc )) {  $mc  =  new  Memcache;  $mc ->connect( '127.0.0.1' , 11211);  }  return   $mc ;  }  /**   * mc 计数器,增加计数并返回新的计数   * @param string $key   计数器   * @param int $offset   计数增量,可为负数.0为不改变计数   * @param int $time     时间   * @return int/false    失败是返回false,成功时返回更新计数器后的计数   */   static   public   function  set_counter(  $key ,  $offset ,  $time =0 ){  $mc  = self::mc_init();  $val  =  $mc ->get( $key );  if ( ! is_numeric ( $val ) ||  $val  < 0 ){  $ret  =  $mc ->set(  $key , 0,  $time  );  if ( ! $ret  )  return  false;  $val  = 0;  }  $offset  =  intval (  $offset  );  if (  $offset  > 0 ){  return   $mc ->increment(  $key ,  $offset  );  } elseif (  $offset  < 0 ){  return   $mc ->decrement(  $key , - $offset  );  }  return   $val ;  }    /**   * 写入队列   * @param string $key   * @param mixed $value   * @return bool   */   static   public   function  input(  $key ,  $value  ){  $mc  = self::mc_init();  $w_key  = self::PREFIX. $key . 'W' ;  $v_key  = self::PREFIX. $key .self::set_counter( $w_key , 1);  return   $mc ->set(  $v_key ,  $value  );  }  /**   * 读取队列里的数据   * @param string $key   * @param int $max  最多读取条数   * @return array   */   static   public   function  output(  $key ,  $max =100 ){  $out  =  array ();  $mc  = self::mc_init();  $r_key  = self::PREFIX. $key . 'R' ;  $w_key  = self::PREFIX. $key . 'W' ;  $r_p    = self::set_counter(  $r_key , 0 ); //读指针   $w_p    = self::set_counter(  $w_key , 0 ); //写指针   if (  $r_p  == 0 )  $r_p  = 1;  while (  $w_p  >=  $r_p  ){  if ( -- $max  < 0 )  break ;  $v_key  = self::PREFIX. $key . $r_p ;  $r_p  = self::set_counter(  $r_key , 1 );  $out [] =  $mc ->get(  $v_key  );  $mc -> delete ( $v_key );  } //开源代码phpfensi测试数据   return   $out ;  }  }  /**   使用方法:   QMC::input($key, $value );//写入队列   $list = QMC::output($key);//读取队列   */   ?> 

基于PHP共享内存实现的消息队列,代码如下:

<?php  /**   * 使用共享内存的PHP循环内存队列实现   * 支持多进程, 支持各种数据类型的存储   * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区   *   * @author wangbinandi@gmail测试数据   * @created 2009-12-23   */   class  ShmQueue  {  private   $maxQSize  = 0;  // 队列最大长度     private   $front  = 0;  // 队头指针   private   $rear  = 0;   // 队尾指针     private   $blockSize  = 256;   // 块的大小(byte)   private   $memSize  = 25600;   // 最大共享内存(byte)   private   $shmId  = 0;    private   $filePtr  =  './shmq.ptr' ;    private   $semId  = 0;  public   function  __construct()  {  $shmkey  =  ftok ( __FILE__ ,  't' );    $this ->shmId = shmop_open( $shmkey ,  "c" , 0644,  $this ->memSize );  $this ->maxQSize =  $this ->memSize /  $this ->blockSize;    //一个信号量   $this ->semId = sem_get( $shmkey , 1);  sem_acquire( $this ->semId);  // 申请进入临界区     $this ->init();  }    private   function  init()  {  if  (  file_exists ( $this ->filePtr) ){  $contents  =  file_get_contents ( $this ->filePtr);  $data  =  explode (  '|' ,  $contents  );  if  ( isset( $data [0]) && isset( $data [1])){  $this ->front = (int) $data [0];  $this ->rear  = (int) $data [1];  }  }  }    public   function  getLength()  {  return  (( $this ->rear -  $this ->front +  $this ->memSize) % ( $this ->memSize) )/ $this ->blockSize;  }    public   function  enQueue(  $value  )  {  if  (  $this ->ptrInc( $this ->rear) ==  $this ->front ){  // 队满   return  false;  }    $data  =  $this ->encode( $value );  shmop_write( $this ->shmId,  $data ,  $this ->rear );  $this ->rear =  $this ->ptrInc( $this ->rear);  return  true;  }    public   function  deQueue()  {  if  (  $this ->front ==  $this ->rear ){  // 队空   return  false;  }  $value  = shmop_read( $this ->shmId,  $this ->front,  $this ->blockSize-1);  $this ->front =  $this ->ptrInc( $this ->front);  return   $this ->decode( $value );  }    private   function  ptrInc(  $ptr  )  {  return  ( $ptr  +  $this ->blockSize) % ( $this ->memSize);  }    private   function  encode(  $value  )  {  $data  = serialize( $value ) .  "__eof" ;  echo   '' ;    echo   strlen ( $data );  echo   '' ;    echo   $this ->blockSize -1;  echo   '' ;    if  (  strlen ( $data ) >  $this ->blockSize -1 ){  throw   new  Exception( strlen ( $data ). " is overload block size!" );  }  return   $data ;  }    private   function  decode(  $value  )  {  $data  =  explode ( "__eof" ,  $value );  return  unserialize( $data [0]);  }    public   function  __destruct()  {  $data  =  $this ->front .  '|'  .  $this ->rear;  file_put_contents ( $this ->filePtr,  $data );    sem_release( $this ->semId);  // 出临界区, 释放信号量   }  }    /*   // 进队操作   $shmq = new ShmQueue();   $data = 'test data';   $shmq->enQueue($data);   unset($shmq);   // 出队操作   $shmq = new ShmQueue();   $data = $shmq->deQueue();   unset($shmq);   */   ?> 

对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。 

如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作,下面是代码:

class  Memcache_Queue   {   private   $memcache ;   private   $name ;   private   $prefix ;   function  __construct( $maxSize ,  $name ,  $memcache ,  $prefix  =  "__memcache_queue__" )   {   if  ( $memcache  == null) {   throw   new  Exception( "memcache object is null, new the object first." );   }   $this ->memcache =  $memcache ;   $this ->name =  $name ;   $this ->prefix =  $prefix ;   $this ->maxSize =  $maxSize ;   $this ->front = 0;   $this ->real = 0;   $this ->size = 0;   }   function  __get( $name )   {   return   $this ->get( $name );   }   function  __set( $name ,  $value )   {   $this ->add( $name ,  $value );   return   $this ;   }   function  isEmpty()   {   return   $this ->size == 0;   }   function  isFull()   {   return   $this ->size ==  $this ->maxSize;   }   function  enQueue( $data )   {   if  ( $this ->isFull()) {   throw   new  Exception( "Queue is Full" );   }   $this ->increment( "size" );   $this ->set( $this ->real,  $data );   $this ->set( "real" , ( $this ->real + 1) %  $this ->maxSize);   return   $this ;   }   function  deQueue()   {   if  ( $this ->isEmpty()) {   throw   new  Exception( "Queue is Empty" );   }   $this ->decrement( "size" );   $this -> delete ( $this ->front);   $this ->set( "front" , ( $this ->front + 1) %  $this ->maxSize);   return   $this ;   }   function  getTop()   {   return   $this ->get( $this ->front);   }   function  getAll()   {   return   $this ->getPage();   }   function  getPage( $offset  = 0,  $limit  = 0)   {   if  ( $this ->isEmpty() ||  $this ->size <  $offset ) {   return  null;   }   $keys [] =  $this ->getKeyByPos(( $this ->front +  $offset ) %  $this ->maxSize);   $num  = 1;   for  ( $pos  = ( $this ->front +  $offset  + 1) %  $this ->maxSize;  $pos  !=  $this ->real;  $pos  = ( $pos  + 1) %  $this ->maxSize)   {   $keys [] =  $this ->getKeyByPos( $pos );   $num ++;   if  ( $limit  > 0 &&  $limit  ==  $num ) {   break ;   }   }   return   array_values ( $this ->memcache->get( $keys ));   }   function  makeEmpty()   {   $keys  =  $this ->getAllKeys();   foreach  ( $keys   as   $value ) {   $this -> delete ( $value );   }   $this -> delete ( "real" );   $this -> delete ( "front" );   $this -> delete ( "size" );   $this -> delete ( "maxSize" );   }   private   function  getAllKeys()   {   if  ( $this ->isEmpty())   {   return   array ();   }   $keys [] =  $this ->getKeyByPos( $this ->front);   for  ( $pos  = ( $this ->front + 1) %  $this ->maxSize;  $pos  !=  $this ->real;  $pos  = ( $pos  + 1) %  $this ->maxSize)   {   $keys [] =  $this ->getKeyByPos( $pos );   }   return   $keys ;   }   private   function  add( $pos ,  $data )   {   $this ->memcache->add( $this ->getKeyByPos( $pos ),  $data );   return   $this ;   }   private   function  increment( $pos )   {   return   $this ->memcache->increment( $this ->getKeyByPos( $pos ));   }   private   function  decrement( $pos )   {   $this ->memcache->decrement( $this ->getKeyByPos( $pos ));   }   private   function  set( $pos ,  $data )   {   $this ->memcache->set( $this ->getKeyByPos( $pos ),  $data );   return   $this ;   }   private   function  get( $pos )   {   return   $this ->memcache->get( $this ->getKeyByPos( $pos ));   }   private   function   delete ( $pos )   {   return   $this ->memcache-> delete ( $this ->getKeyByPos( $pos ));   }   private   function  getKeyByPos( $pos )   {   return   $this ->prefix .  $this ->name .  $pos ;   }   } 

查看更多关于PHP memcache实现消息队列实例 - php高级应用的详细内容...

  阅读:44次