GitHub源码:https://github.com/Mengzuozhu/es-demo

相关文章链接:

  • from size 分页-Java示例
  • Scroll 分页-Java示例
  • Search After分页-Java示例

简介

Scroll分页与from-size分页的比较可参考:分页查询From&Size VS scroll
Scroll分页特点:

  • 模拟数据游标,可用于深分页查询大量数据的场景;
  • 基于历史快照和上一页返回的ScrollId查询下一页;
  • 不适合实时搜索和跳页需求。

ES 5.6.8版本

maven引用

		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>5.6.8</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<version>5.6.8</version>
		</dependency>

ES Scroll分页查询的Java代码示例

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

import java.util.function.Consumer;


/**
 * Static helpers that page through a complete result set with the scroll API
 * of the transport client, handing every page (or every hit) to a callback.
 */
public class EsScrollHandler {

    /** Default scroll keep-alive, in milliseconds. */
    private static final int DEFAULT_SCROLL_TIME = 30000;

    /**
     * Scrolls through all results and passes each hit to {@code consumer}.
     * Returns without searching when {@code esClient}, {@code requestBuilder}
     * or {@code consumer} is {@code null}.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder
     * @param consumer       the consumer invoked once per hit
     */
    public static void searchForHit(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                    Consumer<SearchHit> consumer) {
        // Guard here as well: the wrapping lambda below is never null, so the
        // null check in searchForResponse would not catch a null hit consumer.
        if (consumer == null) {
            return;
        }
        searchForResponse(esClient, requestBuilder, searchResponse -> {
            for (SearchHit searchHit : searchResponse.getHits().getHits()) {
                consumer.accept(searchHit);
            }
        });
    }

    /**
     * Scrolls through all results using the default keep-alive and passes each
     * non-empty page to {@code consumer}.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder
     * @param consumer       the consumer invoked once per non-empty page
     */
    public static void searchForResponse(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                         Consumer<SearchResponse> consumer) {
        searchForResponse(esClient, requestBuilder, consumer, new TimeValue(DEFAULT_SCROLL_TIME));
    }

    /**
     * Scrolls through all results and passes each non-empty page to
     * {@code consumer}. Returns without searching when {@code esClient},
     * {@code requestBuilder} or {@code consumer} is {@code null}.
     *
     * <p>The scroll context is cleared when scrolling ends, including when the
     * first page is empty or when the consumer or a request throws.
     *
     * <p>Note that {@code setScroll} is applied to the passed-in
     * {@code requestBuilder}, so the builder is modified by this call.
     *
     * @param esClient       the es client
     * @param requestBuilder the request builder
     * @param consumer       the consumer invoked once per non-empty page
     * @param keepAlive      the keep alive applied to every scroll request
     */
    public static void searchForResponse(TransportClient esClient, SearchRequestBuilder requestBuilder,
                                         Consumer<SearchResponse> consumer, TimeValue keepAlive) {
        if (esClient == null || requestBuilder == null || consumer == null) {
            return;
        }
        String scrollId = null;
        Throwable failure = null;
        try {
            SearchResponse searchResponse = requestBuilder.setScroll(keepAlive).get();
            scrollId = latestScrollId(searchResponse, scrollId);
            while (searchResponse.getHits().getHits().length > 0) {
                consumer.accept(searchResponse);
                searchResponse = esClient.prepareSearchScroll(scrollId)
                        .setScroll(keepAlive)
                        .get();
                // Always track the id of the most recent response so that the
                // clear request below targets the current scroll context.
                scrollId = latestScrollId(searchResponse, scrollId);
            }
        } catch (RuntimeException | Error e) {
            failure = e;
            throw e;
        } finally {
            // Release the scroll context so the snapshot does not stay in memory.
            clearScroll(esClient, scrollId, failure);
        }
    }

    /** Returns the scroll id of {@code response}, or {@code current} when the response carries none. */
    private static String latestScrollId(SearchResponse response, String current) {
        String id = response.getScrollId();
        return id != null ? id : current;
    }

    /**
     * Clears the given scroll context. A failure of the clear request is attached
     * to {@code failure} as suppressed when one is already propagating, so that it
     * never hides the original error; otherwise it is rethrown.
     */
    private static void clearScroll(TransportClient esClient, String scrollId, Throwable failure) {
        if (scrollId == null) {
            return;
        }
        try {
            esClient.prepareClearScroll().addScrollId(scrollId).get();
        } catch (RuntimeException e) {
            if (failure == null) {
                throw e;
            }
            failure.addSuppressed(e);
        }
    }
}

ES 7.9.2版本

maven引用

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.9.2</version>
        </dependency>

ES Scroll分页查询的Java代码示例

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Pages through a complete result set with the scroll API of the high level
 * REST client, handing every page (or every hit) to a callback.
 *
 * @author zuozhu.meng
 **/
public class EsScrollHandler {
    private static final Logger LOGGER = Logger.getLogger(EsScrollHandler.class.getName());
    /** Default scroll keep-alive, in milliseconds. */
    private static final int DEFAULT_KEEP_ALIVE_MILLIS = 60000;
    private final RestHighLevelClient highLevelClient;
    /** Whether the scroll context is cleared once scrolling ends; enabled by default. */
    private boolean isClearScroll = true;

    /**
     * Creates a handler that clears the scroll context after each search.
     *
     * @param client the high level client used for all requests
     */
    public EsScrollHandler(RestHighLevelClient client) {
        this(client, true);
    }

    /**
     * Creates a handler.
     *
     * @param client        the high level client used for all requests
     * @param isClearScroll whether to clear the scroll context once scrolling ends
     */
    public EsScrollHandler(RestHighLevelClient client, boolean isClearScroll) {
        this.isClearScroll = isClearScroll;
        this.highLevelClient = client;
    }

    /**
     * Sets whether the scroll context is cleared once scrolling ends.
     *
     * @param clearScroll the new setting
     * @return this handler, for chaining
     */
    public EsScrollHandler setClearScroll(boolean clearScroll) {
        isClearScroll = clearScroll;
        return this;
    }

    /**
     * Scrolls through all results and passes each hit to {@code consumer}.
     * Returns without searching when {@code searchRequest} or {@code consumer}
     * is {@code null}.
     *
     * @param searchRequest the search request
     * @param consumer      the consumer invoked once per hit
     * @throws IOException if a search or scroll request fails
     */
    public void searchForHit(SearchRequest searchRequest, Consumer<SearchHit> consumer) throws IOException {
        // Guard here as well: the wrapping lambda below is never null, so the
        // null check in searchForResponse would not catch a null hit consumer.
        if (consumer == null) {
            return;
        }
        searchForResponse(searchRequest, searchResponse -> forEachHits(searchResponse, consumer));
    }

    /**
     * Scrolls through all results using the default keep-alive and passes each
     * non-empty page to {@code consumer}.
     *
     * @param searchRequest the search request
     * @param consumer      the consumer invoked once per non-empty page
     * @throws IOException if a search or scroll request fails
     */
    public void searchForResponse(SearchRequest searchRequest, Consumer<SearchResponse> consumer) throws IOException {
        searchForResponse(searchRequest, new TimeValue(DEFAULT_KEEP_ALIVE_MILLIS), consumer);
    }

    /**
     * Scrolls through all results and passes each non-empty page to
     * {@code consumer}. Returns without searching when {@code searchRequest}
     * or {@code consumer} is {@code null}.
     *
     * <p>When clearing is enabled, the scroll context is cleared asynchronously
     * when scrolling ends, including when the first page is empty or when the
     * consumer or a request throws.
     *
     * <p>Note that the scroll keep-alive is set on the passed-in
     * {@code searchRequest}, so the request is modified by this call.
     *
     * @param searchRequest the search request
     * @param keepAlive     the keep alive applied to every scroll request
     * @param consumer      the consumer invoked once per non-empty page
     * @throws IOException if a search or scroll request fails
     */
    public void searchForResponse(SearchRequest searchRequest, TimeValue keepAlive,
                                  Consumer<SearchResponse> consumer) throws IOException {
        if (searchRequest == null || consumer == null) {
            return;
        }
        searchRequest.scroll(keepAlive);
        String scrollId = null;
        try {
            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            scrollId = latestScrollId(searchResponse, scrollId);
            while (searchResponse.getHits().getHits().length > 0) {
                consumer.accept(searchResponse);
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
                        .scroll(keepAlive);
                searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                // Always track the id of the most recent response so that the
                // clear request below targets the current scroll context.
                scrollId = latestScrollId(searchResponse, scrollId);
            }
        } finally {
            // Release the scroll context so the snapshot does not stay in memory.
            if (isClearScroll && scrollId != null) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                clearScrollAsync(clearScrollRequest);
            }
        }
    }

    /**
     * Passes every hit of {@code searchResponse} to {@code consumer}.
     * Does nothing when either argument is {@code null}.
     *
     * @param searchResponse the response whose hits are iterated
     * @param consumer       the consumer invoked once per hit
     */
    public static void forEachHits(SearchResponse searchResponse, Consumer<SearchHit> consumer) {
        if (searchResponse == null || consumer == null) {
            return;
        }
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            consumer.accept(searchHit);
        }
    }

    /** Returns the scroll id of {@code response}, or {@code current} when the response carries none. */
    private static String latestScrollId(SearchResponse response, String current) {
        String id = response.getScrollId();
        return id != null ? id : current;
    }

    /** Sends the clear-scroll request without waiting for its result. */
    private void clearScrollAsync(ClearScrollRequest request) {
        highLevelClient.clearScrollAsync(request, RequestOptions.DEFAULT, new ActionListener<ClearScrollResponse>() {
            @Override
            public void onResponse(ClearScrollResponse clearScrollResponse) {
                // Nothing to do: clearing is fire-and-forget.
            }

            @Override
            public void onFailure(Exception e) {
                // Clearing is best effort. An exception thrown from this callback
                // cannot reach the caller of searchForResponse, which has already
                // returned, so the failure is reported through the log instead.
                LOGGER.log(Level.WARNING, "Failed to clear scroll context", e);
            }
        });
    }

}

更多推荐

Elasticsearch-Scroll分页-Java示例