基于Spring Data Jest的Elasticsearch数据查询与统计

2018-02-28 07:48:45来源:作者:人点击

分享
第七城市

 命令查询职责分离模式(Command Query Responsibility Segregation,CQRS)从业务上分离修改 (Command,增,删,改,会对系统状态进行修改)和查询(Query,查,不会对系统状态进行修改)的行为。从而使得逻辑更加清晰,便于对不同部分进行针对性的优化。

CQRS有以下几点有点:

  • 1.分工明确,可以负责不同的部分;
  • 2.将业务上的命令和查询的职责分离能够提高系统的性能、可扩展性和安全性。并且在系统的演化中能够保持高度的灵活性,能够防止出现CRUD模式中,对查询或者修改中的某一方进行改动,导致另一方出现问题的情况;
  • 3.逻辑清晰,能够看到系统中的那些行为或者操作导致了系统的状态变化;
  • 4.可以从数据驱动(Data-Driven) 转到任务驱动(Task-Driven)以及事件驱动(Event-Driven)。

因此Command使用普通数据库(关系型数据库或非关系型数据库),Query使用效率查询效率更高的Elasticsearch。

如何确保数据库和Elasticsearch的数据的一致性?

  • 我们可以使用事件驱动(Event-Driven)即Spring Data的Domain Event同步数据,可参考博客:http://www.wisely.top/2017/06/20/spring-data-domain-event-usage/ 。

当老数据库有大量数据需要导入Elasticsearch时,可参考博客:http://www.wisely.top/2018/02/24/spring-batch-elasticsearch/

Spring Data Elasticsearch使用的是transport client,而Elasticsearch官网推荐使用REST client。阿里云的Elasticsearch使用transport client目前还在存在问题,阿里云推荐使用REST client。

本示例使用的是Spring Data Jest链接Elasticsearch,Elasticsearch的版本为:5.5.3(目前只有spring boot 2.0以上版本支持)

1.项目构建

  • 1.pom依赖如下:
<dependency> <groupId>com.github.vanroy</groupId> <artifactId>spring-boot-starter-data-jest</artifactId> <version>3.0.0.RELEASE</version></dependency><dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> <version>5.3.2</version></dependency>
  • 2.配置文件
spring:  data: jest:uri: http://127.0.0.1:9200username: elasticpassword: changeme

2.构造查询条件

以简单的实体类为例

package com.hfcsbc.esetl.domain;import lombok.Data;import org.springframework.data.elasticsearch.annotations.Document;import org.springframework.data.elasticsearch.annotations.Field;import org.springframework.data.elasticsearch.annotations.FieldType;import javax.persistence.Entity;import javax.persistence.Id;import javax.persistence.OneToOne;import java.util.Date;import java.util.List;/** * Create by pengchao on 2018/2/23 */@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")@Entity@Datapublic class Person { @Id private Long id; private String name; @OneToOne @Field(type = FieldType.Nested) private List<Address> address; private Integer number; private Integer status; private Date birthDay;}
package com.hfcsbc.esetl.domain;import lombok.Data;import javax.persistence.Entity;import javax.persistence.Id;/** * Create by pengchao on 2018/2/23 */@Entity@Datapublic class Address { @Id private Long id; private String name; private Integer number;}
  • 1.根据多个状态查询(类似于sql的in)
BoolQueryBuilder orderStatusCondition = QueryBuilders.boolQuery().should(QueryBuilders.termQuery("status", 1)).should(QueryBuilders.termQuery("status", 2)).should(QueryBuilders.termQuery("status", 3)).should(QueryBuilders.termQuery("status", 4)).should(QueryBuilders.termQuery("status", 5));
  • 2.and链接查询(类似于sql的and)
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();queryBuilder.must(queryBuilder1).must(queryBuilder2).must(queryBuilder3);
  • 3.range查询(类似于sql的between .. and ..)
QueryBuilder rangeQuery = QueryBuilders.rangeQuery("birthDay").from(yesterday).to(today);
  • 4.嵌套对象查询
QueryBuilder queryBuilder = QueryBuilders.nestedQuery("nested", QueryBuilders.termQuery("address.id", 100001), ScoreMode.None);

ScoreMode: 定义other join side中score是如何被使用的。如果不关注scoring,我们只需要设置成ScoreMode.None,此种方式会忽略评分因此会更高效和节约内存

3.获取统计数据

  • 1.非嵌套获取数据求和
SumAggregationBuilder sumBuilder = AggregationBuilders.sum("sum").field("number");SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(QUERY_INDEX).withTypes(QUERY_TYPE).withQuery(boolQueryBuilder).addAggregation(sumBuilder).build();AggregatedPage<ParkingOrder> account = (AggregatedPage<ParkingOrder>) esParkingOrderRepository.search(EsQueryBuilders.buildYesterdayArrearsSumQuery(employeeId));int sum = account.getAggregation("sum", SumAggregation.class).getSum().intValue();
  • 2.嵌套数据求和
SumAggregationBuilder sumBuilder = AggregationBuilders.sum("sum").field("adress.num");AggregationBuilder aggregationBuilder = AggregationBuilders.nested("nested", "adress").subAggregation(sumBuilder);SearchQuery searchQuery = new NativeSearchQueryBuilder().withIndices(QUERY_INDEX).withTypes(QUERY_TYPE).withQuery(boolQueryBuilder).addAggregation((AbstractAggregationBuilder) aggregationBuilder).build();AggregatedPage<ParkingOrder> account = (AggregatedPage<ParkingOrder>) esParkingOrderRepository.search(EsQueryBuilders.buildYesterdayArrearsSumQuery(employeeId));int sum = account.getAggregation("nested", SumAggregation.class).getAggregation("sum", SumAggregation.class).getSum().intValue();
第七城市

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台