Github源码:
https://github.com/Mengzuozhu/es-demo
相关文章链接:
- from size 分页-Java示例
- Scroll 分页-Java示例
- Search After分页-Java示例
简介
Scroll分页与from-size分页的比较可参考:分页查询From&Size VS scroll
Scroll分页特点:
- 模拟数据游标,可用于深分页查询大量数据场景
- 基于历史快照和上一页的ScrollId,查询下一页;
- 不适合实时搜索和跳页需求;
ES 5.6.8版本
maven引用
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.6.8</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.8</version>
</dependency>
ES Scroll分页查询的Java代码示例
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.util.function.Consumer;
/**
 * Scroll-pagination helper for the Elasticsearch 5.x {@link TransportClient}.
 *
 * <p>Executes a search with a scroll context, passes every non-empty page to a callback,
 * and clears the scroll context once iteration ends, whether it ends normally or with an
 * exception.
 */
public class EsScrollHandler {
    /** Default scroll keep-alive passed to {@link TimeValue#TimeValue(long)}, i.e. milliseconds. */
    private static final int DEFAULT_SCROLL_TIME = 30000;

    /**
     * Scrolls through all results and passes each individual hit to {@code consumer}.
     *
     * <p>Does nothing when any argument is {@code null}.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder describing the query
     * @param consumer       callback invoked once per hit
     */
    public static void searchForHit(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                    Consumer<SearchHit> consumer) {
        // Guard here: the wrapping lambda below is never null, so the delegate's own
        // null check would not catch a null hit consumer.
        if (consumer == null) {
            return;
        }
        searchForResponse(esClient, requestBuilder, searchResponse -> {
            for (SearchHit searchHit : searchResponse.getHits().getHits()) {
                consumer.accept(searchHit);
            }
        });
    }

    /**
     * Scrolls through all results using the default keep-alive and passes each non-empty
     * page to {@code consumer}.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder describing the query
     * @param consumer       callback invoked once per non-empty page
     */
    public static void searchForResponse(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                         Consumer<SearchResponse> consumer) {
        searchForResponse(esClient, requestBuilder, consumer, new TimeValue(DEFAULT_SCROLL_TIME));
    }

    /**
     * Scrolls through all results and passes each non-empty page to {@code consumer}.
     *
     * <p>Does nothing when {@code esClient}, {@code requestBuilder} or {@code consumer} is
     * {@code null}. The scroll context is cleared when iteration finishes, including when
     * the first page is empty or when the consumer or a scroll request throws.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder describing the query
     * @param consumer       callback invoked once per non-empty page
     * @param keepAlive      the scroll keep-alive applied to every request
     */
    public static void searchForResponse(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                         Consumer<SearchResponse> consumer, TimeValue keepAlive) {
        if (esClient == null || requestBuilder == null || consumer == null) {
            return;
        }
        SearchResponse searchResponse = requestBuilder.setScroll(keepAlive).get();
        // Track the id from the very first response so that an empty result set still
        // has its scroll context released.
        String scrollId = searchResponse.getScrollId();
        try {
            while (searchResponse.getHits().getHits().length > 0) {
                consumer.accept(searchResponse);
                searchResponse = esClient.prepareSearchScroll(scrollId)
                        .setScroll(keepAlive)
                        .get();
                // Always continue (and later clear) with the most recently returned id.
                scrollId = searchResponse.getScrollId();
            }
        } finally {
            // Clear the scroll snapshot explicitly to free server-side resources instead of
            // waiting for the keep-alive to expire.
            if (scrollId != null) {
                esClient.prepareClearScroll().addScrollId(scrollId).get();
            }
        }
    }
}
ES 7.9.2版本
maven引用
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.9.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.9.2</version>
</dependency>
ES Scroll分页查询的Java代码示例
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.function.Consumer;
/**
 * Scroll-pagination helper built on the Elasticsearch {@link RestHighLevelClient}.
 *
 * <p>Executes a search with a scroll context, passes every non-empty page to a callback,
 * and (unless disabled) asynchronously clears the scroll context once iteration ends,
 * whether it ends normally or with an exception.
 *
 * @author zuozhu.meng
 **/
public class EsScrollHandler {
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(EsScrollHandler.class.getName());
    /** Default scroll keep-alive in milliseconds. */
    private static final int DEFAULT_KEEP_ALIVE_MILLIS = 60000;
    private final RestHighLevelClient highLevelClient;
    /** Whether the scroll context is cleared after iteration; enabled by default. */
    private boolean isClearScroll = true;

    /**
     * Creates a handler that clears the scroll context after each search.
     *
     * @param client the high level REST client used for all requests
     */
    public EsScrollHandler(RestHighLevelClient client) {
        this(client, true);
    }

    /**
     * Creates a handler.
     *
     * @param client        the high level REST client used for all requests
     * @param isClearScroll whether to clear the scroll context after each search
     */
    public EsScrollHandler(RestHighLevelClient client, boolean isClearScroll) {
        this.highLevelClient = client;
        this.isClearScroll = isClearScroll;
    }

    /**
     * Enables or disables clearing of the scroll context after each search.
     *
     * @param clearScroll whether to clear the scroll context
     * @return this handler, for chaining
     */
    public EsScrollHandler setClearScroll(boolean clearScroll) {
        isClearScroll = clearScroll;
        return this;
    }

    /**
     * Scrolls through all results and passes each individual hit to {@code consumer}.
     *
     * <p>Does nothing when {@code searchRequest} or {@code consumer} is {@code null}.
     *
     * @param searchRequest the search request
     * @param consumer      callback invoked once per hit
     * @throws IOException if a search or scroll request fails
     */
    public void searchForHit(SearchRequest searchRequest, Consumer<SearchHit> consumer) throws IOException {
        // Guard here: the wrapping lambda below is never null, so the delegate's own
        // null check would not catch a null hit consumer.
        if (consumer == null) {
            return;
        }
        searchForResponse(searchRequest, searchResponse -> forEachHits(searchResponse, consumer));
    }

    /**
     * Scrolls through all results using the default keep-alive and passes each non-empty
     * page to {@code consumer}.
     *
     * @param searchRequest the search request
     * @param consumer      callback invoked once per non-empty page
     * @throws IOException if a search or scroll request fails
     */
    public void searchForResponse(SearchRequest searchRequest, Consumer<SearchResponse> consumer) throws IOException {
        searchForResponse(searchRequest, new TimeValue(DEFAULT_KEEP_ALIVE_MILLIS), consumer);
    }

    /**
     * Scrolls through all results and passes each non-empty page to {@code consumer}.
     *
     * <p>Does nothing when {@code searchRequest} or {@code consumer} is {@code null}.
     * Note that the keep-alive is set on the passed-in {@code searchRequest}. When clearing
     * is enabled, the scroll context is released once iteration finishes, including when the
     * first page is empty or when the consumer or a scroll request throws.
     *
     * @param searchRequest the search request
     * @param keepAlive     the scroll keep-alive applied to every request
     * @param consumer      callback invoked once per non-empty page
     * @throws IOException if a search or scroll request fails
     */
    public void searchForResponse(SearchRequest searchRequest, TimeValue keepAlive,
                                  Consumer<SearchResponse> consumer) throws IOException {
        if (searchRequest == null || consumer == null) {
            return;
        }
        searchRequest.scroll(keepAlive);
        SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        // Track the id from the very first response so that an empty result set still
        // has its scroll context released.
        String scrollId = searchResponse.getScrollId();
        try {
            while (searchResponse.getHits().getHits().length > 0) {
                consumer.accept(searchResponse);
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                        .scroll(keepAlive);
                searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                // Always continue (and later clear) with the most recently returned id.
                scrollId = searchResponse.getScrollId();
            }
        } finally {
            // Clear the scroll snapshot to free server-side resources instead of waiting
            // for the keep-alive to expire.
            if (isClearScroll && scrollId != null) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                clearScrollAsync(clearScrollRequest);
            }
        }
    }

    /**
     * Passes every hit of {@code searchResponse} to {@code consumer}.
     *
     * <p>Does nothing when either argument is {@code null}.
     *
     * @param searchResponse the response whose hits are iterated
     * @param consumer       callback invoked once per hit
     */
    public static void forEachHits(SearchResponse searchResponse, Consumer<SearchHit> consumer) {
        if (searchResponse == null || consumer == null) {
            return;
        }
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            consumer.accept(searchHit);
        }
    }

    /**
     * Sends the clear-scroll request without waiting for the result.
     *
     * <p>Failures are logged rather than thrown: the listener runs on a client thread, so
     * an exception thrown from it could not be caught by the code that started the search.
     */
    private void clearScrollAsync(ClearScrollRequest request) {
        highLevelClient.clearScrollAsync(request, RequestOptions.DEFAULT, new ActionListener<ClearScrollResponse>() {
            @Override
            public void onResponse(ClearScrollResponse clearScrollResponse) {
                // Nothing to do: clearing is best-effort cleanup.
            }

            @Override
            public void onFailure(Exception e) {
                LOGGER.log(java.util.logging.Level.WARNING, "Failed to clear Elasticsearch scroll context", e);
            }
        });
    }
}
更多推荐
Elasticsearch-Scroll分页-Java示例
发布评论