好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

分布式统计的思考以及实现

分布式统计的思考以及实现

分布式统计的思考以及实现

在展开描述之前,先看个简单的例子,假设现有这样一组数据

Index A B C 0 a1 b1 c1 1 a2 b1 c2 2 a1 b1 c3

需求为这样:

以A,B作为分组字段,对C去重后求和

那么,针对上述的数据源,则结果表为:

A B Sum_C a1 b1 2 #c1, c3 a2 b1 1 #c2

计算过程大概为这样:

#1号数据,A和B的组合与0号不匹配,则生成新的组

上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:

* 单点式

* 无法做实时计算

而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程

那么,我们再来回顾刚才的计算过程:

* 对A、B字段的组合分组可以看作一个计算hash的操作

* 对C字段的去重求和也可以看作一个大的hashSet去重的操作

* 对于新的数据进入,重复计算hash的过程

OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适

那么,如何实现呢? ( 以下以python作为实现语言)

我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:

?

{ 'A' : 'a1' , 'B' : 'b1' , 'C' : 'c1' }

那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述

?

def   __do_aggerate( self , _2nd_k, op, _1st_k =   None ):

     assert   callable (op)

      

     _ =   self .__aggerates.get(_2nd_k)

     if   _:

         return   _

      

     _1st_k =   self .__id if   _1st_k is   None   else   _1st_k

     try :

         self .__r.watch(_1st_k)

         _ =   op(_1st_k, _2nd_k)

     except   WatchError, e:

         log.fatal( 'transaction fail: {0}' . format (_1st_k))

     finally :

         self .__r.unwatch()

          

     self .__aggerates.update({_2nd_k: _})

     return   _

      

      

def   __cal_grpkey( self , src):

     '''计算分组对应的key

     '''

     grp_key =   {}

     for   f in   self .__groupby_fields:

         ok, value =   self .sf_parser.unwind(f, src)

         if   not   ok:

             return   False , None

         grp_key.update({f: value})

          

     return   True , grp_key

 

 

def   group_distinct_sum( self , src, * unique_fields):

     assert   src and   isinstance (src, dict ), src

      

     ok, grp_key =   self .__cal_grpkey(src)

     if   not   ok:

         return   ok, None

      

     r_key =   grp_key.copy()

     for   u in   unique_fields:

         _ =   src.get(u)

         if   _ is   None :

             return   False , None

         r_key.update({u: _})

          

     def   __(h, k):

         self .__r.hset(h, k, 1 )

         #self.__r即redis对象

         return   self .__r.hlen(h)

          

     h_key =   '_u:{0}:{1}:{2}' . format ( self .__id,

         ':' .join(unique_fields),

         ':' .join(( self .__safe_str(v) for   v in   grp_key.values())))

     u_key =   hashlib.md5(cPickle.dumps(r_key)).hexdigest()

      

     _ =   self .__do_aggerate(u_key, __, h_key)

     return   True , (grp_key, _)

?

 

如上代码即完成了上文描述的操作:

* 计算分组字段的值

* 对多个分组字段计算hash

* 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程

更详细的,完整的实现过程将在第二部分中阐述

在展开描述之前,先看个简单的例子,假设现有这样一组数据

Index A B C 0 a1 b1 c1 1 a2 b1 c2 2 a1 b1 c3

需求为这样:

以A,B作为分组字段,对C去重后求和

那么,针对上述的数据源,则结果表为:

A B Sum_C a1 b1 2 #c1, c3 a2 b1 1 #c2

计算过程大概为这样:

#1号数据,A和B的组合与0号不匹配,则生成新的组

上述的计算过程我们通常会在各种数据库中见到,例如MS SQLServer或者Mongodb等等,在数据库中的计算都有明显的缺陷:

* 单点式

* 无法做实时计算

而且对于mongodb来说(很久没接触关系型数据库了,就不献丑了),数据量的增大以及数据表的增加对于性能是一个非常大的影响,对内存的需求会非常之高,从成本以及性能的角度考虑,我们需要一个可分布式的算法以及实现过程

那么,我们再来回顾刚才的计算过程:

* 对A、B字段的组合分组可以看作一个计算hash的操作

* 对C字段的去重求和也可以看作一个大的hashSet去重的操作

* 对于新的数据进入,重复计算hash的过程

OK,除了计算hash的过程,还应该有存储hash值的设施,很显然,redis最为合适

那么,如何实现呢? ( 以下以python作为实现语言)

我们知道,在python以及js这种语言中,可以很方便的用dict表示一条数据记录,例如:

?

{ 'A' : 'a1' , 'B' : 'b1' , 'C' : 'c1' }

那么,所有的记录操作都是针对dict对象进行的,以下将给出一段代码片段,第二部分将对实现过程做具体的描述

?

def   __do_aggerate( self , _2nd_k, op, _1st_k =   None ):

     assert   callable (op)

      

     _ =   self .__aggerates.get(_2nd_k)

     if   _:

         return   _

      

     _1st_k =   self .__id if   _1st_k is   None   else   _1st_k

     try :

         self .__r.watch(_1st_k)

         _ =   op(_1st_k, _2nd_k)

     except   WatchError, e:

         log.fatal( 'transaction fail: {0}' . format (_1st_k))

     finally :

         self .__r.unwatch()

          

     self .__aggerates.update({_2nd_k: _})

     return   _

      

      

def   __cal_grpkey( self , src):

     '''计算分组对应的key

     '''

     grp_key =   {}

     for   f in   self .__groupby_fields:

         ok, value =   self .sf_parser.unwind(f, src)

         if   not   ok:

             return   False , None

         grp_key.update({f: value})

          

     return   True , grp_key

 

 

def   group_distinct_sum( self , src, * unique_fields):

     assert   src and   isinstance (src, dict ), src

      

     ok, grp_key =   self .__cal_grpkey(src)

     if   not   ok:

         return   ok, None

      

     r_key =   grp_key.copy()

     for   u in   unique_fields:

         _ =   src.get(u)

         if   _ is   None :

             return   False , None

         r_key.update({u: _})

          

     def   __(h, k):

         self .__r.hset(h, k, 1 )

         #self.__r即redis对象

         return   self .__r.hlen(h)

          

     h_key =   '_u:{0}:{1}:{2}' . format ( self .__id,

         ':' .join(unique_fields),

         ':' .join(( self .__safe_str(v) for   v in   grp_key.values())))

     u_key =   hashlib.md5(cPickle.dumps(r_key)).hexdigest()

      

     _ =   self .__do_aggerate(u_key, __, h_key)

     return   True , (grp_key, _)

?

 

如上代码即完成了上文描述的操作:

* 计算分组字段的值

* 对多个分组字段计算hash

* 对聚集字段(即文章开始的C)作求和操作,调用redis对象的hset和hlen完成求和过程

更详细的,完整的实现过程将在第二部分中阐述

 

 

分类:  Python

 

作者: Leo_wl

    

出处: http://HdhCmsTestcnblogs测试数据/Leo_wl/

    

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

版权信息

查看更多关于分布式统计的思考以及实现的详细内容...

  阅读:36次