好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Rocksdb iterator和snapshot 接口

rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  cout << it->key().ToString() << ": " << it->value().ToString() << endl;
}
assert(it->status().ok());  // Check for any errors found during the scan
delete it;

输出一个范围内的key-value,[small, big)
  for   ( it ->  Seek  ( small )  ; 
   it ->  Valid  (  )   &&  it ->  key  (  )  .  ToString  (  )   <  big ; 
   it ->  Next  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 
反向遍历db中的元素
  for   ( it ->  SeekToLast  (  )  ;  it ->  Valid  (  )  ;  it ->  Prev  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 
反向遍历一个指定范围的key,如(limit, start]
  for   ( it ->  SeekForPrev  ( start )  ; 
   it ->  Valid  (  )   &&  it ->  key  (  )  .  ToString  (  )   >  limit ; 
   it ->  Prev  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 

迭代器的接口可以算是 rocksdb针对客户端的核心接口,主要是提供排序以及高效查找的功能。

测试代码如下:

#include <cassert>
#include <cstdlib>
#include <iostream>
#include <string>

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>

using namespace std ; 


 /**
  * Builds a pseudo-random decimal key in the range [0, key_range).
  *
  * Four successive rand() results are multiplied together using unsigned
  * arithmetic (so the product wraps instead of overflowing), and the product
  * is then reduced modulo key_range.
  *
  * @param key_range Exclusive upper bound of the numeric key. A value of 0
  *                  would be a division by zero, so it is answered with "0".
  * @return The key formatted as a base-10 string.
  */
 static std::string rand_key(unsigned long long key_range) {
     if (key_range == 0) {
         return "0";
     }

     unsigned long long n = 1;
     for (int i = 0; i < 4; ++i) {
         n *= static_cast<unsigned long long>(std::rand());
     }

     // std::to_string sizes its own buffer; no fixed-size char array needed.
     return std::to_string(n % key_range);
 }

 int   main  (  )   { 
    rocksdb :  : DB  * db ; 
    rocksdb :  : Options option ; 

    option . create_if_missing  =  true ; 
    option . compression  =  rocksdb :  : CompressionType :  : kNoCompression ; 

    rocksdb :  : Status s  =  rocksdb :  : DB :  :  Open  ( option ,   "./iterator_db"  ,   & db )  ; 
     if   (  ! s .  ok  (  )  )   { 
        cout  <<   "Open failed with "   <<  s .  ToString  (  )   <<  endl ; 
         exit  (  1  )  ; 
     } 

    rocksdb :  :  DestroyDB  (  "./iterator_db"  ,  option )  ; 

	cout  <<   "seek all keys : "   <<  endl ; 
     for  (  int  i  =   0  ;  i  <   5  ;  i  ++  )   { 
        rocksdb :  : Status s  =  db ->  Put  ( rocksdb :  :  WriteOptions  (  )  ,  
                                 rand_key  (  9  )  ,   string  (  10  ,   ‘a‘   +   ( i  %   26  )  )   )  ; 

         if   (  ! s .  ok  (  )  )   { 
            cout  <<   "Put failed with "   <<  s .  ToString  (  )   <<  endl ; 
             exit  (  1  )  ; 
         } 
     }    
    
     /* traverse rocksdb key-value */ 
    rocksdb :  : Iterator  * it  =  db ->  NewIterator  ( rocksdb :  :  ReadOptions  (  )  )  ; 
     for   ( it ->  SeekToFirst  (  )  ;  it ->  Valid  (  )  ;  it ->  Next  (  )  )   { 
        cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
     } 

    string limit =  "4"  ; 
    string start =  "2"  ; 
    cout  <<   "seek from ‘2‘ to ‘4‘ : "   <<  endl ; 
     for  ( it ->  Seek  ( start )  ;  it ->  Valid  (  )  && it ->  key  (  )  .  ToString  (  )   <  limit ; 
        it ->  Next  (  )  )   { 
            cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
         }  
     assert  ( it ->  status  (  )  .  ok  (  )  )  ; 

    cout  <<   "seek from last to start :"   <<  endl ; 
     for   ( it ->  SeekToLast  (  )  ;  it ->  Valid  (  )  ;  it ->  Prev  (  )  )   { 
        cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
     } 
     assert  ( it ->  status  (  )  .  ok  (  )  )  ; 

    cout  <<   "seek from ‘4‘ to ‘2‘ :"   <<  endl ; 
     for  ( it ->  SeekForPrev  ( limit )  ;  it ->  Valid  (  )  && it ->  key  (  )  .  ToString  (  )   >  start ; 
        it ->  Prev  (  )  )   { 
            cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
         }  
     assert  ( it ->  status  (  )  .  ok  (  )  )  ; 
        delete it ; 

    db ->  Close  (  )  ; 
    delete db ; 

     return   0  ; 
 } 
 

输出如下:

 seek all keys  :  
3: cccccccccc
4: dddddddddd
7: bbbbbbbbbb
8: eeeeeeeeee
seek from  ‘2‘  to  ‘4‘   :  
3: cccccccccc
seek from last to start  : 
8: eeeeeeeeee
7: bbbbbbbbbb
4: dddddddddd
3: cccccccccc
seek from  ‘4‘  to  ‘2‘   : 
4: dddddddddd
3: cccccccccc
 

且上层使用rocksdb迭代器接口时一般会和 snapshot 接口一同使用,用来实现MVCC的版本控制功能。
关于snapshot的实现,我们在Rocksdb事务:隔离性的实现中有提到,感兴趣的可以看看。

关于snapshot的客户端接口主要有:

- sp1 = db->GetSnapshot(); 在当前db状态下创建一个snapshot,添加到内部维护的一个全局的snapshotImpl的双向链表中,并返回该snapshot的对象
- read_option.snapshot = sp1; 将获取到的snapshot 传给read_option,进行Get操作
- db->ReleaseSnapshot(sp1); 释放snapshot相关的资源(从双向链表中删除该节点)

隔离性的测试代码如下:

  # include   <iostream>  
 # include   <string>  
 # include   <rocksdb/db.h>  
 # include   <rocksdb/iterator.h>  
 # include   <rocksdb/table.h>  
 # include   <rocksdb/options.h>  
 # include   <rocksdb/env.h>  

using namespace std ; 

 int   main  (  )   { 
    rocksdb :  : DB  * db ; 
    rocksdb :  : Options option ; 

    option . create_if_missing  =  true ; 
    option . compression  =  rocksdb :  : CompressionType :  : kNoCompression ; 

    rocksdb :  : Status s  =  rocksdb :  : DB :  :  Open  ( option ,   "./iterator_db"  ,   & db )  ; 
     if   (  ! s .  ok  (  )  )   { 
        cout  <<   "Open failed with "   <<  s .  ToString  (  )   <<  endl ; 
         exit  (  1  )  ; 
     } 
     // set a snapshot before put 
     const  rocksdb :  : Snapshot  * sp1  =  db ->  GetSnapshot  (  )  ;  

    s  =  db ->  Put  ( rocksdb :  :  WriteOptions  (  )  ,   "sp2"  ,   "value_sp2"  )  ; 
     assert  ( s .  ok  (  )  )  ; 

	 // set a snapshot after put 
     const  rocksdb :  : Snapshot  * sp2  =  db ->  GetSnapshot  (  )  ; 

    rocksdb :  : ReadOptions read_option ; 
    read_option . snapshot  =  sp1 ; 
    string value  =   ""  ; 
     //预期获取不到sp2的value,因为这里用的是sp1的快照 
    s  =  db ->  Get  ( read_option ,   "sp2"  ,   & value )  ;  
     if  ( value  ==   ""  )   { 
        cout  <<   "Can‘t get sp2 at sp1!"   <<  endl ; 
     } 

    read_option . snapshot  =  sp2 ; 
     // 能够获取到,使用的是sp2的快照,其是在put之后设置的 
    s  =  db ->  Get  ( read_option ,   "sp2"  ,   & value )  ;  
     assert  ( s .  ok  (  )  )  ; 
     if  ( value  !=   ""  )   { 
        cout  <<   "Got sp2‘s value: "   <<  value  <<  endl ; 
     } 

    db ->  ReleaseSnapshot  ( sp1 )  ; 
    db ->  ReleaseSnapshot  ( sp2 )  ; 
 

输出如下:

Can‘t get sp2 at sp1!
Got sp2‘s value: value_sp2
 

当然rocksdb也提供了更为复杂的mvcc特性,来以事务的方式支持不同的隔离级别。

Rocksdb提供迭代器来访问整个db中的数据,就像STL中的迭代器功能一样,用来访问容器中的具体的数据。

访问形式以及访问接口有如下几种:

遍历所有的key-value
  //打开db,并初始化一个迭代器指针 
rocksdb :  : Iterator *  it  =  db ->  NewIterator  ( rocksdb :  :  ReadOptions  (  )  )  ; 
 for   ( it ->  SeekToFirst  (  )  ;  it ->  Valid  (  )  ;  it ->  Next  (  )  )   { 
    cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
delete it ; 
 
输出一个范围内的key-value,[small, big)
  for   ( it ->  Seek  ( small )  ; 
   it ->  Valid  (  )   &&  it ->  key  (  )  .  ToString  (  )   <  big ; 
   it ->  Next  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 
反向遍历db中的元素
  for   ( it ->  SeekToLast  (  )  ;  it ->  Valid  (  )  ;  it ->  Prev  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 
反向遍历一个指定范围的key,如(limit, start]
  for   ( it ->  SeekForPrev  ( start )  ; 
   it ->  Valid  (  )   &&  it ->  key  (  )  .  ToString  (  )   >  limit ; 
   it ->  Prev  (  )  )   { 
 .  .  . 
 } 
 assert  ( it ->  status  (  )  .  ok  (  )  )  ;   // Check for any errors found during the scan 
 

迭代器的接口可以算是 rocksdb针对客户端的核心接口,主要是提供排序以及高效查找的功能。

测试代码如下:

  # include   <iostream>  
 # include   <string>  
 # include   <rocksdb/db.h>  
 # include   <rocksdb/iterator.h>  
 # include   <rocksdb/table.h>  
 # include   <rocksdb/options.h>  
 # include   <rocksdb/env.h>  

using namespace std ; 


 /**
  * Builds a pseudo-random decimal key in the range [0, key_range).
  *
  * Four successive rand() results are multiplied together using unsigned
  * arithmetic (so the product wraps instead of overflowing), and the product
  * is then reduced modulo key_range.
  *
  * @param key_range Exclusive upper bound of the numeric key. A value of 0
  *                  would be a division by zero, so it is answered with "0".
  * @return The key formatted as a base-10 string.
  */
 static std::string rand_key(unsigned long long key_range) {
     if (key_range == 0) {
         return "0";
     }

     unsigned long long n = 1;
     for (int i = 0; i < 4; ++i) {
         n *= static_cast<unsigned long long>(std::rand());
     }

     // std::to_string sizes its own buffer; no fixed-size char array needed.
     return std::to_string(n % key_range);
 }

 int   main  (  )   { 
    rocksdb :  : DB  * db ; 
    rocksdb :  : Options option ; 

    option . create_if_missing  =  true ; 
    option . compression  =  rocksdb :  : CompressionType :  : kNoCompression ; 

    rocksdb :  : Status s  =  rocksdb :  : DB :  :  Open  ( option ,   "./iterator_db"  ,   & db )  ; 
     if   (  ! s .  ok  (  )  )   { 
        cout  <<   "Open failed with "   <<  s .  ToString  (  )   <<  endl ; 
         exit  (  1  )  ; 
     } 

    rocksdb :  :  DestroyDB  (  "./iterator_db"  ,  option )  ; 

	cout  <<   "seek all keys : "   <<  endl ; 
     for  (  int  i  =   0  ;  i  <   5  ;  i  ++  )   { 
        rocksdb :  : Status s  =  db ->  Put  ( rocksdb :  :  WriteOptions  (  )  ,  
                                 rand_key  (  9  )  ,   string  (  10  ,   ‘a‘   +   ( i  %   26  )  )   )  ; 

         if   (  ! s .  ok  (  )  )   { 
            cout  <<   "Put failed with "   <<  s .  ToString  (  )   <<  endl ; 
             exit  (  1  )  ; 
         } 
     }    
    
     /* traverse rocksdb key-value */ 
    rocksdb :  : Iterator  * it  =  db ->  NewIterator  ( rocksdb :  :  ReadOptions  (  )  )  ; 
     for   ( it ->  SeekToFirst  (  )  ;  it ->  Valid  (  )  ;  it ->  Next  (  )  )   { 
        cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
     } 

    string limit =  "4"  ; 
    string start =  "2"  ; 
    cout  <<   "seek from ‘2‘ to ‘4‘ : "   <<  endl ; 
     for  ( it ->  Seek  ( start )  ;  it ->  Valid  (  )  && it ->  key  (  )  .  ToString  (  )   <  limit ; 
        it ->  Next  (  )  )   { 
            cout  <<  it ->  key  (  )  .  ToString  (  )   <<   ": "   <<  it ->  value  (  )  .  ToString  (  )   <<  endl ; 
         }  
     assert  ( it ->  status  (  )  .  ok  (  )  )  ; 

    cout  <<   "seek from last to start :"   <<  endl ; 
     for   ( it ->  SeekToLast  (  )  ;  it ->  Valid  (  )  ;  it -><

      

查看更多关于Rocksdb iterator和snapshot 接口的详细内容...

  阅读:26次