TL; DR
- indexとdocument idの組み合わせてデータを管理し、不要なデータはindex単位で削除していく
- 公式で使用されている通り、時系列データを1日単位でindexを作る
- 古い不要なindexを1日単位で削除できるようにする
- index単位の処理は高速
- document idもElasticSearchに任せ、こちらで指定しないほうが性能的に良い
なので、つまり
- document idをユーザ側で指定することは非推奨
- どこかに書いてあったはずですが失念しました。。。
- その指定したdocument idのdocumentをupdateすることは非推奨
- 当然、upsertも非推奨
でも、必要に迫らせてやらざるを得ない場合もあります。
これをどうやるか?というお話です。
確認環境
docker-composeで環境を準備します。
kibanaのDev toolsで確認することにします。
version: '3.9'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- node.master=true
- node.data=true
- xpack.security.enabled=false
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
volumes:
- ./elasticsearch:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
networks:
- esnet
kibana:
image: docker.elastic.co/kibana/kibana:7.15.1
ports:
- 5601:5601
depends_on:
- elasticsearch
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
networks:
- esnet
volumes:
esdata:
driver: local
networks:
esnet:
Dev toolsは、ここから使えます。
upsertとは(やりたいこと)
- documentが存在しない場合は、insertする
- documentが存在する場合は、updateする
- ただし全置換ではない
- 更新対象のkey:valueだけをPOSTしたら、そのkeyに対応するvalueだけがupdateされる
- 存在しないkeyの場合は追加される
insert(指定したidのdocumentが存在しない場合)
documentは、bulk APIでrequestするものとします。
公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘index’ を指定するとよいようです。
実際のbulk APIでの例を示します。
POST _bulk
{"index":{"_index":"index1","_id":"1"}}
{"field":"value"}
responseは以下となりました。
{
"took" : 2434,
"errors" : false,
"items" : [
{
"index" : {
"_index" : "index1",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1,
"status" : 201
}
}
]
}
こうすることで、indexが"index1"、idが"1"のdocumentがindexされます。
responseを確認してみましょう。
ちゃんとindexされていました。
# GET index1/_doc/1
{
"_index" : "index1",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"field" : "value"
}
}
ところが、公式を参照すると、こうも書かれています。
If the document exists, replaces the document and increments the version. The following line must contain the source data to be indexed.
要するに、後勝ちで、documentの全fieldがreplaceされてしまいます。
updateといえばそうかもしれませんが、必要なところだけ更新したいと思った時には、このままではうまく使えません。
事前に対象documentをgetしておいて、必要なところだけ差し替えたdocumentを作り、その後にPOSTしなければいけません。
アプリ側に余計な処理の作りこみが発生してしまいます。。。。
DB側でできる処理は、できるだけDB側に寄せたいです。
update(指定したidのdocumentが存在する場合)
公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘update’ を指定するとよいようです。
実際のbulk APIでの例を示します。
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1"} }
{"doc" : {"field" : "value2"} }
responseは、以下となりました。
{
"took" : 19,
"errors" : false,
"items" : [
{
"update" : {
"_index" : "index1",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1,
"status" : 200
}
}
]
}
indexされたはずの内容を確認してみましょう。
responseは以下の通りです。
key "field" の値が、ちゃんと "value2" となっていました。
# GET index1/_doc/1
{
"_index" : "index1",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"field" : "value2"
}
}
しかし、これでもやりたかったupsertはできません。
なぜなら、updateの場合は、対象documentがないとエラーになるようです。
つまり、updateするdocument idが存在しない場合、errorとなってしまいます。
やりたかったupsert(本命)
指定したidのdocumentが存在し、あと勝ちで部分的に更新したい場合です。
こちらが本命です。
結論からすると、こちらにやり方が書かれていました。
updateのオプションに、doc_as_upsert : true と指定しろと書かれています。
書き方のサンプルが、公式のここに書かれていました。
まず、既存のindexを削除してしまいます。
# DELETE index1
{
"acknowledged" : true
}
upsertの挙動を確認するために、連続して以下3つのqueryをPOSTします。
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field" : "value"}, "doc_as_upsert" : true }
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2", "field5" : "value5"}, "doc_as_upsert" : true }
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2-1"}, "doc_as_upsert" : true }
上記3つのPOSTの結果を確認してみます。
# GET index1/_doc/1
{
"_index" : "index1",
"_type" : "_doc",
"_id" : "1",
"_version" : 5,
"_seq_no" : 4,
"_primary_term" : 1,
"found" : true,
"_source" : {
"field" : "value", # <- 1. insert: 最初に追加
"field2" : "value2-1", # <- 3. update: value2 を value2-1 へ更新
"field5" : "value5" # <- 2. upsert: keyとvalueを追加
}
}
意図通り、upsertが出来ています。
更新したいkey : valueだけをupdateすることも出来ました。