High Level REST Client
Elasticsearch Client官方高级客户端。基于低级客户端,主要目标是为了暴露各API特定的方法。Java High Level REST Client依赖于Elasticsearch核心项目,将Request对象作为参数,返回一个Response对象。所有API都可以同步或异步调用。
- 同步调用方法立即返回一个Response对象。
- 而异步调用方法(方法名以Async结尾,例如createAsync)依赖于监听器,当请求成功返回或发生错误时,该监听器会被回调以继续后续处理。
注意:使用这种方式时,所引入依赖库的版本必须与 Elasticsearch 服务端的版本保持一致。
引入依赖:同时排除掉版本不一致的依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.16.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.16.0</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.16.0</version>
</dependency>
1、初始化客户端
高级客户端是基于低级客户端构建的,所以创建RestHighLevelClient实例时需要传入低级客户端的构建器(RestClientBuilder,由RestClient.builder(...)创建):
/**
 * Creates a high-level client against two local nodes and releases it again.
 *
 * The high-level client builds an internal low-level client from the supplied
 * builder. That low-level client maintains a connection pool and starts
 * threads, so the high-level client must be closed once it is no longer
 * needed; closing it also closes the internal low-level client.
 *
 * @throws IOException if closing the client fails (surfaced to the test
 *                     runner instead of being swallowed)
 */
@Test
public void create_client() throws IOException {
    // try-with-resources guarantees close() even if a request throws.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(
                    new HttpHost("localhost", 9200, "http"),
                    new HttpHost("localhost", 9201, "http")))) {
        // Issue requests with `client` here.
    }
}
2、配置客户端
因为高级客户端是基于低级客户端构建的,所以配置选项和低级客户端一致:
/**
 * Demonstrates the optional settings of RestClientBuilder and then builds
 * (and closes) a high-level client from it.
 *
 * @throws IOException if closing the client fails
 */
@Test
public void config_client() throws IOException {
    RestClientBuilder builder = RestClient.builder(
            new HttpHost("localhost", 9200, "http"));

    // Default headers sent with every request, so they need not be set per request.
    Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
    builder.setDefaultHeaders(defaultHeaders);

    // Compress request bodies with gzip and send "Accept-Encoding: gzip" to receive compressed responses.
    builder.setCompressionEnabled(true);

    // Whether to send meta information about the runtime environment.
    builder.setMetaHeaderEnabled(false);

    // Prefix added to the path of every request: with "/my/test" each request goes to /my/test/ + endpoint.
    builder.setPathPrefix("/my/test");

    // Whether every response carrying a warning header is treated as a failure.
    builder.setStrictDeprecationMode(false);

    // Node selector applied to all requests: drop nodes whose name starts with "log_".
    builder.setNodeSelector(new NodeSelector() {
        @Override
        public void select(Iterable<Node> nodes) {
            Iterator<Node> itr = nodes.iterator();
            while (itr.hasNext()) {
                // The node name may be unknown (null), e.g. for nodes created from a bare HttpHost.
                String name = itr.next().getName();
                if (name != null && name.startsWith("log_")) {
                    itr.remove();
                }
            }
        }
    });

    // Listener notified each time a request against a node fails.
    builder.setFailureListener(new RestClient.FailureListener() {
        @Override
        public void onFailure(Node node) {
            super.onFailure(node);
        }
    });

    // Callback for adjusting the default request configuration (timeouts, authentication, ...).
    builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            return builder.setSocketTimeout(10000);
        }
    });

    // Callback for adjusting the HTTP client configuration (e.g. SSL, or anything else
    // org.apache.http.impl.nio.client.HttpAsyncClientBuilder allows to be set).
    builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setProxy(new HttpHost("proxy", 9000, "http"));
        }
    });

    // try-with-resources releases the client's connection pool and threads.
    try (RestHighLevelClient client = new RestHighLevelClient(builder)) {
        // Issue requests with `client` here.
    }
}
3、Basic Auth认证
/** Client shared by the test methods; created in {@link #auth()}. */
private RestHighLevelClient client;

/**
 * Builds the shared client with HTTP basic authentication before each test.
 */
@Before
public void auth() {
    // Credentials are registered for any host/realm (AuthScope.ANY).
    final CredentialsProvider provider = new BasicCredentialsProvider();
    UsernamePasswordCredentials login = new UsernamePasswordCredentials("elastic", "123456");
    provider.setCredentials(AuthScope.ANY, login);

    HttpHost host = new HttpHost("localhost", 9200, "http");
    RestClientBuilder authBuilder = RestClient.builder(host);
    // Hand the credentials provider to the underlying async HTTP client.
    authBuilder.setHttpClientConfigCallback(
            httpBuilder -> httpBuilder.setDefaultCredentialsProvider(provider));

    client = new RestHighLevelClient(authBuilder);
}
4、请求方式
4.1、同步请求
/**
 * Creates the "test" index synchronously and prints whether the request was acknowledged.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void sync_request() throws IOException {
    // Mapping for the new index: a keyword "title" and a text "content" field.
    String mappingJson = "{\n" +
            " \"properties\": {\n" +
            " \"title\": {\n" +
            " \"type\": \"keyword\"\n" +
            " },\n" +
            " \"content\": {\n" +
            " \"type\": \"text\"\n" +
            " }\n" +
            " }\n" +
            "}";
    CreateIndexRequest createIndex = new CreateIndexRequest("test");
    createIndex.mapping(mappingJson, XContentType.JSON);

    // Blocking call: returns once the response has arrived.
    CreateIndexResponse created = client.indices().create(createIndex, RequestOptions.DEFAULT);
    System.out.println("acknowledged:" + created.isAcknowledged());
}
acknowledged:true
4.2、异步请求
/**
 * Creates the "test" index asynchronously and waits for the listener to be notified.
 *
 * Without waiting, the test method would return (and the test JVM could exit)
 * before the callback runs, so a latch blocks until success or failure is reported.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the callback
 */
@Test
public void async_request() throws InterruptedException {
    // Create-index request with a keyword "title" and a text "content" field.
    CreateIndexRequest request = new CreateIndexRequest("test").mapping("{\n" +
            " \"properties\": {\n" +
            " \"title\": {\n" +
            " \"type\": \"keyword\"\n" +
            " },\n" +
            " \"content\": {\n" +
            " \"type\": \"text\"\n" +
            " }\n" +
            " }\n" +
            "}", XContentType.JSON);

    // Released by whichever callback fires.
    final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
    client.indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
        @Override
        public void onResponse(CreateIndexResponse createIndexResponse) {
            try {
                // Handle the successful response.
                System.out.println("acknowledged:" + createIndexResponse.isAcknowledged());
            } finally {
                done.countDown();
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                // Handle the failure.
                System.out.println(e.getMessage());
            } finally {
                done.countDown();
            }
        }
    });

    // Keep the test alive until the asynchronous call has completed (bounded wait).
    done.await(30L, java.util.concurrent.TimeUnit.SECONDS);
}
5、设置请求体
5.1、手动拼接
// Request body supplied as a hand-written JSON string: a keyword "title" and a text "content" field.
CreateIndexRequest request = new CreateIndexRequest("test").mapping("{\n" +
" \"properties\": {\n" +
" \"title\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"content\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}", XContentType.JSON);
5.2、XContentBuilder
// Builds the same mapping step by step; each comment shows the JSON produced so far.
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
// JSON so far: {
jsonBuilder.startObject();
// JSON so far: {"properties":{
jsonBuilder.startObject("properties");
// JSON so far: {"properties":{"title":{
jsonBuilder.startObject("title");
// JSON so far: {"properties":{"title":{"type":"keyword"
jsonBuilder.field("type", "keyword");
// JSON so far: {"properties":{"title":{"type":"keyword"}
jsonBuilder.endObject();
// JSON so far: {"properties":{"title":{"type":"keyword"},"content":{
jsonBuilder.startObject("content");
// JSON so far: {"properties":{"title":{"type":"keyword"},"content":{"type":"text"
jsonBuilder.field("type", "text");
// JSON so far: {"properties":{"title":{"type":"keyword"},"content":{"type":"text"}
jsonBuilder.endObject();
// JSON so far: {"properties":{"title":{"type":"keyword"},"content":{"type":"text"}}
jsonBuilder.endObject();
// JSON so far: {"properties":{"title":{"type":"keyword"},"content":{"type":"text"}}}
jsonBuilder.endObject();
CreateIndexRequest request = new CreateIndexRequest("test").mapping(jsonBuilder);
6、Document APIs
6.1、Index API
/**
 * Indexes document "1" into "posts" from a JSON string and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void index1() throws IOException {
    // Document source as a JSON string.
    String body = "{" +
            "\"user\":\"kimchy\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";
    IndexRequest indexReq = new IndexRequest("posts")
            .id("1")
            .source(body, XContentType.JSON);
    IndexResponse indexResp = client.index(indexReq, RequestOptions.DEFAULT);
    System.out.println(indexResp.toString());
}
IndexResponse[index=posts,type=_doc,id=1,version=1,result=created,seqNo=0,primaryTerm=1,shards={"total":2,"successful":1,"failed":0}]
6.1.1、提供文档源
通过Map方式提供文档源:
// Document source supplied as a Map, attached to the request for document "2".
IndexRequest request = new IndexRequest("posts");
request.id("2");
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
// The map is serialized using the given content type.
request.source(jsonMap, XContentType.JSON);
通过XContentBuilder提供文档源:
// Document source supplied through an XContentBuilder.
IndexRequest request = new IndexRequest("posts");
request.id("3");
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
// Fields of the document; the braces only group the field calls visually.
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
request.source(builder);
通过API内置的助手提供:
// Document source supplied as alternating field-name / value arguments.
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
6.1.2、可选参数
// Optional arguments of IndexRequest; paired lines show two ways of setting the same option.
// Routing value
request.routing("routing");
// Timeout (as TimeValue or as String)
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// Refresh policy (as enum or as String)
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Version
request.version(2);
// Version type
request.versionType(VersionType.EXTERNAL);
// Operation type (as enum or as String)
request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");
// Ingest pipeline to run before indexing
request.setPipeline("pipeline");
6.1.3、响应(IndexResponse)
// Reading an IndexResponse.
// Index name
String index = indexResponse.getIndex();
// Document id
String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
// The document was created for the first time
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// An existing document was updated
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Fewer shards succeeded than the total number of shards
}
if (shardInfo.getFailed() > 0) {
// Handle potential failures
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
6.1.4、异常
- 如果发生版本冲突,将会抛出ElasticsearchException。
- 如果op_type被设置为create,并且已经存在一个具有相同index和id的文档,也会抛出ElasticsearchException。
6.2、Get API
/**
 * Fetches document "1" from "posts" and prints its source as a JSON string.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void get_test1() throws IOException {
    GetResponse fetched = client.get(new GetRequest("posts", "1"), RequestOptions.DEFAULT);
    String sourceJson = fetched.getSourceAsString();
    System.out.println(sourceJson);
}
{"user":"kimchy","postDate":"2013-01-30","message":"trying out Elasticsearch"}
6.2.1、可选参数
// Optional arguments of GetRequest.
// Disable retrieval of _source (a plain get returns it, as the example output above shows)
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// _source_includes and _source_excludes
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
// stored_fields: request a stored field and read it from the response
request.storedFields("message");
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
String message = getResponse.getField("message").getValue();
// Routing value
request.routing("routing");
// Preference
request.preference("preference");
// Realtime flag
request.realtime(false);
// Refresh before retrieving the document
request.refresh(true);
// Version type
request.versionType(VersionType.EXTERNAL);
6.2.2、响应(GetResponse)
// Reading a GetResponse.
String index = getResponse.getIndex();
String id = getResponse.getId();
// Whether the document was found
if (getResponse.isExists()) {
long version = getResponse.getVersion();
// Source as a String
String sourceAsString = getResponse.getSourceAsString();
// Source as a Map
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
// Source as a byte[]
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
// Note: a GetResponse is still returned when the document is not found (404 status),
// so isExists() must always be checked.
}
6.2.3、异常
- 当对不存在的索引执行get请求时,响应有404状态码,抛出ElasticsearchException,需要按如下方式处理:
// A get against a non-existent index surfaces as an ElasticsearchException with status NOT_FOUND.
GetRequest request = new GetRequest("does_not_exist", "1");
try {
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// Handle the missing index here
}
}
- 如果请求了特定的文档版本,而现有文档具有不同的版本号,则会引发版本冲突:
// Requesting a specific version that differs from the stored one surfaces as a CONFLICT.
try {
GetRequest request = new GetRequest("posts", "1").version(2);
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// Handle the version conflict here
}
}
6.3、Get Source API
/**
 * Fetches only the _source of document "1" in "posts" and prints it.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void get_source() throws IOException {
    GetSourceRequest sourceReq = new GetSourceRequest("posts", "1");
    GetSourceResponse sourceResp = client.getSource(sourceReq, RequestOptions.DEFAULT);
    String printed = sourceResp.toString();
    System.out.println(printed);
}
{postDate=2013-01-30, message=trying out Elasticsearch, user=kimchy}
6.3.1、可选参数
// Optional arguments of GetSourceRequest.
// fetchSource must be true here; _source_includes and _source_excludes are optional
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"postDate"};
getSourceRequest.fetchSourceContext(
new FetchSourceContext(true, includes, excludes));
// Routing value
getSourceRequest.routing("routing");
// Preference
getSourceRequest.preference("preference");
// Realtime flag
getSourceRequest.realtime(false);
// Refresh before retrieving the source
getSourceRequest.refresh(true);
6.3.2、响应(GetSourceResponse)
// Reading a GetSourceResponse.
// Source as a String
String str = getSourceResponse.toString();
// Source as a Map
Map<String, Object> source = getSourceResponse.getSource();
6.4、Exists API
与Get API一样,Exists API使用GetRequest,并支持它的所有可选参数。由于exists()只返回true或false,建议关闭_source的抓取以及stored_fields。
/**
 * Checks whether document "1" exists in "posts" and prints the result.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void exists() throws IOException {
    // Only a boolean comes back, so skip fetching _source and stored fields.
    GetRequest probe = new GetRequest("posts", "1")
            .fetchSourceContext(new FetchSourceContext(false))
            .storedFields("_none_");
    boolean found = client.exists(probe, RequestOptions.DEFAULT);
    System.out.println(found);
}
6.5、Delete API
/**
 * Deletes document "3" from "posts" and prints the operation result.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void delete() throws IOException {
    DeleteRequest removal = new DeleteRequest("posts", "3");
    DeleteResponse removed = client.delete(removal, RequestOptions.DEFAULT);
    String outcome = removed.getResult().toString();
    System.out.println(outcome);
}
DELETED
6.5.1、可选参数
// Optional arguments of DeleteRequest; paired lines show two ways of setting the same option.
// Routing value
request.routing("routing");
// Timeout (as TimeValue or as String)
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
// Refresh policy (as enum or as String)
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Version
request.version(2);
// Version type
request.versionType(VersionType.EXTERNAL);
6.5.2、响应(DeleteResponse)
// Reading a DeleteResponse.
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// The number of successful shards differs from the total number of shards
}
if (shardInfo.getFailed() > 0) {
// Handle potential failures
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
判断是否删除成功:
// Distinguish "nothing to delete" from a successful delete via the result enum.
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
// The document was not found
}
if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) {
// The document was deleted
}
6.5.3、异常
版本冲突则抛出ElasticsearchException异常:
// A delete conditioned on seq_no / primary term that does not match surfaces as a CONFLICT.
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// Handle the version conflict here
}
}
6.6、Update API
6.6.1、使用inline脚本更新
/**
 * Updates document "1" in "posts" with an inline painless script and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void update() throws IOException {
    // Parameters made available to the script as params.*
    Map<String, Object> scriptParams = new HashMap<>();
    scriptParams.put("name", "tom");

    // Inline script
    Script inlineScript = new Script(
            ScriptType.INLINE,
            "painless",
            "ctx._source.count = params.name",
            scriptParams);

    UpdateRequest updateReq = new UpdateRequest("posts", "1").script(inlineScript);

    // Execute
    UpdateResponse updateResp = client.update(updateReq, RequestOptions.DEFAULT);
    System.out.println(updateResp.toString());
}
UpdateResponse[index=posts,type=_doc,id=1,version=2,seqNo=4,primaryTerm=1,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
6.6.2、使用stored脚本更新
// References the stored script "increment-field"; the language argument is passed as null here.
Script stored = new Script(
ScriptType.STORED, null, "increment-field", parameters);
request.script(stored);
6.6.3、使用部分文档更新
/**
 * Updates document "1" in "posts" with a partial document and prints the response.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void update_by_partdoc() throws IOException {
    // The partial document may also be given as a Map, an XContentBuilder or key/value pairs.
    String partialDoc = "{" +
            "\"name\":\"jerry\"" +
            "}";
    UpdateRequest updateReq = new UpdateRequest("posts", "1");
    updateReq.doc(partialDoc, XContentType.JSON);

    UpdateResponse updateResp = client.update(updateReq, RequestOptions.DEFAULT);
    System.out.println(updateResp.toString());
}
6.6.4、Upserts
如果文档不存在,则可以使用upsert方法定义一些将作为新文档插入的内容:
// Content to insert as a new document when the target document does not exist yet.
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
6.6.5、可选参数
// Optional arguments of UpdateRequest; paired lines show two ways of setting the same option.
// Routing value
request.routing("routing");
// Timeout (as TimeValue or as String)
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
// Refresh policy (as enum or as String)
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Number of retries on a version conflict
request.retryOnConflict(3);
// Whether to return _source
request.fetchSource(true);
// _source includes and excludes
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
// Concurrency control via seq_no and primary term
request.setIfSeqNo(2L);
request.setIfPrimaryTerm(1L);
// Disable noop detection
request.detectNoop(false);
// The script must run whether or not the document exists, i.e. the script creates the document if missing.
request.scriptedUpsert(true);
// Use the partial document as the starting document if the document does not exist yet.
request.docAsUpsert(true);
// Number of shard copies that must be active before the update proceeds (as int or as ActiveShardCount).
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
6.6.6、响应(UpdateResponse)
// Reading an UpdateResponse.
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
// The document was created for the first time (upsert)
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
// The document was updated
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
// The document was deleted
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
// The update did not affect the document, i.e. no operation was executed (noop)
}
当通过fetchSource方法在UpdateRequest中启用_source字段时,响应包含更新文档的源:
// Reading the updated document's source when the request asked for it via fetchSource.
GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
// Source as a String
String sourceAsString = result.sourceAsString();
// Source as a Map
Map<String, Object> sourceAsMap = result.sourceAsMap();
// Source as a byte[]
byte[] sourceAsBytes = result.source();
} else {
// The response carries no document source (this is the default)
}
也可以检查分片故障:
// Checking shard-level failures of an update.
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
// Fewer shards succeeded than the total number of shards
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
6.6.7、异常
- 当对不存在的文档执行UpdateRequest时,响应具有404状态码,抛出ElasticsearchException,需要按如下方式处理:
// Updating a non-existent document surfaces as an ElasticsearchException with status NOT_FOUND.
UpdateRequest request = new UpdateRequest("posts", "does_not_exist")
.doc("field", "value");
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// Handle the missing document here
}
}
- 处理版本冲突问题:
// An update conditioned on seq_no / primary term that does not match surfaces as a CONFLICT.
UpdateRequest request = new UpdateRequest("posts", "1")
.doc("field", "value")
.setIfSeqNo(101L)
.setIfPrimaryTerm(200L);
try {
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
// Handle the version conflict here
}
}
6.7、Term Vectors API
6.7.1、使用已经索引的文档
/**
 * Requests term vectors for the "text" field of document "1" in "twitter"
 * and prints the returned term-vector list.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void term_vectors() throws IOException {
    TermVectorsRequest vectorsReq = new TermVectorsRequest("twitter", "1");
    // Field of the already indexed document to analyse
    vectorsReq.setFields("text");

    // Only term statistics are requested; every other metric is switched off.
    vectorsReq.setFieldStatistics(false);
    vectorsReq.setTermStatistics(true);
    vectorsReq.setPositions(false);
    vectorsReq.setOffsets(false);
    vectorsReq.setPayloads(false);

    TermVectorsResponse vectorsResp = client.termvectors(vectorsReq, RequestOptions.DEFAULT);
    System.out.println(vectorsResp.getTermVectorsList());
}
6.7.2、使用人工文档
// Term vectors for an artificial document (one that is not stored in the index):
// the document is passed to the request as an XContentBuilder instead of a document id.
XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("text","hello world").endObject();
TermVectorsRequest request = new TermVectorsRequest("twitter", docBuilder);
6.7.3、可选参数
// Optional arguments of TermVectorsRequest.
// Which statistics / token details to return
request.setFieldStatistics(false);
request.setTermStatistics(true);
request.setPositions(false);
request.setOffsets(false);
request.setPayloads(false);
// Filter settings
Map<String, Integer> filterSettings = new HashMap<>();
filterSettings.put("max_num_terms", 3);
filterSettings.put("min_term_freq", 1);
filterSettings.put("max_term_freq", 10);
filterSettings.put("min_doc_freq", 1);
filterSettings.put("max_doc_freq", 100);
filterSettings.put("min_word_length", 1);
filterSettings.put("max_word_length", 10);
request.setFilterSettings(filterSettings);
// Per-field analyzer
Map<String, String> perFieldAnalyzer = new HashMap<>();
perFieldAnalyzer.put("user", "keyword");
request.setPerFieldAnalyzer(perFieldAnalyzer);
// Realtime flag
request.setRealtime(false);
// Routing value
request.setRouting("routing");
6.7.4、响应(TermVectorsResponse)
// Reading a TermVectorsResponse: document coordinates, then per-field, per-term and per-token data.
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
// Whether the document was found
boolean found = response.getFound();
// Walk the term vectors of every returned field
for (TermVectorsResponse.TermVector tv : response.getTermVectorsList()) {
String fieldname = tv.getFieldName();
int docCount = tv.getFieldStatistics().getDocCount();
long sumTotalTermFreq =
tv.getFieldStatistics().getSumTotalTermFreq();
long sumDocFreq = tv.getFieldStatistics().getSumDocFreq();
if (tv.getTerms() != null) {
List<TermVectorsResponse.TermVector.Term> terms =
tv.getTerms();
for (TermVectorsResponse.TermVector.Term term : terms) {
String termStr = term.getTerm();
int termFreq = term.getTermFreq();
int docFreq = term.getDocFreq();
long totalTermFreq = term.getTotalTermFreq();
float score = term.getScore();
// Tokens are null-checked before iterating
if (term.getTokens() != null) {
List<TermVectorsResponse.TermVector.Token> tokens =
term.getTokens();
for (TermVectorsResponse.TermVector.Token token : tokens) {
int position = token.getPosition();
int startOffset = token.getStartOffset();
int endOffset = token.getEndOffset();
String payload = token.getPayload();
}
}
}
}
}
6.8、Bulk API
/**
 * Sends three index operations, one delete and one partial update to "article"
 * in a single bulk request.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void bulk() throws IOException {
    IndexRequest first = new IndexRequest("article").id("1")
            .source(XContentType.JSON, "title", "title1", "content", "content1");
    IndexRequest second = new IndexRequest("article").id("2")
            .source(XContentType.JSON, "title", "title2", "content", "content2");
    IndexRequest third = new IndexRequest("article").id("3")
            .source(XContentType.JSON, "title", "title1", "content", "content3");
    DeleteRequest dropThird = new DeleteRequest("article", "3");
    UpdateRequest retitleSecond = new UpdateRequest("article", "2")
            .doc(XContentType.JSON, "title", "updateTitle");

    // Operations are added in the same order as they should be executed.
    BulkRequest batch = new BulkRequest();
    batch.add(first);
    batch.add(second);
    batch.add(third);
    batch.add(dropThird);
    batch.add(retitleSecond);

    BulkResponse bulkResponse = client.bulk(batch, RequestOptions.DEFAULT);
}
6.8.1、可选参数
// Optional arguments of BulkRequest; paired lines show two ways of setting the same option.
// Timeout (as TimeValue or as String)
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
// Refresh policy (as enum or as String)
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
// Number of shard copies that must be active before the index/update/delete operations run.
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
// Ingest pipeline
request.pipeline("pipelineId");
// Routing value
request.routing("routingId");
6.8.2、响应(BulkResponse)
返回的BulkResponse包含关于执行操作的信息,并允许迭代每个结果,如下所示:
// Iterate over the per-operation results and cast each one according to its operation type.
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
// INDEX falls through to CREATE: both are cast to IndexResponse
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
Bulk响应提供了一个方法来快速检查一个或多个操作是否失败:
// Quick check whether at least one operation of the bulk request failed.
if (bulkResponse.hasFailures()) {
}
在这种情况下,有必要遍历所有操作结果以检查操作是否失败,如果失败,则检索相应的失败:
// Walk all item responses and retrieve the failure of each failed operation.
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
}
}
6.8.3、Bulk Processor
BulkProcessor通过提供一个实用程序类简化了Bulk API的使用,该实用程序类允许在添加到处理器时透明地执行索引/更新/删除操作。
为了执行请求,BulkProcessor需要以下组件:
- RestHighLevelClient:该客户端用于执行BulkRequest和检索BulkResponse
- BulkProcessor.Listener:此侦听器在每次BulkRequest执行前后或BulkRequest失败时调用
使用BulkProcessor.builder方法可以创建一个BulkProcessor:
// Listener with the three callbacks a BulkProcessor invokes around each BulkRequest.
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Called before each BulkRequest is executed
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// Called after each BulkRequest has been executed
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// Called when a BulkRequest failed
}
};
// The processor executes its bulk requests through client.bulkAsync.
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
配置BulkProcessor:
// Builder used to tune when and how the BulkProcessor flushes its requests.
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener);
// Flush a new bulk request once this many actions were added (default 1000, -1 disables)
builder.setBulkActions(500);
// Flush a new bulk request once the added actions reach this size (default 5Mb, -1 disables)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// Number of concurrent requests allowed (default 1; 0 allows only a single request to execute)
builder.setConcurrentRequests(0);
// Flush any pending BulkRequest when this interval passes (not set by default)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// Constant backoff policy: wait 1 second, retry up to 3 times
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
一旦创建了BulkProcessor,请求就可以添加到它:
// Build the requests
IndexRequest one = new IndexRequest("article").id("4")
        .source(XContentType.JSON,"title", "title4", "content", "content4");
IndexRequest two = new IndexRequest("article").id("5")
        .source(XContentType.JSON,"title", "title5", "content", "content5");
// Add them to the BulkProcessor created above; they are sent according to its flush settings.
bulkProcessor.add(one);
bulkProcessor.add(two);
一旦所有请求都被添加到BulkProcessor中,它的实例需要使用两种可用的关闭方法之一来关闭。
awaitClose()方法可以用来等待,直到所有请求都被处理或指定的等待时间结束:
// Wait up to 30 seconds for the pending requests; the returned flag is stored in `terminated`.
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
或者立即关闭:
// Close the processor immediately, without waiting.
bulkProcessor.close();
完整示例:
/**
 * Complete BulkProcessor example: registers a listener, configures the flush
 * thresholds, adds two index requests and waits for the processor to finish.
 *
 * @throws InterruptedException if interrupted while waiting for the processor to close
 */
@Test
public void bulk_processor() throws InterruptedException {
    BulkProcessor.Listener callbacks = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            // Runs before each BulkRequest is executed.
            System.out.println("请求数量为:" + request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // Runs after each BulkRequest has been executed.
            if (!response.hasFailures()) {
                System.out.println("全部成功,共花费:" + response.getTook().getMillis());
            } else {
                System.out.println("存在失败的请求");
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // Runs when a BulkRequest failed.
            System.out.println(failure.getMessage());
        }
    };

    // Configuration
    BulkProcessor.Builder config = BulkProcessor.builder(
            (bulkRequest, responseListener) ->
                    client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, responseListener),
            callbacks);
    // Flush after this many actions (default 1000, -1 disables)
    config.setBulkActions(500);
    // Flush after this much data (default 5Mb, -1 disables)
    config.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
    // Concurrent requests allowed (default 1; 0 allows only a single request to execute)
    config.setConcurrentRequests(0);
    // Flush pending requests when this interval passes (not set by default)
    config.setFlushInterval(TimeValue.timeValueSeconds(10L));
    // Constant backoff: wait 1 second, retry up to 3 times
    config.setBackoffPolicy(
            BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
    BulkProcessor bulk = config.build();

    // Build the requests and hand them to the processor
    IndexRequest fourth = new IndexRequest("article").id("4")
            .source(XContentType.JSON, "title", "title4", "content", "content4");
    IndexRequest fifth = new IndexRequest("article").id("5")
            .source(XContentType.JSON, "title", "title5", "content", "content5");
    bulk.add(fourth);
    bulk.add(fifth);

    // Wait up to 30s, or until all requests are done
    boolean finished = bulk.awaitClose(30L, TimeUnit.SECONDS);
    System.out.println(finished);
}
请求数量为:2
全部成功,共花费:239
true
6.9、Multi-Get API
/**
 * Fetches documents "1" and "4" from "article" in one multi-get request
 * and prints how many item responses came back.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void multi_get() throws IOException {
    MultiGetRequest request = new MultiGetRequest();
    request.add(new MultiGetRequest.Item("article", "1"));
    request.add(new MultiGetRequest.Item("article", "4"));
    // mget is the multi-get entry point used throughout this document.
    MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
    System.out.println(response.getResponses().length);
}
6.9.1、可选参数
multiGet支持与get API支持的相同的可选参数。您可以在Item中设置其中的大部分:
// Optional arguments of a multi-get; most are set per Item, the last three on the request itself.
// _source includes and excludes
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id")
.fetchSourceContext(fetchSourceContext));
// Routing value
request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing"));
// Version and version type
request.add(new MultiGetRequest.Item("index", "with_version")
.versionType(VersionType.EXTERNAL)
.version(10123L));
// preference, realtime and refresh apply to the whole request
request.preference("some_preference");
request.realtime(false);
request.refresh(true);
6.9.2、响应(MultiGetResponse)
返回的MultiGetResponse包含一个MultiGetItemResponse的列表,其顺序与请求中各项的顺序相同。
如果子请求成功,则MultiGetItemResponse包含一个GetResponse(此时getFailure()为null)。
如果子请求失败,则MultiGetItemResponse包含一个MultiGetResponse.Failure(此时getResponse()为null),因此使用前需要先判断该项是否失败。
// Reading a MultiGetResponse: one item per requested document, in request order.
MultiGetItemResponse[] arr = response.getResponses();
for (MultiGetItemResponse item : arr) {
    String index = item.getIndex();
    String id = item.getId();
    // A failed item carries a failure and no GetResponse, so it must be handled first.
    if (item.isFailed()) {
        System.out.println(item.getFailure().getMessage());
        continue;
    }
    GetResponse r = item.getResponse();
    if (r.isExists()) {
        long version = r.getVersion();
        String sourceAsString = r.getSourceAsString();
        Map<String, Object> sourceAsMap = r.getSourceAsMap();
        byte[] sourceAsBytes = r.getSourceAsBytes();
    } else {
        // The sub-request succeeded but the document does not exist
    }
}
6.9.3、异常
当对不存在的索引执行其中一个子请求时,getFailure将包含一个异常:
// For a sub-request against a missing index the item has no response; the cause is in getFailure().
assertNull(missingIndexItem.getResponse());
Exception e = missingIndexItem.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.NOT_FOUND, ee.status());
assertThat(e.getMessage(),
containsString("reason=no such index [missing_index]"));
如果发生了版本冲突:
// Requesting a version that differs from the stored one: the item has no response, only a failure.
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id")
.version(1000L));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
assertNull(item.getResponse());
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.CONFLICT, ee.status());
assertThat(e.getMessage(),
containsString("version conflict, current version [1] is "
+ "different than the one provided [1000]"));
6.10、Reindex API
/**
 * Submits a reindex from "source1" and "source2" into "dest" as a task and reads the task id.
 *
 * @throws IOException if the request cannot be executed
 */
@Test
public void reindex() throws IOException {
    ReindexRequest copyRequest = new ReindexRequest();
    copyRequest.setSourceIndices("source1", "source2");
    copyRequest.setDestIndex("dest");

    // Submitting through the Task API does not wait for completion; it is the
    // equivalent of a REST request with wait_for_completion set to false.
    TaskSubmissionResponse submitted =
            client.submitReindexTask(copyRequest, RequestOptions.DEFAULT);
    String taskId = submitted.getTask();
}
6.10.1、可选参数
// Optional arguments of ReindexRequest.
// version_type of the destination
request.setDestVersionType(VersionType.EXTERNAL);
// op_type of the destination
request.setDestOpType("create");
// Behaviour on version conflicts
request.setConflicts("proceed");
// Query selecting the source documents
request.setSourceQuery(new TermQueryBuilder("user", "kimchy"));
// Maximum number of documents
request.setMaxDocs(10);
// Batch size (default 1000)
request.setSourceBatchSize(100);
// Ingest pipeline of the destination
request.setDestPipeline("my_pipeline");
// Script applied to each document
request.setScript(
new Script(
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
// Remote source; remoteHost, remotePort, user and password are defined outside this snippet
request.setRemoteInfo(
new RemoteInfo(
"http", remoteHost, remotePort, null,
new BytesArray(new MatchAllQueryBuilder().toString()),
user, password, Collections.emptyMap(),
new TimeValue(100, TimeUnit.MILLISECONDS),
new TimeValue(100, TimeUnit.SECONDS)
)
);
// Number of slices
request.setSlices(2);
// Scroll keep-alive
request.setScroll(TimeValue.timeValueMinutes(10));
// Timeout
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh after the reindex
request.setRefresh(true);
6.10.2、响应
也可以使用Task API提交请求而不等待它完成。这相当于将wait_for_completion标志设置为false的REST请求。
// Submit the reindex as a task instead of waiting for it to complete.
// sourceIndex and destinationIndex are index names defined outside this snippet.
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setRefresh(true);
// Uses the same `client` instance as the other examples in this document.
TaskSubmissionResponse reindexSubmission = client
        .submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
String taskId = reindexSubmission.getTask();
同步调用返回:BulkByScrollResponse
// Synchronous reindex and the fields of the resulting BulkByScrollResponse.
BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
// Total time taken
TimeValue timeTaken = bulkResponse.getTook();
// Whether the request timed out
boolean timedOut = bulkResponse.isTimedOut();
// Total number of documents processed
long totalDocs = bulkResponse.getTotal();
// Number of documents updated
long updatedDocs = bulkResponse.getUpdated();
// Number of documents created
long createdDocs = bulkResponse.getCreated();
// Number of documents deleted
long deletedDocs = bulkResponse.getDeleted();
// Number of batches executed
long batches = bulkResponse.getBatches();
// Number of skipped documents
long noops = bulkResponse.getNoops();
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts();
// Number of times the request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries();
// Number of times the request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries();
// Total time this request throttled itself, not including the current throttle if it is sleeping now
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
// Remaining delay of any current throttle sleep, or 0 if not sleeping
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
// Failures during the search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
// Failures during the bulk index operations
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
6.11、Update By Query API
/**
 * Runs an update-by-query on the "person" index using an inline Painless script that
 * increments {@code age} on documents whose {@code name} equals 'tom'.
 *
 * @throws IOException propagated from the synchronous client call
 */
@Test
public void update_by_query() throws IOException {
    UpdateByQueryRequest updateRequest = new UpdateByQueryRequest("person");

    // Build the script separately so the request setup reads top to bottom.
    Script incrementAge = new Script(
            ScriptType.INLINE,
            "painless",
            "if (ctx._source.name == 'tom') {ctx._source.age++;}",
            Collections.emptyMap());
    updateRequest.setScript(incrementAge);

    // The response is captured but not inspected in this example.
    BulkByScrollResponse updateResult =
            client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
}
6.11.1、可选参数
// Optional settings for an UpdateByQueryRequest.
// Conflict handling: "proceed"
request.setConflicts("proceed");
// Query selecting the documents to update
request.setQuery(new TermQueryBuilder("user", "kimchy"));
// Maximum number of documents (maxDocs)
request.setMaxDocs(10);
// Batch size
request.setBatchSize(100);
// Ingest pipeline name
request.setPipeline("my_pipeline");
// Script applied to the matching documents
request.setScript(
new Script(
ScriptType.INLINE, "painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
Collections.emptyMap()));
// Number of slices
request.setSlices(2);
// Scroll setting (10 minutes)
request.setScroll(TimeValue.timeValueMinutes(10));
// Routing
request.setRouting("=cat");
// Timeout (2 minutes)
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh flag
request.setRefresh(true);
// Indices options
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
6.11.2、响应(BulkByScrollResponse)
// Execute the update-by-query synchronously, then read the statistics exposed by the response.
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
// Total time taken
TimeValue timeTaken = bulkResponse.getTook();
// Whether the request timed out
boolean timedOut = bulkResponse.isTimedOut();
// Total number of documents processed
long totalDocs = bulkResponse.getTotal();
// Number of documents updated
long updatedDocs = bulkResponse.getUpdated();
// Number of documents deleted
long deletedDocs = bulkResponse.getDeleted();
// Number of batches executed
long batches = bulkResponse.getBatches();
// Number of documents skipped (no-ops)
long noops = bulkResponse.getNoops();
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts();
// Number of times the request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries();
// Number of times the request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries();
// Total time this request has throttled itself, not including the current
// throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
// Remaining delay of any current throttle sleep, or 0 if not sleeping
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
// Failures that occurred during the search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
// Failures that occurred during the bulk index operations
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
6.12、Delete By Query API
/**
 * Runs a delete-by-query on the "person" index for documents whose {@code name}
 * term is "tom".
 *
 * @throws IOException propagated from the synchronous client call
 */
@Test
public void delete_by_query() throws IOException {
    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest("person");

    TermQueryBuilder nameIsTom = new TermQueryBuilder("name", "tom");
    deleteRequest.setQuery(nameIsTom);

    // The response is captured but not inspected in this example.
    BulkByScrollResponse deleteResult =
            client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
}
6.12.1、可选参数
// Optional settings for a DeleteByQueryRequest.
// Conflict handling: "proceed"
request.setConflicts("proceed");
// Query selecting the documents to delete
request.setQuery(new TermQueryBuilder("user", "kimchy"));
// Maximum number of documents (maxDocs)
request.setMaxDocs(10);
// Number of slices
request.setSlices(2);
// Scroll setting (10 minutes)
request.setScroll(TimeValue.timeValueMinutes(10));
// Routing
request.setRouting("=cat");
// Timeout (2 minutes)
request.setTimeout(TimeValue.timeValueMinutes(2));
// Refresh flag
request.setRefresh(true);
// Indices options
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
6.12.2、响应(BulkByScrollResponse)
// Execute the delete-by-query synchronously, then read the statistics exposed by the response.
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
// Total time taken
TimeValue timeTaken = bulkResponse.getTook();
// Whether the request timed out
boolean timedOut = bulkResponse.isTimedOut();
// Total number of documents processed
long totalDocs = bulkResponse.getTotal();
// Number of documents deleted
long deletedDocs = bulkResponse.getDeleted();
// Number of batches executed
long batches = bulkResponse.getBatches();
// Number of documents skipped (no-ops)
long noops = bulkResponse.getNoops();
// Number of version conflicts
long versionConflicts = bulkResponse.getVersionConflicts();
// Number of times the request had to retry bulk index operations
long bulkRetries = bulkResponse.getBulkRetries();
// Number of times the request had to retry search operations
long searchRetries = bulkResponse.getSearchRetries();
// Total time this request has throttled itself, not including the current
// throttle time if it is currently sleeping
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
// Remaining delay of any current throttle sleep, or 0 if not sleeping
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
// Failures that occurred during the search phase
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
// Failures that occurred during the bulk index operations
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
6.13、Rethrottle API
RethrottleRequest可用于更改正在运行的reindex、update_by_query或delete_by_query任务的当前限制,或者完全禁用任务的限制。
/**
 * Demonstrates the Rethrottle API: builds a RethrottleRequest for one task id and
 * issues the asynchronous rethrottle call for each of the three task kinds
 * (reindex, update-by-query, delete-by-query).
 *
 * NOTE: the method returns right after issuing the asynchronous calls; it does not
 * wait for any of the listeners to be invoked.
 */
@Test
public void rethrottle() {
    // TaskId(String) expects the "nodeId:taskNumber" form; a bare number such as "1"
    // has no ':' separator and is rejected as a malformed task id
    // (IllegalArgumentException). "nodeId" is a placeholder -- use the id of a
    // real running task, e.g. the value returned when submitting a task.
    TaskId taskId = new TaskId("nodeId:1");
    RethrottleRequest request = new RethrottleRequest(taskId);
    // Alternative: change the task's throttle to 100 requests per second
    // RethrottleRequest request = new RethrottleRequest(taskId, 100.0f);

    // Depending on whether a reindex, update-by-query or delete-by-query task is being
    // rethrottled, one of the following synchronous methods can be used instead:
    // client.reindexRethrottle(request, RequestOptions.DEFAULT);
    // client.updateByQueryRethrottle(request, RequestOptions.DEFAULT);
    // client.deleteByQueryRethrottle(request, RequestOptions.DEFAULT);

    // Asynchronous variants
    client.reindexRethrottleAsync(request,
            RequestOptions.DEFAULT, new ActionListener<ListTasksResponse>() {
                @Override
                public void onResponse(ListTasksResponse listTasksResponse) {
                    // Handle the successful rethrottle response here.
                }

                @Override
                public void onFailure(Exception e) {
                    // Handle the failure here.
                }
            });
    client.updateByQueryRethrottleAsync(request,
            RequestOptions.DEFAULT, new ActionListener<ListTasksResponse>() {
                @Override
                public void onResponse(ListTasksResponse listTasksResponse) {
                    // Handle the successful rethrottle response here.
                }

                @Override
                public void onFailure(Exception e) {
                    // Handle the failure here.
                }
            });
    client.deleteByQueryRethrottleAsync(request,
            RequestOptions.DEFAULT, new ActionListener<ListTasksResponse>() {
                @Override
                public void onResponse(ListTasksResponse listTasksResponse) {
                    // Handle the successful rethrottle response here.
                }

                @Override
                public void onFailure(Exception e) {
                    // Handle the failure here.
                }
            });
}
6.14、Multi Term Vectors API
有两种方法可以创建MultiTermVectorsRequest。
- 第一种方法是创建一个空的MultiTermVectorsRequest,然后向其中添加单个术语向量请求。
- 第二种方法可以在所有术语向量请求共享相同参数时使用,例如索引和其他设置。在这种情况下,可以创建模板TermVectorsRequest并设置所有必要的设置,并且可以将该模板请求与执行这些请求的所有文档的id一起传递给MultiTermVectorsRequest。
/**
 * Shows the two ways of assembling a MultiTermVectorsRequest, then executes the first
 * request synchronously and asynchronously.
 *
 * @throws IOException if building the inline JSON document or the synchronous call fails
 */
@Test
public void multi_term_vectors() throws IOException {
    // Way 1: start from an empty multi-request and add individual term vectors requests.
    MultiTermVectorsRequest request = new MultiTermVectorsRequest();

    // First entry: field "user" of document "1" in index "authors".
    TermVectorsRequest byIdRequest = new TermVectorsRequest("authors", "1");
    byIdRequest.setFields("user");
    request.add(byIdRequest);

    // Second entry: built from a JSON document supplied inline instead of a document id.
    XContentBuilder inlineDoc = XContentFactory.jsonBuilder()
            .startObject()
            .field("user", "guest-user")
            .endObject();
    TermVectorsRequest inlineDocRequest = new TermVectorsRequest("authors", inlineDoc);
    request.add(inlineDocRequest);

    // Way 2: one template request plus the ids of the documents it applies to.
    // This request is only constructed for illustration; it is not executed below.
    TermVectorsRequest template = new TermVectorsRequest("authors", "fake_id");
    template.setFields("user");
    String[] documentIds = {"1", "2"};
    MultiTermVectorsRequest templatedRequest = new MultiTermVectorsRequest(documentIds, template);

    // Synchronous execution
    MultiTermVectorsResponse syncResponse = client.mtermvectors(request, RequestOptions.DEFAULT);

    // Walk the per-document responses (the loop body is left empty in this example).
    List<TermVectorsResponse> perDocument = syncResponse.getTermVectorsResponses();
    if (perDocument != null) {
        for (TermVectorsResponse single : perDocument) {
        }
    }

    // Asynchronous execution: the listener is notified on response or failure.
    ActionListener<MultiTermVectorsResponse> listener =
            new ActionListener<MultiTermVectorsResponse>() {
                @Override
                public void onResponse(MultiTermVectorsResponse multiTermVectorsResponse) {
                }

                @Override
                public void onFailure(Exception e) {
                }
            };
    client.mtermvectorsAsync(request, RequestOptions.DEFAULT, listener);
}