Low Level REST Client
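The Elasticsearch low-level REST client talks to an Elasticsearch cluster over HTTP. The API does not encode requests or decode responses; that work is left to the user. The upside is that it is compatible with all Elasticsearch versions; the downside is that handling the data yourself is more tedious.
Add the dependency: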
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.16.0</version>
</dependency>
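1. Initialization and configuration
A RestClient instance is built through a RestClientBuilder, which is obtained from the static RestClient.builder(HttpHost...) method. The only required argument is one or more hosts that the client will communicate with, as shown below: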
@Test
public void create_client() {
    RestClient restClient = RestClient.builder(
            new HttpHost("localhost", 9200, "http"),
            new HttpHost("localhost", 9201, "http")).build();
    try {
        restClient.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
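The RestClient class is thread-safe and ideally has the same lifecycle as the application that uses it. It is important to close it when it is no longer needed, so that all the resources it holds, including the underlying HTTP client instance and its threads, are released.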
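Because RestClient implements Closeable, a short-lived client (in a test, for example) can be placed in a try-with-resources statement so that close() runs even when the body throws. The sketch below illustrates the pattern with a hypothetical stand-in class, DemoClient, so that it runs without the Elasticsearch dependency; DemoClient and its methods are assumptions made for illustration, not part of the client library.

```java
import java.io.Closeable;

final class LifecycleSketch {
    // Hypothetical stand-in for a client that owns connections and threads.
    static final class DemoClient implements Closeable {
        private boolean closed;

        String ping() {
            if (closed) {
                throw new IllegalStateException("client already closed");
            }
            return "pong";
        }

        @Override
        public void close() {
            // A real client would release its connections and threads here.
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Uses the client inside try-with-resources, then returns it for inspection.
    static DemoClient useAndClose() {
        DemoClient client = new DemoClient();
        try (DemoClient c = client) {
            c.ping();
        }
        return client;
    }

    public static void main(String[] args) {
        System.out.println("closed after use: " + useAndClose().isClosed());
    }
}
```

For a client shared across the whole application, the same close() call would more naturally live in a shutdown hook or the container's destroy callback.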
RestClientBuilder also lets you set the following optional configuration parameters while building the RestClient instance:
@Test
public void config_client() {
    // Optional configuration parameters
    RestClientBuilder builder = RestClient.builder(
            new HttpHost("localhost", 9200, "http"));
    Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
    // Default headers sent with every request, so they need not be specified on each one.
    builder.setDefaultHeaders(defaultHeaders);
    // Whether the REST client should compress requests with gzip content encoding
    // and add the "Accept-Encoding: gzip" header to receive compressed responses.
    builder.setCompressionEnabled(true);
    // Whether to send a meta header describing the runtime environment.
    builder.setMetaHeaderEnabled(false);
    // Path prefix applied to every request made by the HTTP client. With "/my/test",
    // every client request becomes /my/test/ + endpoint.
    builder.setPathPrefix("/my/test");
    // Whether a response that carries a warning header is treated as a failure.
    builder.setStrictDeprecationMode(false);
    // Node selector applied to all requests. Custom rule: skip nodes whose name starts with "log_".
    builder.setNodeSelector(new NodeSelector() {
        @Override
        public void select(Iterable<Node> nodes) {
            Iterator<Node> itr = nodes.iterator();
            while (itr.hasNext()) {
                // The name may be null when node metadata has not been populated.
                String name = itr.next().getName();
                if (name != null && name.startsWith("log_")) {
                    itr.remove();
                }
            }
        }
    });
    // Listener notified on every request failure, with the node that failed.
    builder.setFailureListener(new RestClient.FailureListener() {
        @Override
        public void onFailure(Node node) {
            super.onFailure(node);
        }
    });
    // Callback for modifying the default request configuration,
    // e.g. request timeouts or whether authentication is enabled.
    builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            return builder.setSocketTimeout(10000);
        }
    });
    // Callback for modifying the default HTTP client configuration, e.g. encrypted
    // communication over SSL, or anything else that
    // org.apache.http.impl.nio.client.HttpAsyncClientBuilder allows to be set.
    builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setProxy(new HttpHost("proxy", 9000, "http"));
        }
    });
    RestClient client = builder.build();
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
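The node selector in the configuration example works by removing unwanted entries from the candidate list in place, through Iterator.remove(). The following is a minimal sketch of that filtering step on its own, assuming plain String names instead of Node objects so that it runs with the JDK alone; the class name, method name, and sample node names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SelectorSketch {
    // Removes, in place, every name that starts with the given prefix.
    // Null names are kept, mirroring a node whose name is unknown.
    static void dropByPrefix(Iterable<String> names, String prefix) {
        Iterator<String> it = names.iterator();
        while (it.hasNext()) {
            String name = it.next();
            if (name != null && name.startsWith(prefix)) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical node names, for illustration only.
        List<String> nodes = new ArrayList<>(Arrays.asList("data_1", "log_1", "data_2", "log_2"));
        dropByPrefix(nodes, "log_");
        System.out.println(nodes);
    }
}
```

Removing through the iterator matters here: calling remove on the collection itself while iterating over it would typically fail with a ConcurrentModificationException.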
2. Basic Auth
private RestClient restClient;

@Before
public void auth() {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("elastic", "123456"));
    restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
            .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            }).build();
}
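For context, HTTP Basic authentication ultimately travels as an Authorization header whose value is the word Basic followed by the Base64 encoding of user:password. The sketch below builds that value by hand with the JDK only, to show what the credentials provider configured above ends up producing on the wire; the class and method names are hypothetical, and the credentials are the sample ones from this section.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class BasicAuthHeaderSketch {
    // Builds the value of the Authorization header for HTTP Basic authentication.
    static String basicAuthHeader(String user, String password) {
        String pair = user + ":" + password;
        String token = Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }

    // Reverses basicAuthHeader, returning the "user:password" pair.
    static String decode(String headerValue) {
        String token = headerValue.substring("Basic ".length());
        return new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String header = basicAuthHeader("elastic", "123456");
        System.out.println("Authorization: " + header);
        System.out.println("decodes to: " + decode(header));
    }
}
```

Since Base64 is an encoding and not encryption, these credentials are readable by anyone who can see the traffic, which is one reason to prefer https over the plain http used in the examples.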
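3. Performing requests
Once the RestClient has been created, requests are sent by calling either performRequest or performRequestAsync.
- performRequest is synchronous: it blocks the calling thread until the response arrives and returns it directly.
- performRequestAsync is asynchronous: it accepts a ResponseListener, which is notified when the request completes or fails. In recent 7.x clients, including the 7.16.0 version used here, it returns a Cancellable that can be used to cancel the in-flight request, not void.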
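The difference between the two styles can be illustrated without a cluster. The sketch below, which uses only the JDK, shows one common way to bridge a success/failure listener to a CompletableFuture so that callers can block on it or compose it. The Listener interface, performAsync, and the canned response body are hypothetical stand-ins for illustration, not the client's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncSketch {
    // Hypothetical callback type, shaped like a success/failure listener.
    interface Listener {
        void onSuccess(String body);
        void onFailure(Exception e);
    }

    // Hypothetical async call: does its work on another thread, then notifies the listener.
    static void performAsync(ExecutorService pool, boolean fail, Listener listener) {
        pool.execute(() -> {
            if (fail) {
                listener.onFailure(new IllegalStateException("simulated failure"));
            } else {
                listener.onSuccess("{\"count\":1}");
            }
        });
    }

    // Bridges the callback style to a future.
    static CompletableFuture<String> toFuture(ExecutorService pool, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        performAsync(pool, fail, new Listener() {
            @Override
            public void onSuccess(String body) {
                future.complete(body);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    // Runs one call and waits for it, returning the body or "failed: <message>".
    static String run(boolean fail) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return toFuture(pool, fail).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return "failed: " + cause.getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Completing the future on both the success and the failure path is the important part: a waiter that is only released on success would hang forever when the request fails.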
3.1. Configuring the Request
@Test
public void config_request() throws IOException {
    // Build the Request; the two required arguments are the HTTP method and the endpoint.
    Request request = new Request("POST", "/_analyze");
    // Add a query parameter, i.e. /_analyze?pretty=true
    request.addParameter("pretty", "true");
    // Set the request body and its content type; the body is usually a JSON string.
    // Option 1: use NStringEntity. The Content-Type must then be set explicitly
    // to ContentType.APPLICATION_JSON.
    // String jsonString = "{" +
    //         "\"analyzer\":\"ik_max_word\"," +
    //         "\"text\":\"我是中国人\"" +
    //         "}";
    // HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
    // request.setEntity(entity);
    // Option 2: use setJsonEntity. No Content-Type is needed, because the method
    // sets ContentType.APPLICATION_JSON internally.
    // JSONObject comes from a separate JSON library (e.g. fastjson), not from the client.
    JSONObject jsonEntity = new JSONObject();
    jsonEntity.put("analyzer", "ik_max_word");
    jsonEntity.put("text", "我是中国人");
    request.setJsonEntity(jsonEntity.toJSONString());
    // Per-request options
    request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("key", "value"));
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity()));
}
{
"tokens" : [
{
"token" : "我",
"start_offset" : 0,
"end_offset" : 1,
"type" : "CN_CHAR",
"position" : 0
},
{
"token" : "是",
"start_offset" : 1,
"end_offset" : 2,
"type" : "CN_CHAR",
"position" : 1
},
{
"token" : "中国人",
"start_offset" : 2,
"end_offset" : 5,
"type" : "CN_WORD",
"position" : 2
},
{
"token" : "中国",
"start_offset" : 2,
"end_offset" : 4,
"type" : "CN_WORD",
"position" : 3
},
{
"token" : "国人",
"start_offset" : 3,
"end_offset" : 5,
"type" : "CN_WORD",
"position" : 4
}
]
}
3.2. Sending a request synchronously
@Test
public void sync_request() throws IOException {
    Request request = new Request("GET", "/_count");
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity(), "UTF-8"));
}
{"count":33607,"_shards":{"total":12,"successful":12,"skipped":0,"failed":0}}
3.3. Sending a request asynchronously
@Test
public void async_request() {
    CountDownLatch latch = new CountDownLatch(1);
    Request request = new Request("GET", "/_count");
    restClient.performRequestAsync(request, new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            // Handle the returned response
            try {
                System.out.println(EntityUtils.toString(response.getEntity(), "UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the latch, otherwise the test thread waits forever
                latch.countDown();
            }
        }

        @Override
        public void onFailure(Exception exception) {
            // Handle the returned exception
            System.out.println(exception.getMessage());
            latch.countDown();
        }
    });
    try {
        // Keep the test alive until the listener has been called
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
{"count":33607,"_shards":{"total":12,"successful":12,"skipped":0,"failed":0}}
4. Parsing the Response
@Test
public void analyze_response() throws IOException {
    Request request = new Request("GET", "/_count");
    Response response = restClient.performRequest(request);
    // Status code
    System.out.println(response.getStatusLine().getStatusCode());
    // Protocol version
    System.out.println(response.getStatusLine().getProtocolVersion());
    // Reason phrase that goes with the status code
    System.out.println(response.getStatusLine().getReasonPhrase());
    // Host that returned the response
    System.out.println(response.getHost());
    // Headers
    Header[] headers = response.getHeaders();
    for (int i = 0; i < headers.length; i++) {
        System.out.println(headers[i].getName() + ":" + headers[i].getValue());
    }
    // Warnings
    List<String> warnings = response.getWarnings();
    for (String warning : warnings) {
        System.out.println(warning);
    }
    // Response body
    System.out.println(EntityUtils.toString(response.getEntity(), "UTF-8"));
}
200
HTTP/1.1
OK
http://localhost:9200
X-elastic-product:Elasticsearch
Warning:299 Elasticsearch-7.16.0-6fc81662312141fe7691d7c1c91b8658ac17aa0d "this request accesses system indices: [.apm-agent-configuration, .apm-custom-link, .fleet-policies-7, .kibana_7.16.0_001, .kibana_task_manager_7.16.0_001, .security-7, .tasks], but in a future major version, direct access to system indices will be prevented by default"
content-type:application/json; charset=UTF-8
content-length:77
this request accesses system indices: [.apm-agent-configuration, .apm-custom-link, .fleet-policies-7, .kibana_7.16.0_001, .kibana_task_manager_7.16.0_001, .security-7, .tasks], but in a future major version, direct access to system indices will be prevented by default
{"count":33607,"_shards":{"total":12,"successful":12,"skipped":0,"failed":0}}
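In the output above the same deprecation message appears twice: once inside the raw Warning header, prefixed by the warn code 299 and the agent string, and once on its own as returned by getWarnings(). The sketch below shows one simple way to pull the quoted message out of such a header value with the JDK alone. It assumes the value has the shape code, agent, quoted text with no trailing warn-date; the client's own parsing is presumably more thorough, and the class and method names are hypothetical. The sample message is shortened from the output above.

```java
final class WarningHeaderSketch {
    // Returns the text between the first and the last double quote,
    // or the raw value when no quoted part is found.
    static String warnText(String headerValue) {
        int first = headerValue.indexOf('"');
        int last = headerValue.lastIndexOf('"');
        if (first < 0 || last <= first) {
            return headerValue;
        }
        return headerValue.substring(first + 1, last);
    }

    public static void main(String[] args) {
        // Shortened version of the Warning header shown in the output above.
        String header = "299 Elasticsearch-7.16.0-6fc81662312141fe7691d7c1c91b8658ac17aa0d "
                + "\"direct access to system indices will be prevented by default\"";
        System.out.println(warnText(header));
    }
}
```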
5. Simple examples
5.1. Search
@Test
public void search_match() throws IOException {
    Request request = new Request("GET", "/bank/_search?pretty=true");
    NStringEntity entity = new NStringEntity("{\n" +
            "  \"query\": {\n" +
            "    \"match\": {\n" +
            "      \"address\": \"street\"\n" +
            "    }\n" +
            "  },\n" +
            "  \"from\": 0,\n" +
            "  \"size\": 2\n" +
            "}", ContentType.APPLICATION_JSON);
    request.setEntity(entity);
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity(), "UTF-8"));
}
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 385,
"relation" : "eq"
},
"max_score" : 0.95395315,
"hits" : [
{
"_index" : "bank",
"_type" : "_doc",
"_id" : "6",
"_score" : 0.95395315,
"_source" : {
"account_number" : 6,
"balance" : 5686,
"firstname" : "Hattie",
"lastname" : "Bond",
"age" : 36,
"gender" : "M",
"address" : "671 Bristol Street",
"employer" : "Netagy",
"email" : "hattiebond@netagy.com",
"city" : "Dante",
"state" : "TN"
}
},
{
"_index" : "bank",
"_type" : "_doc",
"_id" : "13",
"_score" : 0.95395315,
"_source" : {
"account_number" : 13,
"balance" : 32838,
"firstname" : "Nanette",
"lastname" : "Bates",
"age" : 28,
"gender" : "F",
"address" : "789 Madison Street",
"employer" : "Quility",
"email" : "nanettebates@quility.com",
"city" : "Nogal",
"state" : "VA"
}
}
]
}
}
5.2. Setting a mapping
@Test
public void put_mapping() throws IOException {
    // Creates the index with its settings and mappings in a single PUT request
    Request request = new Request("PUT", "/test_index");
    NStringEntity entity = new NStringEntity(
            "{\n" +
            "  \"settings\": {\n" +
            "    \"number_of_shards\": 1,\n" +
            "    \"number_of_replicas\": 1\n" +
            "  },\n" +
            "  \"mappings\": {\n" +
            "    \"properties\": {\n" +
            "      \"title\": {\n" +
            "        \"type\": \"keyword\"\n" +
            "      }\n" +
            "    }\n" +
            "  }\n" +
            "}\n", ContentType.APPLICATION_JSON);
    request.setEntity(entity);
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity()));
}
{"acknowledged":true,"shards_acknowledged":true,"index":"test_index"}
5.3. Highlighting results
@Test
public void highlight() throws IOException {
    Request request = new Request("GET", "/bank/_search?pretty=true");
    NStringEntity entity = new NStringEntity("{\n" +
            "  \"query\": {\n" +
            "    \"term\": {\n" +
            "      \"city.keyword\": {\n" +
            "        \"value\": \"Brogan\"\n" +
            "      }\n" +
            "    }\n" +
            "  },\n" +
            "  \"highlight\": {\n" +
            "    \"fields\": {\"city.keyword\": {}}\n" +
            "  }\n" +
            "}", ContentType.APPLICATION_JSON);
    request.setEntity(entity);
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity()));
}
{
"took" : 45,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 6.5032897,
"hits" : [
{
"_index" : "bank",
"_type" : "_doc",
"_id" : "1",
"_score" : 6.5032897,
"_source" : {
"account_number" : 1,
"balance" : 39225,
"firstname" : "Amber",
"lastname" : "Duke",
"age" : 32,
"gender" : "M",
"address" : "880 Holmes Lane",
"employer" : "Pyrami",
"email" : "amberduke@pyrami.com",
"city" : "Brogan",
"state" : "IL"
},
"highlight" : {
"city.keyword" : [
"<em>Brogan</em>"
]
}
}
]
}
}
5.4. Aggregations
Date histogram aggregation:
@Test
public void date_histogram() throws IOException {
    Request request = new Request("GET", "/kibana_sample_data_flights/_search?pretty=true");
    NStringEntity entity = new NStringEntity("{\n" +
            "  \"track_total_hits\": true,\n" +
            "  \"size\": 0,\n" +
            "  \"aggs\": {\n" +
            "    \"timestamp_histogram\": {\n" +
            "      \"date_histogram\": {\n" +
            "        \"field\": \"timestamp\",\n" +
            "        \"calendar_interval\": \"1M\",\n" +
            "        \"format\": \"yyyy-MM-dd\"\n" +
            "      }\n" +
            "    }\n" +
            "  }\n" +
            "}", ContentType.APPLICATION_JSON);
    request.setEntity(entity);
    Response response = restClient.performRequest(request);
    System.out.println(EntityUtils.toString(response.getEntity()));
}
{
"took" : 10,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 13059,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"timestamp_histogram" : {
"buckets" : [
{
"key_as_string" : "2023-10-01",
"key" : 1696118400000,
"doc_count" : 7165
},
{
"key_as_string" : "2023-11-01",
"key" : 1698796800000,
"doc_count" : 5894
}
]
}
}
}
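Each bucket in the response above carries its start time twice: key is the epoch time in milliseconds, and key_as_string is the same instant rendered with the requested yyyy-MM-dd format. As a quick check of how the two relate, the sketch below converts the key values from the output back into dates with java.time, assuming UTC (the default time zone for date_histogram when none is specified). The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class BucketKeySketch {
    // Same pattern as the "format" field of the aggregation, evaluated in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Renders a bucket key (epoch milliseconds) as a calendar date.
    static String format(long epochMillis) {
        return DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(format(1696118400000L)); // 2023-10-01
        System.out.println(format(1698796800000L)); // 2023-11-01
    }
}
```

When the raw JSON is parsed by hand, as the low-level client requires, working from the numeric key avoids depending on whatever format string the request happened to use.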