好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

基于Morphia实现MongoDB按小时、按天聚合操作方法

MongoDB按照天数或小时聚合

需求

最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为: Spring Boot , MongoDB,Morphia .

数据模型

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

@Data

@Builder

@Entity(value = "rawDevStatus" , noClassnameStored = true )

// 设备状态索引

@Indexes({

     // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)

     @ Index (fields = @Field( "time" ), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),

     @ Index (fields = {@Field( "userId" ), @Field(value = "time" , type = IndexType. DESC )})

})

public class RawDevStatus {

   @Id

   @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)

   private ObjectId objectId;

   private String userId;

   private Instant time ;

   @Embedded( "points" )

   List<Point> protocolPoints;

   @Data

   @AllArgsConstructor

   public static class Point {

     /**

      * 协议类型

      */

     private Protocol protocol;

     /**

      * 设备总数

      */

     private Integer total;

     /**

      * 设备在线数目

      */

     private Integer onlineNum;

     /**

      * 处于启用状态设备数目

      */

     private Integer enableNum;

   }

}

上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

@Data

@Builder

@Entity(value = "aggregationDevStatus" , noClassnameStored = true )

@Indexes({

     @ Index (fields = @Field( "expireAt" ), options = @IndexOptions(expireAfterSeconds = 0)),

     @ Index (fields = {@Field( "userId" ), @Field(value = "time" , type = IndexType. DESC )})

})

public class AggregationDevStatus {

   @Id

   @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)

   private ObjectId objectId;

   /**

    * 用户ID

    */

   private String userId;

   /**

    * 设备总数

    */

   private Double total;

   /**

    * 设备在线数目

    */

   private Double onlineNum;

   /**

    * 处于启用状态设备数目

    */

   private Double enableNum;

   /**

    * 聚合类型(按照小时还是按照天聚合)

    */

   @Property( "aggDuration" )

   private AggregationDuration aggregationDuration;

   private Instant time ;

   /**

    * 动态设置文档过期时间

    */

   private Instant expireAt;

}

上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

聚合操作符介绍

聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.

此次聚合主要涉及以下操作:

•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.

原始聚合语句

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

db.getCollection( 'raw_dev_status' ).aggregate([

   {$match:

     {

       time :{$gte: ISODate( "2019-06-27T0Z" )},

     }

   },

   {$unwind: "$points" },

   {$project:

     {

       userId:1,points:1,

       tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z" , date : "$time" } }

     }

   },

   {$project:

     {

       userId:1,points:1,

       groupTime: {$dateFromString: { dateString: "$tmp" , format: "%Y:%m:%dT%H:%M:%SZ" , } }

     }

   },

   {$ group :

     {

       _id:{user_id: '$userId' , cal_time: '$groupTime' },

       devTotal:{ '$avg' : '$points.total' },

       onlineTotal:{ '$avg' : '$points.onlineNum' },

       enableTotal:{ '$avg' : '$points.enableNum' }

     }

   },

])

上述代码是按小时聚合数据,以下来逐步介绍处理思路:

(1) $match

根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

(2) $unwind

raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

(3) $project

?

1

2

3

4

5

6

{$project:

   {

     userId:1,points:1,

     tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z" , date : "$time" } }

   }

}

选择需要输出的数据,分别为: userId,points 以及 tmp.

需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.

如果需要按天聚合,则format数据可修改为 :%Y:%m:%dT0Z 即可满足要求.

(4) $project

?

1

2

3

4

5

6

{$project:

   {

     userId:1,points:1,

     groupTime: {$dateFromString: { dateString: "$tmp" , format: "%Y:%m:%dT%H:%M:%SZ" , } }

   }

}

因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.

(5) $group

对聚合结果进行分类操作,并生成最终输出结果.

?

1

2

3

4

5

6

7

8

9

10

11

12

{$ group :

    {

      # 根据_id进行分组操作,依据是`user_id`以及`$groupTime`

      _id:{user_id: '$userId' , cal_time: '$groupTime' },

      # 求设备总数平均值

      devTotal:{ '$avg' : '$points.total' },

      # 求设备在线数平均值

      onlineTotal:{ '$avg' : '$points.onlineNum' },

      # ...

      enableTotal:{ '$avg' : '$points.enableNum' }

    }

  }

代码编写

此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

/**

   * 创建聚合条件

   *

   * @param pastTime   过去时间段

   * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT0Z)

   * @ return 聚合条件

   */

  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {

    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);

    return datastore.createAggregation(RawDevStatus.class)

        .match(query.field( "time" ).greaterThanOrEq(pastTime))

        .unwind( "points" , new UnwindOptions().preserveNullAndEmptyArrays( false ))

        .match(query.field( "points.protocol" ).equal( "ALL" ))

        .project(Projection.projection( "userId" ),

            Projection.projection( "points" ),

            Projection.projection( "convertTime" ,

                Projection.expression( "$dateToString" ,

                    new BasicDBObject( "format" , dateToString)

                        .append( "date" , "$time" ))

            )

        )

        .project(Projection.projection( "userId" ),

            Projection.projection( "points" ),

            Projection.projection( "convertTime" ,

                Projection.expression( "$dateFromString" ,

                    new BasicDBObject( "format" , stringToDate)

                        .append( "dateString" , "$convertTime" ))

            )

        )

        . group (

            Group .id( Group . grouping ( "userId" ), Group . grouping ( "convertTime" )),

            Group . grouping ( "total" , Group .average( "points.total" )),

            Group . grouping ( "onlineNum" , Group .average( "points.onlineNum" )),

            Group . grouping ( "enableNum" , Group .average( "points.enableNum" ))

        );

  }

  /**

   * 获取聚合结果

   *

   * @param pipeline 聚合条件

   * @ return 聚合结果

   */

  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {

    List<AggregationMidDevStatus> statuses = new ArrayList<>();

    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(

        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse( true ).build());

    while (resultIterator.hasNext()) {

      statuses. add (resultIterator. next ());

    }

    return statuses;

  }

  //......................................................................................

  // 获取聚合结果(省略若干代码)

  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);

  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);

  if (CollectionUtils.isEmpty(midStatuses)) {

    log.warn( "Can not get dev status aggregation result." );

    return ;

  }

总结

以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

原文链接:https://www.cnblogs.com/jason1990/archive/2019/07/31/11269658.html

查看更多关于基于Morphia实现MongoDB按小时、按天聚合操作方法的详细内容...

  阅读:21次