操作文档
-
单文档:
- Index API
- Get API
- Delete API
- Update API
-
多文档:
- Multi Get API
- Bulk API
- Delete By Query API
- Update By Query API
- Reindex API
所有CRUD API都是单索引API。index参数接受一个索引名,或 一个指向单个索引的别名(alias)。
单文档API
1、Index API
索引API在特定索引中添加或更新JSON文档,使其可搜索。
//4种语法
PUT /<index>/_doc/<_id>
POST /<index>/_doc/
PUT /<index>/_create/<_id>
POST /<index>/_create/<_id>
如果target是data stream,则如果指定ID则不能使用PUT /<target>/_doc/<_id>
,应使用PUT /<target>/_create/<_id>
path参数:
-
<target>
:(必须, string) data stream 、 index 的名称。- 如果target不存在,且与data stream的index template的名称或 wildcard (*) 匹配,则自动创建对应的data stream。
- 如果target不存在,且与data stream的template的名称规则不匹配,则自动创建index。
-
<_id>
:(可选, string) 文档的唯一ID,不指定ID时使用 POST //_doc 。
query参数:
-
if_seq_no
:(可选, integer) 只有当index的if_seq_no是该值时才会处理该文档,否则409 Version Conflict。乐观锁控制,因为index每处理一个文档就会if_seq_no+1。 -
if_primary_term
:(可选, integer) 只有当该文档所在的primary分片号是if_primary_term时才会处理该文档,否则409 Version Conflict。乐观锁控制。 -
op_type
:(可选, enum)当设置为create
时的处理机制是文档不存在才处理 (相当于put if absent),如果文档已存在,处理会失败。可选值index
,create
。当ID指定时,默认值=index , 否则默认=create -
pipeline
:(可选, string) 指定一个pipeline的ID来触发。 -
refresh
:(可选, enum)true
:立即刷新关联的shard。wait_for
:等待response直到index刷新。false
:默认,无处理。
-
routing
:(可选, string) 分片的路由参数。 -
timeout
:(可选, time units) 对以下动作的超时时间,默认1m。 -
version
:(可选, integer) 乐观锁,指定ID时使用. -
version_type
:(可选, enum) Specific version type: external, external_gte. -
wait_for_active_shards
:(可选, string) 这个参数覆盖了index.write.wait_for_active_shards。预先检查shard的可用数量,如果当时可用的shard数量小于配置数,则ES会重试直到超时,默认1即只需primary,可选all或 1 - number_of_replicas+1 。
因为文档在索引时会从primary尽力的同步到各个可用的replica,所以该配置是预先检查机制,提供数据的可靠性。 -
require_alias
:(可选, Boolean) 如果是true,则要求目标是一个index的别名。默认false.
request body参数:
JSON
response body参数:
_shards
: 分片信息._shards.total
: 总的分片数量primary and replica shards._shards.successful
: 成功的分片数量,当操作成功时,successful至少是1。_shards.failed
: 失败的分片数量,0表示没有失败。_index
: index的名称。_id
: 文档的ID._version
: 文档的版本._seq_no
: 对应的index的_seq_no._primary_term
: primary分片号.result
: 处理结果是 created or updated.
下面的示例将ID为1文档插入到twitter索引中:
PUT twitter/_doc/1
{
"user":"kimchy",
"post_date":"2009-11-15T14:12:12",
"message":"trying out Elasticsearch"
}
默认情况下,当索引操作成功返回时,有可能一些replica 还没有开始或完成,因为只要primary成功执行,就会返回,这种行 为可以调整。其实这样做的目的是快速响应,一般的场景并不需要等 待所有分片都完成索引操作再返回,除非对数据安全要求极高的场景。
1.1、自动创建索引(action.auto_create_index)
当索引文档时,如果索引不存在,会自动创建索引。索引操作还 将创建一个动态映射(如果尚未存在)。默认情况下,如果需要,新 字段和对象将自动添加到映射定义中。
自动创建索引由action.auto_create_index
设置控制。此设置默 认为true,这意味着索引总是自动创建的。也可以设置只有匹配特定 模式的索引才允许自动创建索引,方法是将此设置的值更改为这些匹 配模式的逗号分隔列表。还可以通过在列表中的模式前面加上+或-, 明确地允许和禁止使用它。最后,通过将此设置更改为false,可以完全禁用它。可以在elasticsearch.yml中配置,也可以通过如下URL 配置。
只允许自动创建名为twitter、index10的索引,不允 许创建其他与index1*匹配的索引:
PUT _cluster/settings
{
"persistent": {
"action.auto_create_index": "twitter,index10,-index1*,+ind*"
}
}
完全禁用索引的自动创建:
PUT _cluster/settings
{
"persistent": {
"action.auto_create_index": "false"
}
}
允许使用任何名称自动创建索引,这是默认设置:
PUT _cluster/settings
{
"persistent": {
"action.auto_create_index": "true"
}
}
1.2、强制创建(op_type)
索引操作还接受一个op_type
参数,它可以用来强制create操 作,允许put-if-absent的行为。使用create时,如果索引中已存在具有该ID的文档,则索引操作将失败。
可以这样来理解,当索引文档时,如果带有&op_type=true
参 数,明确指明是创建文档,如果文档存在就报错。默认情况下,当文档存在时直接覆盖。
PUT twitter/_doc/1?op_type=create
{
"user":"kimchy",
"post_date":"2009-11-15T14:12:12",
"message":"trying out Elasticsearch"
}
1.3、ID自动生成
索引操作可以在不指定ID的情况下执行。在这种情况下,将自动 生成一个ID。
POST twitter/_doc
{
"user":"kimchy",
"post_date":"2009-11-15T14:12:12",
"message":"trying out Elasticsearch"
}
1.4、路由(routing)
默认情况下,数据存放到哪个分片,通过使用文档ID值的哈希值 来控制。对于更显式的控制,可以传递哈希函数的种子值。
POST twitter/_doc?routing=kimchy
{
"user":"kimchy",
"post_date":"2009-11-15T14:12:12",
"message":"trying out Elasticsearch"
}
在设置显式mapping时,可以选择使用_routing字段从文档本身 提取路由值。如果定义了mapping的_routing值并将其设置为必需, 则如果没有提供可提取路由值,索引操作将失败。
1.5、分发
索引操作根据路由定向到primary,并在包含此分片的实际节点 上执行。在primary完成操作后,如果需要,操作将分发到需要的其他分片。
1.6、超时(time)
执行索引操作时,分配用于执行索引操作的主碎片可能不可用。出现这种情况的一些原因可能是主碎片当前正在从网关恢复或正在进行重新定位。默认情况下,索引操作将在主碎片上等待长达1分钟,然后失败并返回错误。timeout参数可用于明确指定等待时间。
PUT twitter/_doc/1?timeout=5m
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
1.7、等待活动分片
为了兼顾系统写入的效率和可靠性,可以将索引操作配置为在继 续操作之前等待一定数量的活动分片。如果所需数量的活动分片不可 用,则写入操作必须等待并重试,直到所需分片已启动或发生超时。 默认情况下,写入操作仅在继续之前等待primary完成(即 wait_for_active_shards=1
) 。 可 以 通 过 设 置 index.write.wait_for_active_shards
来 动 态 重 写 此 默 认 值 。 要 更 改 每个请求操作的此行为,可以使用wait_for_active_shards请求参 数。
wait_for_active_shards
的有效值是任何正整数,最多为分片总 数。指定负值或大于分片数的数字将引发错误。
例如,假设有一个由3个节点(A、B和C)组成的集群,并且创 建了一个索引index,其中副本数设置为3(结果是共4个分片)。如 果我们尝试索引操作,默认情况下,该操作将仅确保primary在继续 操作之前可用。这意味着,即使B和C发生故障,并且A托管了 primary , 索 引 操 作 仍 然 继 续 进 行 。 如 果 在 请 求 中 将 wait_for_active_shards
设置为3(并且所有3个节点都已启动),那 么索引操作将需要3个活动的shard副本才能继续。这一要求应该满 足,因为集群中有3个活动的节点,每个节点都保存一个shard副本。 但是,如果我们将wait_for_active_shards设置为all(或设置为4, 这是相同的),则索引操作将不会继续,因为索引中没有每个shard 的所有4个副本。除非在集群中出现新节点以承载分片的第四副本, 否则操作将超时。
1.8、detect——noop参数
使用索引API更新文档时,即使文档没有更改,也会始终创建文 档的新版本。如果不想这样做,可以使用detect_noop=true
参数。这 个参数的作用是在更新之前与原文档对比,如果没有字段值的变化, 则不做更新操作。
2、Get API
GET API允许根据其ID从索引中获取JSON文档。
//获取整个文档
GET <index>/_doc/<_id>
//只获取_source部分
GET <index>/_source/<_id>
//检测文档是否存在
HEAD <index>/_doc/<_id>
HEAD <index>/_source/<_id>
path参数:
<index>
:(必须, string) 索引名称.<_id>
:(必须, string) 文档ID.
query参数:
preference
:(可选, string) 指定在哪个node或shard上执行,默认随机.realtime
:(可选, Boolean) 是否实时,默认true.refresh
:(可选, Boolean)如果为true,则请求在检索文档之前刷新相关碎片。默认为false。routing
:(可选, string) 路由参数,到哪个shard.stored_fields
:(可选, Boolean) 如果为true,则检索存储在索引中的文档字段,而不是文档 _source.默认false。_source
:(可选, string) 是否返回_source,默认true._source_excludes
(可选, string) 响应中在_source排除哪些字段,逗号分割.当_source 是false时不起作用。_source_includes
(可选, string) 跟_source_excludes相反。version
:(可选, integer) 指定version控制并发,version必须与文档当前的version匹配否则失败。version_type
:(可选, enum) Specific version type: external, external_gte.
response body参数:
_index
: index名称._id
: 文档ID._version
: 文档版本._seq_no
: 对应的index的_seq_no._primary_term
: 分片号.found
: 文档是否存在: true or false._routing
: 指定的路由,如果有设置的话._source
: 当文档存在时返回,除非参数_source是false或stored_fields是true._fields
: 如果stored_fields是true 且文档存在.
GET twitter/_doc/1
//返回
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
}
GET twitter/_source/1
//返回
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
2.1、实时性(realtime)
默认情况下,GET API是实时的,不受索引刷新频率的影响。如 果文档已更新但尚未刷新,则GET API将在适当时机发出刷新调用, 以使文档可见。这还将使上次刷新后更改的其他文档可见。如果需要 禁用此特性,可以将realtime
参数设置为false
。
GET twitter/_doc/1?realtime=false
2.2、字段选择(_source)
默认情况下,GET API操作返回_source的内容,除非使用了 stored_fields参数或禁用了_source。可以关闭_source检索,如下 所示:
GET twitter/_doc/1?_source=false
//返回
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true
}
如果只需要完整_source中的一个或两个字段,可以使用 _source_includes
和_source_excludes
参数来包含或排除字段。这 对于大型文档尤其有用,因为部分字段检索可以节省网络开销。两个 参数都采用逗号分隔的字段列表或通配符表达式。
GET twitter/_doc/0? _source_includes=*.id&_source_excludes=entities
如果只需指定include,可以使用较短的表示法:
GET twitter/_doc/0?_source=*.id,retweeted
2.3、存储字段(stored_fields)
GET操作允许指定一组存储字段(store属性值为true),这些 字段将通过传递stored_fields参数返回。如果未存储请求的字段,则 将忽略它们。
PUT twitter
{
"mappings": {
"properties": {
"counter": {
"type": "integer",
"store": false
},
"tags": {
"type": "keyword",
"store": true
}
}
}
}
PUT twitter/_doc/1
{
"counter":1,
"tags":["red"]
}
GET twitter/_doc/1?stored_fields=tags,counter
//返回
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"fields" : {
"tags" : [
"red"
]
}
}
此外,只有叶字段可以通过stored_field选项返回,不能返回对 象字段,这样的请求将失败。
2.4、路由(routing)
如果在索引文档时使用了路由参数,搜索时也应该加上该参数。 当然搜索时不加也是可以的,这会降低性能。
GET twitter/_doc/2?routing=user1
2.5、preference参数
preference参数的作用是控制优先从哪个shard获取数据。默认 情况下是随机选择的。一般情况下可以设置为_local,这可以降低网 络开销。
GET twitter/_doc/1?preference=_local
2.6、refresh参数
可以将refresh参数设置为true,以便在GET操作之前刷新相关 的分片并使其可见。将其设置为true应慎重考虑,因为这可能导致系 统负载过重,并减慢索引速度。
GET twitter/_doc/1?refresh
2.7、分发
GET操作被散列到一个特定的分片ID上,然后被重定向到该分片 ID对应的一个副本,并返回结果。副本是主分片及其在该分片ID上的 副本。这意味着拥有的副本越多,将拥有更好的扩展能力。
2.8、版本支持(version)
只有当文档的当前版本等于指定版本时,才能使用version参数来 检索文档(当然不等于也没关系,只是检索不到数据罢了)。对于所 有版本类型,此行为都相同。
在内部,Elasticsearch将旧文档标记为已删除,并添加了一个 全新的文档。旧版本的文档不会立即消失,尽管无法访问它。 Elasticsearch将采用一定的策略在后台清除被删除的文档。
GET twitter/_doc/1?version=1
3、Delete API
删除API(DELETE)允许根据特定文档的ID从其中删除JSON 文档。
//语法
DELETE /<index>/_doc/<_id>
path参数:
<index>
:(必须, string) index名称.<_id>
:(必须, string) 文档ID.
query参数:
-
if_seq_no
:(Optional, integer) 同上. -
if_primary_term
:(Optional, integer) 同上.
-refresh
:(Optional, enum)默认falsetrue
:refresh相关的shards.wait_for wait
:直到refresh。false
: do nothing with refreshes.
-
routing
:(Optional, string) 如果文档是有路由的,则必须指定相同的规则参数,否则RoutingMissingException . -
·timeout`:(Optional, time units) wait for active shards的时间. 默认1m.
-
version
:(Optional, integer) 指定版本号乐观锁. -
version_type
:(Optional, enum) Specific version type: external, external_gte. -
wait_for_active_shards
:(Optional, string) 默认1即只需primary,可选all或 1 - number_of_replicas+1 。
DELETE /twitter/_doc/1
//返回
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"result" : "deleted",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1
}
3.1、路由(routing)
如果在索引文档时使用了路由参数,删除时也应该加上该参数。
DELETE twitter/_doc/1?routing=user1
4、Update API
更新API(_update)允许根据提供的脚本更新文档。该操作从 索引中获取文档,运行脚本(使用可选的脚本语言和参数),并对结 果进行索引(还允许删除或忽略该操作)。它使用版本控制来确保在 Get和Reindex操作期间没有发生任何更新。
此操作仍然意味着文档的完全重新索引,它只是减少了一些网络 往返,并减少Get和Reindex操作之间版本冲突的可能性。需要启用 _source才能使用此功能。
//语法
POST /<index>/_update/<_id>
path参数:
<index>
:(必须, string) index名称,默认如果index不存在自动创建。<_id>
:(必须, string) 文档ID.
query参数:
-
if_seq_no
:(Optional, integer) 同上. -
if_primary_term
:(Optional, integer) 同上. -
lang
:(Optional, string) script 的语言类型. 默认: painless. -
require_alias
:(Optional, Boolean) 如果true, 目标必须是 index alias. 默认 false. -
refresh
(可选, enum)true
:立即刷新关联的shard。wait_for
:等待response直到index刷新。false
:默认,无处理。
-
retry_on_conflict
:(Optional, integer) 当并发更新版本冲突时的重试次数,默认0,如果达到重试次数还是冲突则409. -
routing
(Optional, string) 如果文档有routing时则必须指定. -
_source
(Optional, list) Set to false to disable source retrieval (default: true). You can also specify a comma-separated list of the fields you want to retrieve. -
_source_excludes
:(Optional, list) 同上. -
_source_includes
:(Optional, list) 同上. -
timeout
:(Optional, time units) 以下操作的等待时间,默认1m: -
wait_for_active_shards
:(Optional, string)同上。
现在创建一个简单的索引,后续将使用此索引:
PUT test/_doc/1
{
"counter": 1,
"tags": ["red"]
}
4.1、使用script更新
执行一个脚本来增加计数器(counter)字段的值:
POST test/_update/1
{
"script": {
"lang": "painless",
"source": "ctx._source.counter += params.count",
"params": {
"count": 4
}
}
}
向标签列表(tags)字段中添加标签(如果标签存在,它仍 然会被添加,因为这是一个列表):
POST test/_update/1
{
"script": {
"lang": "painless",
"source": "ctx._source.tags.add(params.tag)",
"params": {
"tag": "blue"
}
}
}
可以从标签列表中删除一个标签。删除标记的painless函数将要 移除的元素的数组索引作为其参数,因此在避免运行时错误的同时, 需要更多的逻辑来定位它。如果标记在列表中出现多次,则只会删除其中一次:
POST test/_update/1
{
"script" : {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
"lang": "painless",
"params" : {
"tag" : "blue"
}
}
}
除了_source外,还可以通过ctx映射获得以下变量:_index、 _type、_id、_version、_routing和_now(当前时间戳)。
还可以在文档中添加一个新字段:
POST test/_update/1
{
"script" : "ctx._source.new_field = 'value_of_new_field'"
}
或从文档中删除字段:
POST test/_update/1
{
"script" : "ctx._source.remove('new_field')"
}
而且,甚至可以动态地执行更改操作。如果标记字段包含 green,则此示例将删除文档,否则将不执行任何操作(noop):
POST test/_update/1
{
"script" : {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx.op = 'delete' } else { ctx.op = 'none' }",
"lang": "painless",
"params" : {
"tag" : "green"
}
}
}
4.2、部分字段更新
更新API还支持传递部分文档字段进行更新,在内部完成合并 (简单的递归合并、对象的内部合并、替换核心“键值”和数组)。 要完全替换现有文档,应使用索引API。
POST test/_update/1
{
"doc": {
"counter": 111
}
}
通过部分更新机 制将现有文档添加新字段:
POST test/_update/1
{
"doc" : {
"name" : "new_name"
}
}
4.3、避免无效更新
如果指定了doc,则其值将与现有的_source合并。默认情况下, 不更改任何内容的更新并不会真正执行并返回"result":“noop”,实 例如下:
POST test/_update/1
{
"doc" : {
"name" : "new_name"
}
}
因为传入的name值和source中的一样,则忽略整个更新请求。 如果请求被忽略,响应中的result元素将返回noop。
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 8,
"result" : "noop",
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
},
"_seq_no" : 7,
"_primary_term" : 1
}
可以通过设置"detect_noop":false来禁用此行为。
POST test/_update/1
{
"doc" : {
"name" : "new_name"
},
"detect_noop": false
}
4.4、upsert元素
如果文档不存在,则upsert元素的内容将作为新文档插入。如果 文档确实存在,则将执行script:
POST test/_update/1
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
},
"upsert" : {
"counter" : 1
}
}
4.5、scripted_upsert参数
如果不管文档是否存在都希望运行脚本,即脚本处理初始化文 档,而不是upsert元素,可以将脚本scripted_upsert设置为true:
POST test/_update/1
{
"scripted_upsert": true,
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
},
"upsert" : {
"counter" : 1
}
}
4.6、doc_as_upsert参数
将doc_as_upsert设为true,会使用doc的内容作为upsert值, 而不是发送部分doc加上upsert,示例如下:
POST test/_update/1
{
"doc" : {
"name" : "new_name"
},
"doc_as_upsert" : true
}
多文档API
5、Multi Get API
MGet API(_mget)基于单个索引、类型(可选)和ID(可能 还有路由)返回多个文档。响应包括一个docs数组,其中包含与原始 _mget请求相对应的所有已提取文档(如果特定Get失败,则在响应 中包含此错误的对象)。成功的Get的结构与Get API提供的文档在结 构上类似。
//语法
GET /_mget
GET /<index>/_mget
path参数:
<index>
:(Optional, string) index的名称,如果不指定则在body中指定.
query参数:
-preference
:(Optional, string) 指定在哪个node或shard上执行,默认随机.
realtime
:(Optional, Boolean) 是否实时,默认true.refresh
:(Optional, Boolean) 如果为true,则请求在检索文档之前刷新相关碎片。默认为false。routing
:(Optional, string) 分片路由参数.stored_fields
:(Optional, Boolean) 如果为true,则检索存储在索引中的文档字段,而不是文档 _source.默认false。_source
:(可选, string) 是否返回_source,默认true._source_excludes
(可选, string) 响应中在_source排除哪些字段,逗号分割.当_source 是false时不起作用。_source_includes
:(可选, string) 跟_source_excludes相反。
request body:
-
docs
:(Optional, array) 当path参数没有指定index时必须:-
_id
:(Required, string) 文档ID. -
_index
:(Optional, string) 当path参数没有指定index时必须. -
routing
:(Optional, string) 如果文档有routing则必须,否则找不到. -
_source
:(Optional, Boolean) 是否需要_source,默认true.source_include
:(Optional, array) 响应中在_source排除哪些字段,逗号分割.当_source 是false时不起作用。source_exclude
:(Optional, array)跟_source_excludes相反。
-
_stored_fields
:(Optional, array) The stored fields you want to retrieve.
-
-
ids
:(Optional, array) 文档的IDs,当path参数指定index时使用。
response body:
- 响应是一个 docs array,每个文档的结构与Get API的响应类似。
示例如下:
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_id" : "1"
},
{
"_index" : "test",
"_id" : "2"
}
]
}
_mget请求也可以用于指定索引(在这种情况下,主体中不需要 它):
GET /test/_mget
{
"docs" : [
{
"_id" : "1"
},
{
"_id" : "2"
}
]
}
在这种情况下,可以直接使用ids元素来简化请求:
GET /test/_mget
{
"ids" : ["1", "2"]
}
5.1、_source过滤
默认情况下,将为每个文档返回_source字段(如果已存储)。 与Get API类似,可以使用_source参数仅检索_source的部分内容 ( 或 者 根 本 不 检 索 ) , 还 可 以 使 用 URL 参 数 _source 、 _source_includes和_source_excludes指定默认值。
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_source" : false
},
{
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_source" : ["field3", "field4"]
},
{
"_index" : "test",
"_type" : "_doc",
"_id" : "3",
"_source" : {
"include": ["user"],
"exclude": ["user.location"]
}
}
]
}
5.2、stored_fields存储字段
可以为每个要获取的文档指定检索特定存储字段(store属性为 true),类似于Get API的stored_fields参数。
GET /_mget
{
"docs" : [
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"stored_fields" : ["field1", "field2"]
},
{
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"stored_fields" : ["field3", "field4"]
}
]
}
或者,可以在查询字符串中指定stored_fields参数作为默认值, 应用于所有文档:
GET /test/_doc/_mget?stored_fields=field1,field2
{
"docs" : [
{
"_id" : "1"
},
{
"_id" : "2",
"stored_fields" : ["field3", "field4"]
}
]
}
5.3、routing路由
可以将路由值作为参数:
GET /_mget?routing=key1
{
"docs" : [
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"routing" : "key2"
},
{
"_index" : "test",
"_type" : "_doc",
"_id" : "2"
}
]
}
在上面的例子中,文档test/_doc/2将从路由键key1对应的分片 中获取,而文档test/_doc/1将从路由键key2对应的分片中提取,因 为test/_doc/2没有单独指定路由。
6、Bulk API
批量操作API(_bulk)可以在单个API调用中执行多个索引和删 除操作。这可以大大提高索引速度。
//语法
POST /_bulk
POST /<index>/_bulk
path参数:
<target>
:(Optional, string) data stream, index, 或 index alias 的名称.
query参数:
-
pipeline
:(Optional, string) 需要给incoming文档使用的pipeline的ID. -
refresh
:(可选, enum)true
:立即刷新关联的shard。wait_for
:等待response直到index刷新。false
:默认,无处理。
-
require_alias
:(Optional, Boolean) 如果true, target 必须是 index alias. 默认 false. -
routing
:(可选, string) 分片的路由参数. -
_source
:(可选, string) 是否返回_source,默认true. -
_source_excludes
:(可选, string) 响应中在_source排除哪些字段,逗号分割.当_source 是false时不起作用。 -
_source_includes
(可选, string) 跟_source_excludes相反。 -
timeout
:(可选, time units) 对以下动作的超时时间,默认1m: -
wait_for_active_shards
:(可选, string) 这个参数覆盖了index.write.wait_for_active_shards。预先检查shard的可用数量,如果当时可用的shard数量小于配置数,则ES会重试直到超时,默认1即只需primary,可选all或 1 - number_of_replicas+1 。因为文档在索引时会从primary尽力的同步到各个可用的replica,所以该配置是预先检查机制,提供数据的可靠性。
request body:
-
create
:(Optional, string) 创建文档._index
:(Optional, string) data stream, index, or index alias的名称. 当 没有指定时必须._id
:(Optional, string) 文档的ID,如果不指定则使用自动ID.require_alias
:(Optional, Boolean) 如果true, target 必须是 index alias. 默认 false.dynamic_templates
:(Optional, map) 动态模板,与Index Mapping的动态模板类似。
-
delete
:(Optional, string) 删除文档._index
:(Optional, string)data stream, index, or index alias的名称. 当 没有指定时必须._id
:(Required, string) 文档 ID.require_alias
:(Optional, Boolean) 如果true, target 必须是 index alias. 默认 false.
-
index
:(Optional, string) 索引文档._index
:(Optional, string) data stream, index, or index alias的名称. 当没有指定时必须. _id
:(Optional, string) 文档的ID,如果不指定则使用自动ID.require_alias
:(Optional, Boolean) 如果true, target 必须是 index alias. 默认 false.dynamic_templates
:(Optional, map) 动态模板,与Index Mapping的动态模板类似。
-
update
:(Optional, string) 更新文档._index
:(Optional, string)data stream, index, or index alias的名称. 当 没有指定时必须._id
:(Required, string) 文档 ID.require_alias
:(Optional, Boolean) 如果true, target 必须是 index alias. 默认 false.
-
doc
:(Optional, object) 文档的一部分,用于 update. 例如 { “doc” : {“field2” : “value2”} } -
<fields>
:(Optional, object) 文档的一部分,用于 create 和index . 例如 { “field1” : “value1” }
response body:
bulk API的响应包含请求中每个操作的单独结果,并以提交的顺序返回。单个操作的成功或失败不会影响请求中的其他操作。为了确保快速响应,如果一个或多个shard失败,bulk API将响应部分结果。
-
took
(integer) 耗时ms. -
errors
(Boolean) 如果 true, 表示至少有一个操作失败. -
items
(array of objects) 每个操作的单独结果,并以提交的顺序返回.-
<action>
(object) 即操作的类型: create, delete, index, 和 update.-
_index
(string) 操作对应的index名称,如果是data stream则是背后的具体index名称. -
_type
(string) 固定_doc. -
_id
(integer) 文档ID. -
_version
(integer) 文档最新版本. -
result
(string) 操作的结果. 成功的值有 created, deleted, 和updated. 该参数只有在成功时才返回。 -
_shards
(object) 包含以下:total
(integer) 总的shard数量.successful
(integer) 成功的shard数量.failed
(integer) 失败的shard数量.
-
_seq_no
(integer) 同上,该参数只有在成功时才返回。 -
_primary_term
(integer) 同上,该参数只有在成功时才返回。 -
status
(integer)操作的http code结果。 -
error
(object) 该参数只有在失败时才返回,包含如下:type
(string) 操作的失败类型.reason
(string) 失败原因.index_uuid
(string) 失败的index的UUID.shard
(string) 失败的shard.index
(string) 失败对应的index名称,如果是data stream则是背后的具体index名称.
-
-
REST API结点是/_bulk,它需要以下以新行分隔的 JSON(ndjson)结构:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
注意:最后一行数据必须以换行符(\n)结尾。每个换行符前面 都可以有一个回车符\r。向此结点发送请求时,Content-Type应设置为application/x-ndjson。
可能的操作包括Index、Create、Delete和Update。Index和 Create需要下一行有一个源,并且与标准索引API的op_type参数具有相同的语义(即,如果已经存在具有相同索引的文档,则Create操作将失败,而Index操作将根据需要添加或替换文档)。但Delete操 作不需要在下面的行中有一个源,它与标准的Delete API具有相同的语义。Update要求在下一行指定分部文档、upsert和脚本及其选项。
如果要为Curl提供文本文件输入,则必须使用--data-binary
标 志而不是-d,后者不保留换行符。如下示例:
$ cat requests
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
以下是正确的批量命令序列示例:
POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
节点是/_bulk,它和/{index}/_bulk的区别是,提供索引index 后,默认情况下,它将用于未显式提供索引的批量项目。
格式在这里说明,这里的想法是让处理过程尽可能快。由于某些 操作将被重定向到其他节点上的其他分片,因此只有 action_meta_data是确定在接受节点处理的。
对批量操作的响应是一个大型JSON结构,每个操作的单独结果 按照与请求中显示的操作相同的顺序执行。单个操作的失败不会影响 其余操作。
在单个批量调用中没有一个绝对合适的批操作大小数字,应该使 用不同的设置进行试验,以找到适合特定工作负载的最佳大小。
如果使用HTTP API,请确保客户端不发送HTTP块,因为这样会 减慢速度。
6.1、路由
每个批量项目都可以使用routing字段传递路由值。它会根据 _routing映射自动跟踪索引和删除操作的行为。
6.2、更新
当使用更新操作时,retry_on_conflict可以用作操作本身(不在 额外的有效负载行中)中的字段,以指定在发生版本冲突时应重试更 新的次数。
Update操作支持以下选项:doc(部分文档)、upsert、 doc_as_upsert、script、params(用于脚本)、lang(用于脚本) 和_source。更新操作示例如下:
POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}
7、Delete By Query API
查询删除API(_delete_by_query)是对每个与查询匹配的文档 执行删除操作。
//语法
POST /<index>/_delete_by_query
条件可以使用Search API 在query参数和body参数中。
path参数:
<target>
:(Optional, string) data streams, indices, and aliases 的逗号分割. 可以使用通配符*或_all.
query参数:
-
allow_no_indices
:(Optional, Boolean)是否允许匹配不到index,如果false则匹配不到时失败,例如target是foo*,bar*,实际存在foo开头的index但没有bar开头的就会失败,默认true. -
analyzer
:(Optional, string) query的分词器,只能在 q 参数指定时使用. -
analyze_wildcard
:(Optional, Boolean) If true, wildcard and prefix queries are analyzed. Defaults to false.只能在 q 参数指定时使用. -
conflicts
:(Optional, string) 当版本冲突时怎么处理: abort or proceed. 默认 abort. -
default_operator
:(Optional, string) 条件关系: AND or OR. 默认 OR.只能在 q 参数指定时使用. -
df
:(Optional, string) Field to use as default where no field prefix is given in the query string.只能在 q 参数指定时使用. -
expand_wildcards
:(Optional, string) 可以被通配符匹配的索引,逗号分割,默认open,可选值:- all
- open
- closed
- hidden
- none
- from
-
ignore_unavailable
:(Optional, Boolean) 如果true, 响应中不包含missing or closed indices.默认false. -
lenient
:(Optional, Boolean) 错误容忍,如果true,忽略query的格式错误(比如给数字类型使用text) .默认false.只能在 q 参数指定时使用. -
max_docs
:(Optional, integer) 最多删除文档数量,默认全部. -
preference
:(Optional, string) 指定在哪个node或shard上执行,默认随机. -
q
:(Optional, string) Query in the Lucene query string syntax. -
request_cache
:(Optional, Boolean) If true, the request cache is used for this request. Defaults to the index-level setting. -
refresh
:(Optional, Boolean) 如果 true, 请求完成后refresh所有shard,而不是Delete API的刷新相关shard,默认 false. -
requests_per_second
:(Optional, integer) 删除处理的每秒处理数量,默认-1不限制. -
routing
:(Optional, string) Custom value used to route operations to a specific shard. -
scroll
:(Optional, time value) delete by query使用scroll查询,该参数控制scroll该次的生命周期,默认5m. 见Scroll search results. -
scroll_size
:(Optional, integer) scroll的Size。默认1000. -
search_type
:(Optional, string) The type of the search operation. Available options:query_then_fetch
dfs_query_then_fetch
-
search_timeout
:(Optional, time units) search的超时时间,默认不超时. -
slices
:(Optional, integer) 是否需要拆分成子任务,默认1不拆分. -
sort
:(Optional, string) 查询的排序,逗号分割, : pairs. -
_source
:(Optional, string) True or false 控制是否return _source ,或者指定字段集. -
_source_excludes
:(可选, string) 响应中在_source排除哪些字段,逗号分割.当_source 是false时不起作用。 -
_source_includes
:(可选, string) 跟_source_excludes相反。 -
stats
:(Optional, string) Specific tag of the request for logging and statistical purposes. -
terminate_after
:(Optional, integer) 每个shard允许搜集的最大文档数量,当到达最大数量时提前结束query. -
timeout
:(Optional, time units) 超时时间 waits for active shards. 默认1m. -
version
:(Optional, Boolean) 如果true, 返回文档的version. -
wait_for_active_shards
:(Optional, string) 默认1即只需primary,可选all或 1 - number_of_replicas+1 。
request body:
query
:json Query DSL
response body:
took
整个操作的耗时ms.timed_out
是否超时.total
成功处理的文档数量.deleted
成功删除的文档数量.batches
The number of scroll responses pulled back by the delete by query.version_conflicts
版本冲突的数量.noops
对于该API一直是0.retries
The number of retries attempted by delete by query. bulk is the number of bulk actions retried, and search is the number of search actions retried.throttled_millis
总的对requests_per_second的sleep节流ms.requests_per_second
删除处理的每秒处理数量.throttled_until_millis
该API一直是0.failures
处理过程中的不可重试的失败。
示例:
POST /twitter/_delete_by_query
{
"query": {
"match": {
"message": "some message"
}
}
}
POST /twitter,blog/_delete_by_query
{
"query": {
"match_all": {}
}
}
_delete_by_query操作在启动时会获取索引的一个快照,并使用 内部版本控制删除找到的内容。这意味着,如果文档在获取快照的时 间和处理删除请求的时间之间发生更改,则会出现版本冲突,如图所示。当版本匹配时,文档将被删除。
在_delete_by_query执行期间,将按顺序执行多个搜索请求,以 查找所有要删除的匹配文档。每当找到一批文档时,都会执行相应的 批量请求来删除所有这些文档。如果搜索或批量请求被拒绝, _delete_by_query按照默认策略重试被拒绝的请求(最多10次,指数下降)。达到最大重试次数限制会导致_delete_by_query中止,并在 响应的failures中返回所有失败信息。已执行的删除操作仍然保持不变。换句话说,操作不会回滚,只会中止。当第一个失败导致中止时,由失败的批量请求返回的所有失败信息都会在failures元素中返回,因此,可能会有相当多失败的实体。
如果不想因版本冲突而让它们中止,那么在URL上设置 conflicts=proceed
或在请求正文中设置conflicts:proceed
。
POST /test/_delete_by_query?conflicts=proceed
{
"query": {
"match_all": {}
}
}
如果提供了routing,则路由将复制到滚动(scroll)查询中,并 将操作限制在与该路由值匹配的分片上,如下示例:
POST twitter/_delete_by_query?routing=1
{
"query": {
"range" : {
"age" : {
"gte" : 10
}
}
}
}
默认情况下,_delete_by_query查询使用的滚动批次大小为 1000。可以使用scroll_size参数更改批次大小:
POST twitter/_delete_by_query?scroll_size=5000
{
"query": {
"term": {
"user": "kimchy"
}
}
}
7.1、URL参数
除了标准参数(如pretty)外,_delete_by_query API还支持:
- refresh
- wait_for_completion
- wait_for_active_shards
- timeout
- scroll
发送带有refresh参数的请求,将在请求完成后刷新涉及的所有分 片。这与DELETE API的refresh参数不同,后者只会刷新接收到删 除请求的分片。同时,它不支持wait_for参数。
如 果 请 求 包 含 wait_for_completion=false
设 置 , 那 么 Elasticsearch将执行一些预检查。启动请求,然后返回一个Task以 取 消 或 获 取 任 务 的 状 态 。 Elasticsearch 还 将 在.tasks/task/${taskId}索引中创建此任务的记录文档,用户可以根 据需要保留或删除创建的文档,完成后删除它。这样Elasticsearch 可以回收它使用的空间。
wait_for_active_shards
参数用来控制在继续执行请求之前必须 激活多少个分片或副本。timeout参数用来控制每个写请求等待不可 用分片变为可用的时间。这两个参数在批量API中的工作方式完全相 同。由于_delete_by_query
使用scroll
搜索,还可以指定scroll参数 来控制“搜索上下文”保持活动的时间,例如scroll=10m。默认为5 分钟。
requests_per_second
参数可以设置为任何正十进制数(1.4、 6、1000等),并通过用等待时间填充每个批来限制通过查询发出删 除操作批的速率。通过将requests_per_second设置为-1,可以禁用 限制。此限制是通过在批之间等待来完成的,这样就可以为 _delete_by_query内部使用的滚动指定一个超时时间。这个时间是批 处理大小除以requests_per_second与写入时间之差。默认情况下, 批处理大小为1000,因此如果requests_per_second设置为500,计 算方式如下:
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds -.5 seconds = 1.5 seconds
由于该操作是作为单个_bulk请求发出的,因此较大的批大小将 导致Elasticsearch创建多个请求,并在启动下一个操作之前等待一 段时间,这是不平稳的。默认值为-1。
7.2、返回体
{
"took":147,
"timed_out":false,
"total":119,
"deleted":119,
"batches":1,
"version_conflicts":0,
"noops":0,
"retries":{
"bulk":0,
"search":0
},
"throttled_millis":0,
"requests_per_second":-1.0,
"throttled_until_millis":0,
"failures":[]
}
- took:整个操作从开始到结束的毫秒数。
- timed_out:如果在执行_delete_by_query期间执行的任何 请求超时,则此标志为true。
- total:成功处理的文档数。
- deleted:成功删除的文档数。
- batches:分了多少批次执行。
- version_conflicts:版本冲突的文档数。
- noops:对于_delete_by_query,此字段始终等于零。
- retries:操作尝试的重试次数。bulk是重试的批量操作数, search是重试的搜索操作数。
- throttled_millis : 请 求 休 眠 以 符 合 requests_per_second 参数设置的毫秒数。
- requests_per_second:每秒有效执行的请求数。
- throttled_until_millis:在_delete_by_query响应中,此字 段应始终等于零。它只有在使用_tasks API时才有意义,在 该API中,它指示下一次将再次执行请求的等待时间(从 epoch开始以毫秒为单位)。
- failures:如果请求处理中有任何不可恢复的错误,则记录 到这个失败数组中。如果不为空,那么请求会因为这些失败 而中止。_delete_by_query是使用批处理实现的,任何失败 都会导致整个过程中止,但当前批处理中的所有失败都会收 集到此数组中。可以使用conflicts选项防止覆盖写入在版本 冲突时中止。
7.3、任务API(_tasks)
可 以 使 用 任 务 API ( _tasks ) 获 取 任 何 正 在 运 行 的 _delete_by_query的状态:
GET _tasks?detailed=true&actions=*/delete/byquery
返回:
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/delete/byquery",
"status" : {
"total" : 6154,
"updated" : 0,
"created" : 0,
"deleted" : 3500,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"throttled_millis": 0
},
"description" : ""
}
}
}
}
}
上述返回结果中的status字段,包含任务的实际状态。total字段 是重新索引预期执行的操作总数。可以通过updated、created和 deleted的字段的加总值来估计进度。当它们的总和等于合计字段时, 请求将完成。
使用任务ID,可以直接查找任务:
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
此API的优点是,它可以与wait_for_completion=false一起使 用,以透明地返回已完成任务的状态。如果任务已完成,并且在其上 设置了wait_for_completion=false,那么它将返回results或error字 段。此功能的成本是创建的新的文档,需要手动来删除创建的文档。
7.4、取消任务API(_cancel)
可以使用取消任务API(_cancel)取消_delete_by_query进程:
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
可以使用任务API找到任务ID。
取消应该很快发生,但可能需要几秒钟。上面的任务状态API将 继续列出相应的任务,直到该任务检查它是否已被取消并自行终止。
7.5、动态调整API
requests_per_second 的 值 可 以 在 运 行 时 使 用 动 态 调 整 API(_rethrottle)进行更改:
POST _delete_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1
就像在_delete_by_query API上设置它一样, requests_per_second可以置为-1以禁用限制,也可以是任何十进制数(如1.7或12)以限制到该级别。加快查询速度的重新标记将立即 生效,但减慢查询速度的重新标记将在完成当前批处理后生效。这样 可以防止滚动超时。
7.6、切片
_delete_by_query API支持切片滚动,使删除过程并行。这种并 行化可以提高效率,并提供一种方便的方法将请求分解为较小的部 分。
7.6.1、人工切片
通过为每个请求提供一个切片ID和切片总数,进行人工切片:
POST twitter/_delete_by_query
{
"slice": {
"id": 0,
"max": 2
},
"query": {
"range": {
"likes": {
"lt": 10
}
}
}
}
POST twitter/_delete_by_query
{
"slice": {
"id": 1,
"max": 2
},
"query": {
"range": {
"likes": {
"lt": 10
}
}
}
}
可以通过如下URL验证其是否生效:
GET _refresh
POST twitter/_search?size=0&filter_path=hits.total
{
"query": {
"range": {
"likes": {
"lt": 10
}
}
}
}
//返回
{
"hits": {
"total" : {
"value": 0,
"relation": "eq"
}
}
}
7.6.2、自动切片
可以让_delete_by_query过程自动并行化,方法是使用“切片滚 动”机制对_id进行切片。使用slices指定要使用的切片数:
POST twitter/_delete_by_query?refresh&slices=5
{
"query": {
"range": {
"likes": {
"lt": 10
}
}
}
}
可以通过如下URL验证是否生效:
POST twitter/_search?size=0&filter_path=hits.total
{
"query": {
"range": {
"likes": {
"lt": 10
}
}
}
}
//返回
{
"hits": {
"total" : {
"value": 0,
"relation": "eq"
}
}
}
把slices设置为auto将允许Elasticsearch选择要使用的切片数。 此设置将会为每个分片使用一个切片,直至达到某个限制。如果存在 多个源索引,它将根据具有最小分片数的索引选择切片数。
通过在_delete_by_query API中添加slices会自动执行切片过 程。
7.6.3、切片数量选择
如果使用自动切片机制,将为大多数索引选择一个合理的切片数 字。如果要手动切片或调整自动切片,请遵循以下准则:
- 当切片数等于索引中的分片数时,查询性能最佳。如果该数 字很大(例如500),请选择一个较小的数字,因为太多的 切片会影响性能。设置高于分片数量的切片通常不会提高效 率并增加开销。
- 删除性能随可用资源的切片数线性扩展。
- 查询或删除性能是否支配运行时间取决于重新索引的文档和 集群资源。
8、Update By Query API
查询更新API(_update_by_query)的功能是在不更改源的情况 下对索引中的每个文档执行更新。这对于获取新属性或其他联机映射 更改很有用。
可以使用script、pipeline作为更新。
总体机制与 Delete by query相似,path、query、request body、response body(多了一个updated参数)的参数全部相同。
POST twitter/_update_by_query?conflicts=proceed
返回:
{
"took" : 147,
"timed_out": false,
"updated": 120,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"total": 120,
"failures" : [ ]
}
示例:更新整个文档: 你可以使用_update_by_query来更新整个文档,例如将所有包含特定条件的文档中的某个字段增加一个固定值。示例请求如下:
PUT person/_doc/1
{
"name":"tom 1",
"age": 18
}
PUT person/_doc/2
{
"name":"jerry",
"age": 22
}
PUT person/_doc/3
{
"name":"tom 2",
"age": 19
}
POST person/_update_by_query
{
"query": {
"match": {
"name": "tom"
}
},
"script": {
"source": "ctx._source.age += params.increment",
"params": {
"increment": 5
}
}
}
上述示例将满足条件 “match”:“tom” 的所有文档中的 age 字段增加了 5。
示例:更新部分字段: 除了更新整个文档外,你也可以使用 _update_by_query 来更新部分字段。示例请求如下:
POST your_index/_update_by_query
{
"query": {
"term": {
"field": "value"
}
},
"script": {
"source": """
if (ctx._source.containsKey('new_field')) {
ctx._source.new_field = 'new_value'
} else {
ctx.op = 'noop'
}
"""
}
}
上述示例会将满足条件 “field” = “value” 的所有文档中的 new_field 字段更新为 ‘new_value’,如果文档中不存在 new_field 字段,则不执行任何操作。
_update_by_query API在开始时获取该索引的快照,并使用内 部版本控制对找到的内容进行索引。这意味着,如果文档在获取到快照和处理索引请求的时间之间发生更改,则会出现版本冲突。当版本 匹配时,将更新文档并增加版本号。
所有更新和查询失败都会导致_update_by_query中止,并在响 应failures元素中返回相关错误信息。已经执行的更新仍然保持不 变,即进程不会回滚,只会中止。当第一个失败导致中止时,由失败 的批量请求返回的所有失败信息都会在failures元素中返回。因此, 可能会有相当多失败的实体。
如果只想统计版本冲突,而不想让_update_by_query操作中 止,可以在URL上设置conflicts=proceed或在请求正文中设 置"conflicts":“proceed”。上面的例子就是这样做的,因为它只是在 尝试获取一个在线映射更改,而版本冲突仅仅意味着冲突文档在 _update_by_query的开始和试图更新文档的时间之间被更新。这种 机制用于获取联机映射更新非常有用。
此API还可以用于DSL查询中。这将 更新用户Kimchy的twitter索引中的所有文档:
POST twitter/_update_by_query?conflicts=proceed
{
"query": {
"term": {
"user": "kimchy"
}
}
}
到目前为止,都是只更新文档而不更改其源字段,这对获取新属 性或映射这类的信息真的很有用,但只是其中一半的功能。 _update_by_query API支持脚本更新文档。
下面的例子,将增加 kimchy所有tweet上的likes字段:
POST twitter/_update_by_query
{
"script": {
"source": "ctx._source.likes++",
"lang": "painless"
},
"query": {
"term": {
"user": "kimchy"
}
}
}
正如在更新API中一样,可以设置ctx.op来控制执行的操作:
- noop:如果脚本决定不需要进行任何更改,则设置 ctx.op=noop 。 这 将 导 致 _update_by_query 操 作 从 其 更 新 中忽略该文档,此操作将在响应主体的noop计数器中报告。
- delete : 如 果 脚 本 决 定 必 须 删 除 文 档 , 则 设 置 ctx.op=delete,删除将在响应正文中的deleted计数器中报 告。
- 将ctx.op设置为其他值是错误的。在ctx中设置任何其他字段 都是错误的。
在没有指定conflicts=proceed的情况下,版本冲突会中止进程, 这样可以让用户来处理失败。
这个API不允许移动文档,只须修改它们的源字段即可。这是有 意义的,我们没有能力将文件从原始位置移走(这是由Lucene的存储 结构决定的)。
也可以一次对多个索引执行整个操作,就像搜索API一样:
POST twitter,blog/_update_by_query
如果提供路由routing字段,则路由将复制到滚动查询中,并将 处理过程限制为与该路由值匹配的分片,如下示例指定了路由参数:
POST twitter/_update_by_query?routing=1
默认情况下,_update_by_query API使用1000滚动批次大小 (操作是分批进行的,每批1000个文档)。可以使用URL参数 scroll_size更改批次大小:
POST twitter/_update_by_query?scroll_size=100
_update_by_queryAPI还可以通过如下方式指定管道来使用“索 引预处理节点”(预处理的一个功能)功能:
PUT _ingest/pipeline/set-foo
{
"description" : "sets foo",
"processors" : [ {
"set" : {
"field": "foo",
"value": "bar"
}
} ]
}
POST twitter/_update_by_query?pipeline=set-foo
8.1、URL参数
除了标准参数如pretty外,_update_by_query API还支持 refresh 、 wait_for_completion 、 wait_for_active_shards 、 timeout和scroll参数。
发送refresh参数将在请求完成时刷新索引中的所有分片。这与更 新API的refresh参数不同,后者只会导致接收到新数据的分片被刷 新。还有一点,它不支持wait_for参数。
如果请求包含wait_for_completion=false,那么Elasticsearch 将执行一些预检查,启动请求,然后返回一个Task,以取消或获取任 务的状态。Elasticsearch还将在.tasks/task/${taskId}.索引中创 建此任务的记录文档,可以根据需要保留或删除创建的文档。完成后 删除它,这样Elasticsearch可以回收它使用的空间。
wait_for_active_shards控制在继续执行请求之前必须激活多少 个shard副本。timeout控制每个写请求等待不可用分片变为可用分 片的时间。这两个参数在Bulk API中的工作方式完全相同。由于 _update_by_query操作使用滚动搜索,还可以指定scroll参数来控制 “搜索上下文”保持活动的时间,例如scroll=10m。默认为5分钟。
requests_per_second可以设置为任何正十进制数(1.4、6、 1000等),并通过用等待时间填充每个批来限制_update_by_query 发出索引操作批的速率。通过将requests_per_second设置为-1,可 以禁用限制。限制是通过在批之间等待来完成的,这样就可以为 _update_by_query操作在内部使用的滚动指定一个填充的超时。填 充时间是批处理大小除以requests_per_second与写入时间之差。默 认情况下,批处理大小为1000,因此如果requests_per_second设 置为500,计算方法如下:
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
由于该批是作为_bulk请求发出的,因此大小较大的批将导致 Elasticsearch创建多个请求,然后在启动下一个集合之前等待一段 时间,这是不平稳的。默认值为-1。
8.2、请求体
JSON响应如下:
{
"took" : 147,
"timed_out": false,
"total": 5,
"updated": 5,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"failures" : [ ]
}
- took:整个操作从开始到结束的毫秒数。
- timed_out:如果在操作执行期间的任何请求超时,则此标 志设置为true。
- total:成功处理的文档数。
- updated:成功更新的文档数。
- deleted:成功删除的文档数。
- batches:分了多少批次执行。
- version_conflicts:版本冲突的文档数。
- noops:当设置了ctx.op="noop"时被忽略的文档数。
- retries:尝试的重试次数。bulk是重试的批量操作数, search是重试的搜索操作数。
- throttled_millis : 请 求 休 眠 以 符 合 requests_per_second 的毫秒数。
- requests_per_second:每秒有效执行的请求数。
- throttled_until_millis:在_update_by_query响应中,此 字段应始终等于零。它只有在使用TASK API时才有意义, 在该API中,它指示下一次将再次执行请求的等待时间(从 epoch开始以毫秒为单位),以符合requests_per_second 参数的限制要求。
- failures:如果请求处理中有任何不可恢复的错误,则记录 到这个失败数组中。如果这不是空的,那么请求会因为这些 失败而中止。_update_by_query是使用批处理实现的,任 何失败都会导致整个过程中止,但当前批处理中的所有失败 信息都会收集到数组中。可以使用conflicts选项防止操作在 版本冲突时中止。
8.3、任务API
与查询删除delete by query一致
8.4、取消任务API
与查询删除delete by query一致
8.5、动态调整API
与查询删除delete by query一致
8.6、切片
与查询删除delete by query一致
8.7、获取新属性
假设创建了一个没有动态映射的索引,用数据填充它,然后添加 了一个映射值以从数据中提取更多字段,如下示例,“dynamic”: false设置意味着新字段不会被索引,只会存储在_source中。
PUT test
{
"mappings": {
"dynamic": false,
"properties": {
"text": {"type": "text"}
}
}
}
现在,添加两条文档,示例如下:
POST test/_doc?refresh
{
"text": "words words",
"flag": "bar"
}
POST test/_doc?refresh
{
"text": "words words",
"flag": "foo"
}
如下示例,更新映射以添加新flag字段。要提取新字段,必须用 它重新索引所有文档。
PUT test/_mapping
{
"properties": {
"text": {"type": "text"},
"flag": {"type": "text", "analyzer": "keyword"}
}
}
如下示例,搜索数据找不到任何内容:
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
//返回
{
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
}
}
}
但可以通过_update_by_query发出更新请求来获取新映射:
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
//返回
{
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
}
}
}
9、Reindex API
重建索引的最基本功能是从一个索引拷贝文档到另一个索引。
//语法
POST /_reindex
使用reindex手动重新索引旧索引的步骤:
-
创建一个新索引,并从旧索引复制映射和设置。
将refresh_interval设置为-1,将number_of_replicas设置为0,可以有效地重新索引。 -
使用Reindex将旧索引中的所有文档重新索引到新索引中。
-
将refresh_interval和number_of_replicas重置为旧索引中使用的值。
-
等待索引状态变为绿色。
-
在一个单一的更新别名请求中:
- 删除旧索引。
- 将旧索引名的别名添加到新索引中。
- 将旧索引上存在的别名添加到新索引中。
Reindex要求对源索引中的所有文档启用_source。
Reindex自动设置目标索引,但是不复制源索引的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射、分片计数、副本等。
reindex最基本的形式只是将文档从一个索引复制到另一个索引。这将把文档从twitter索引复制到new_twitter索引中:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
就像update_by_query一样,reindex获取源索引的快照,但是它的目标必须是一个不同的索引,这样就不太可能出现版本冲突。可以像新建索引那样配置dest子句来控制乐观并发控制。忽略version_type或将其设置为internal会导致Elasticsearch盲目地将文档转储到目标中,覆盖任何恰巧具有相同类型和id的文档:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "internal"
}
}
将version_type设置为external会导致Elasticsearch保存源文件的版本,创建任何缺失的文档,并更新目标索引中版本比源索引中版本更旧的文档:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
}
}
设置op_type为create将导致_reindex只在目标索引中创建缺少的文档。所有现有文档将导致版本冲突:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
错误示例:
"[_doc][2]: version conflict, document already exists (current version [2])"
默认情况下,版本冲突中止reindex进程,但你可以通过设置“conflicts”:“proceed”忽略冲突继续执行:
POST _reindex
{
"conflicts": "proceed",
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"op_type": "create"
}
}
可以通过向源添加类型或添加查询来限制文档。这将只复制匹配cat的文档到dest中:
POST _reindex
{
"source": {
"index": "source",
"query": {
"match": {
"company": "cat"
}
}
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
source中的索引和类型都可以是列表,允许您在一个请求中从许多源进行复制。这将从twitter的_doc类型中和blog索引的post类型中复制文档。
POST _reindex
{
"source": {
"index": ["twitter", "blog"]
},
"dest": {
"index": "all_together"
}
}
还可以通过设置max_docs来限制已处理文档的数量。这只会将一个文档从twitter复制到new_twitter:
POST _reindex
{
"max_docs": 1,
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
如果您想要从twitter索引中获得一组特定的文档,则需要使用排序。排序会降低滚动的效率,但在某些情况下,这样做是值得的。如果可能的话,请选择更具选择性的查询而不是size和sort。这将从twitter复制10000个文档到new_twitter:
POST _reindex
{
"size": 10000,
"source": {
"index": "twitter",
"sort": { "date": "desc" }
},
"dest": {
"index": "new_twitter"
}
}
与update_by_query一样,reindex支持修改文档的脚本。与_update_by_query不同,允许脚本修改文档的元数据。这个例子与源文档的版本冲突:
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter",
"version_type": "external"
},
"script": {
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}
}
您可以使用以下请求将公司名称为cat的源索引中的所有文档复制到路由设置为cat的dest索引中
POST _reindex
{
"source": {
"index": "source",
"query": {
"match": {
"company": "cat"
}
}
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
默认情况下,reindex使用1000的滚动批。您可以更改size批大小字段在源元素:
POST _reindex
{
"source": {
"index": "source",
"size": 100
},
"dest": {
"index": "dest",
"routing": "=cat"
}
}
Reindex还可以通过这样指定pipeline来使用Ingest节点特性:
POST _reindex
{
"source": {
"index": "source"
},
"dest": {
"index": "dest",
"pipeline": "some_ingest_pipeline"
}
}
9.1、远端集群
Reindex支持从一个远程Elasticsearch集群重建索引:
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"username": "user",
"password": "pass"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
host参数必须包含一个连接方式,主机,端口(例如:(https://otherhost:9200)和可选路径(例如:https://otherhost:9200/proxy)。username,和password参数是可选的,当它们出现时,reindex将使用基本身份验证连接到远程Elasticsearch节点。在使用基本身份验证时,请确保使用https,否则密码将以纯文本形式发送。
远程主机必须使用reindex.remote.whitelist在elasticsearch.yaml中显式白名单。可以将其设置为允许的远程主机和端口组合(例如otherhost:9200, another:9200, 127.0.10.:9200, localhost:)的逗号分隔列表。只需设置主机和端口即可,例如:
reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"
必须在协调重建索引的每一个节点上配置白名单。
这个特性应该适用于您可能找到的任何版本的Elasticsearch的远程集群。这将允许您从任何版本的Elasticsearch升级到当前版本,方法是对旧版本的集群进行索引重建。
为了使查询能够发送到旧版本的Elasticsearch,查询参数将直接发送到远程主机,而不需要进行验证或修改。
从远程集群重新索引不支持手动或自动切片。
从远程服务器重新索引使用堆上缓冲区,默认最大为100mb。如果远程索引包含非常大的文档,则需要使用较小的批处理大小。下面的示例将批处理大小设置为10,这非常非常小。
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200"
},
"index": "source",
"size": 10,
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
还可以使用socket_timeout字段在远程连接上设置套接字读取超时,使用connect_timeout字段设置连接超时。两者都默认为30秒。这个例子将套接字读取超时设置为1分钟,连接超时设置为10秒:
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"socket_timeout": "1m",
"connect_timeout": "10s"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
9.2、URL参数
除了像pretty这样的标准参数外,Reindex还支持refresh、wait_for_completion、wait_for_active_shards、timeout、scroll和requests_per_second。
发送refresh参数将导致刷新所有索引的写请求。这与Index的refresh参数不同,后者只会刷新接收到新数据的分片。另外,与Index不同,它不支持wait_for。
如果请求包含wait_for_completion=false,那么Elasticsearch将进行一些执行前检查,启动请求,然后返回一个任务,该任务可用于task来取消或获得该任务的状态。Elasticsearch还将在.tasks/task/${taskId}处创建该任务的记录作为文档。你可以自主决定保留或删除。当你完成或删除它时,使用的空间将被回收。
wait_for_active_shards控制在继续进行重建索引之前必须激活一个分片的多少个副本。timeout控制每个写请求等待不可用分片变为可用的时间。两者的工作方式与它们在Bulk中的工作方式完全相同。由于_reindex使用scroll搜索,你也可以指定滚动参数来控制它保持“搜索上下文”存活的时间(例如?scroll=10m)。默认值是5分钟。
可以将requests_per_second设置为任何正的十进制数(1.4、6、1000等),并通过填充每个批处理的等待时间来调节_reindex发出批索引操作的速度。可以通过将requests_per_second设置为-1来禁用限流。
限流是通过在批之间等待来完成的,这样_reindex在内部使用的scroll就可以获得一个数据索引的时间。该时间是批处理大小除以requests_per_second与花费在写索引上的时间的差。默认情况下批大小为1000,因此如果将requests_per_second设置为500:
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - 0.5 seconds = 1.5 seconds
由于批处理是作为单个_bulk请求发出的,因此较大的批处理将导致Elasticsearch创建多个请求,然后在开始下一组请求之前等待一段时间。默认值是-1。
9.3、任务API
与查询删除delete by query一致
9.4、取消任务API
与查询删除delete by query一致
9.5、动态调整API
与查询删除delete by query一致
9.6、切片
与查询删除delete by query一致
9.7、字段重命名
_reindex可用于构建具有重命名字段的索引副本。假设你创建了一个包含如下文档的索引:
POST test/_doc/1?refresh
{
"text": "words words",
"flag": "foo"
}
但是您不喜欢flag这个名字,想用tag替换它。reindex可以为您创建另一个索引:
POST _reindex
{
"source": {
"index": "test"
},
"dest": {
"index": "test2"
},
"script": {
"source": "ctx._source.tag = ctx._source.remove(\"flag\")"
}
}
新文档:
{
"_index" : "test2",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"text" : "words words",
"tag" : "foo"
}
}
9.8、重建多个索引
如果您有许多索引要重建,通常最好一次一个,而不是使用一个全局模式来重建许多索引。这样,如果有任何错误,您可以通过删除部分完成的索引并在该索引处重新开始来恢复该过程。它还使得并行化过程相当简单:分割索引列表以重新索引并并行地运行每个列表。
bash脚本:
for index in i1 i2 i3 i4 i5; do
curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{
"source": {
"index": "'$index'"
},
"dest": {
"index": "'$index'-reindexed"
}
}'
done