elasticsearch search 模块分析

2018-01-06 10:52:21来源:https://www.easyice.cn/archives/257作者:easyice人点击

分享

基于版本:5.5.3


本篇从整体上分析一下检索的基本流程。


增删改查操作只对单个文档进行处理,通常由 _index, _type, 和 _id三元组来确定唯一文档。但搜索需要一种更复杂的模型,因为不知道查询会命中哪些文档。一个搜索请求必须询问指定索引的 所有分片中的某个副本 来进行匹配。假设一个索引有5个主分片,1个副本分片,共10个分片,一次搜索请求会由5个分片来共同完成,他们有可能是主分片,也可能是副分片。也就是说, 一次搜索请求只会命中所有副本中的一个 。


找到匹配文档仅仅完成了一半,多分片中的结果必须组合成单个排序列表。集群的任意节点都可以接受搜索请求,接收客户端请求的节点成为协调节点,在协调节点,搜索任务被执行成一个两阶段过程,称之为 query then fetch 。 真正执行搜索任务的节点暂且称为数据节点。


在协调节点,相应地实现位于:


查询阶段(query):o.e.a.search.InitialSearchPhase


取回阶段(fetch):o.e.a.search.FetchSearchPhase


他们都继承自:SearchPhase



search type

在 es5中有两种搜索类型:


DFS_QUERY_THEN_FETCH
QUERY_THEN_FETCH (默认)

两种不同的搜索类型的区别在于查询阶段,DFS 查询阶段的流程要多一些,他使用全局信息来获取更准确的评分。


下面的流程分析以默认搜索类型为例.


Query 阶段

在初始 查询阶段 时, 查询会广播到索引中每一个分片副本(主分片或者副分片)。 每个分片在本地执行搜索并构建一个匹配文档的 优先队列


优先队列是一个存有 topn 匹配文档的有序列表。优先队列大小为分页参数 from + size。


分布式搜索的 Query 阶段



QUERY_THEN_FETCH 搜索类型的查询阶段有以下步骤:



客户端发送 search 请求到 Node 3。
Node 3 将查询请求转发到索引的每个主分片或副分片中。
每个分片在本地执行查询,并使用 本地 的Term/Document Frequency信息进行打分,添加结果到大小为 from + size 的本地有序优先队列中
每个分片返回各自优先队列中所有文档的 ID 和排序值给协调节点,它合并这些值到自己的优先队列,产生一个全局排序后的列表。

协调节点广播查询请求到所有相关分片时,可以是主分片或副分片,协调节点将在之后的请求中轮询所有的分片副本来分摊负载。


小结:查询阶段并不会对搜索请求的内容解析理解,无论搜什么东西,只看本次搜索需要命中哪些shard,然后针对每个特定 shard 选择一个副本,转发搜索请求。


Query 阶段源码解析

本阶段运行于 http_server_work 线程


1.REST接口收到搜索请求


主要是将请求体解析为 SearchRequest 数据结构



SearchRequest searchRequest = new SearchRequest();
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser));

2.构造目的 shard 列表


将本集群shard列表和远程集群的shard列表(如果有的话)合并



GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);

3.遍历所有 shard 发送请求,请求是基于 shard 遍历的,如果某个节点有多个 shard,并不会把请求合并为一个。某节点有N个 shard,就向他发N次请求。



public final void run() throws IOException {
int shardIndex = -1;
for (final SearchShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull(); //后续请求轮询所有的分片副本
if (shard != null) {
performPhaseOnShard(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}

shardsIts为本次搜索涉及到的所有分片,shardIt.nextOrNull()从某个分片的所有副本中选择一个,例如从[website][0]中选择主分片。


发送请求同时定义一个listener,用于处理Response:



executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
onShardResult(result, shardIt);
}
@Override
public void onFailure(Exception t) {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
}
});

发送过程调用 transport 模块实现:



transportService.sendChildRequest

以简单普通搜索为例,对应 action 为:



indices:data/read/search[phase/query]

4.收集返回结果



private void onShardResult(FirstResult result, ShardIterator shardIt) {
onShardSuccess(result);
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
onPhaseDone();
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
}
}

onShardSuccess 对收集到的结果进行合并。


onPhaseDone 会调用 executeNextPhase 从而开始执行取回阶段。


Fetch 阶段

Query 阶段知道了要取哪些数据,但是并没有取具体的数据,这就是 fetch 阶段要做的。


分布式搜索的 Fetch 阶段



Fetch 阶段由以下步骤构成:


1. 协调节点向相关 node 发送 GET 请求


2. 分片所在节点向协调节点返回数据


3. 协调节点等待所有文档被取得,然后返回给客户端


分片所在节点在返回文档数据时,处理有可能的_source 字段以及高亮参数。


协调节点首先决定哪些文档 确实 需要被取回,例如,如果查询指定了 { “from”: 90, “size”: 10 } ,只有从第91个开始的10个结果需要被取回。


深度分页


为了避免在协调节点创建 number_of_shards * (from + size)的优先队列,尽量控制分页深度。


Fetch 阶段源码解析

本阶段运行于 search 线程


1.发送 Fetch 请求


Query 阶段的 executeNextPhase 开始执行 Fetch 阶段,进入FetchSearchPhase#innerRun,处理 scroll(略),从查询阶段的 shard 列表中遍历,跳过查询结果为空的 shard,对特定目标 shard 执行 executeFetch 获取数据,其中包括分页信息。


executeFetch主要实现



private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
final CountedCollector<FetchSearchResult> counter,
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
final Transport.Connection connection) {
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
@Override
public void innerOnResponse(FetchSearchResult result) {
counter.onResult(result);
}
});
}

executeFetch 的参数 querySearchResult 中包含分页信息,最后定义一个 Listener,每获取一个 shard 数据成功后执行 counter.onResult,其中调用对结果的处理回调,把 result 保存的数组,然后执行countDown:



void onResult(R result) {
try {
resultConsumer.accept(result);
} finally {
countDown();
}
}

以简单普通搜索为例,本节点发送的 action 为:



indices:data/read/search[phase/fetch/id

2.收集结果


收集器的定义在innerRun中,定义了包括了收到的 shard 数据存到哪,收集完了谁来处理:



final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase, context);

fetchResults 用于存储从某个 shard 收集到的结果,每收到一个 shard 的数据执行一次 counter.countDown() 当所有 shard 数据收集完毕后,countDown 会出触发执行 finishPhase:



final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);

下一阶段的定义在 FetchSearchPhase 构造函数:



this(resultConsumer, searchPhaseController, context,
(response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));

3.ExpandSearchPhase


取回阶段完成之后执行ExpandSearchPhase#run,主要判断字段是否折叠,实现 字段折叠 ,否则就直接返回给客户端。


4.回复客户端


ExpandSearchPhase 之后是恢复客户的 sendResponsePhase:



public void run() throws IOException {
context.onResponse(context.buildSearchResponse(response, scrollId));
}

执行搜索的数据节点

对各种 Query,Fetch 请求的处理入口注册于:


o.e.a.s.SearchTransportService#registerRequestHandler


响应 Query 请求

本阶段运行于 search 线程


以 active为:



indices:data/read/search[phase/query]

为例的普通请求,执行查询,发送 Response



SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);

查询实现入口在 searchService.executeQueryPhase ,完全封装在这个函数。查询时,先看是否允许 cache,由配置:



index.requests.cache.enable

决定,默认为 true,会把查询结果放到 cache 中,查询时优先从 cache 中取。这个 cache 由节点的所有分片共享,基于 LRU 算法实现:空间满的时候踢出最最近最少使用的数据。cache 并不缓存全部检索结果,具体参考 这里


核心的查询封装在:queryPhase.execute(context) 其中调用 lucene 实现检索,同时实现聚合:



aggregationPhase.preProcess(searchContext);
boolean rescore = execute(searchContext, searchContext.searcher());
if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);

其中包含几个核心功能:


execute() 调用 lucene, searcher.search() 实现搜索
rescorePhase 全文检索且需要打分
suggestPhase 自动补全及纠错
aggregationPhase 实现聚合

小结:


慢查询Query日志的统计时间在于本阶段的处理时间。
聚合操作在本阶段实现, lucene 检索后完成。
响应 Fetch 请求

与 Query 节点类似,本阶段运行于 search 线程


以 action 为:



indices:data/read/search[phase/fetch/id]

为例的普通请求,执行Fetch,发送 Response



FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);

对 Fetch 响应的实现封装在:searchService.executeFetchPhase,核心是调用 fetchPhase.execute(context); 按照命中的 doc 取得相关数据,填充到 SearchHits,最终封装到 FetchSearchResult。


小结:


慢查询Fetch日志的统计时间在于本阶段的处理时间。
总结
聚合是在 es 实现的,而非 lucene
Query 和 Fetch 请求之间是无状态的,除非是scroll方式
分页搜索不会单独 cache,cache 和分页没有关系

最后补一张整体流程图:



参考

http://www.opscoder.info/es_search_client.html


http://www.opscoder.info/es_search_server.html


http://www.jianshu.com/p/c7529b98993e


最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台