
TL; DR
- indexとdocument idの組み合わせてデータを管理し、不要なデータはindex単位で削除していく
- 公式で使用されている通り、時系列データを1日単位でindexを作る
 - 古い不要なindexを1日単位で削除できるようにする
 - index単位の処理は高速
 
 - document idもElasticSearchに任せ、こちらで指定しないほうが性能的に良い
 
なので、つまり
- document idをユーザ側で指定することは非推奨
- どこかに書いてあったはずですが失念しました。。。
 
 - その指定したdocument idのdocumentをupdateすることは非推奨
 - 当然、upsertも非推奨
 
でも、必要に迫らせてやらざるを得ない場合もあります。
これをどうやるか?というお話です。
確認環境
docker-composeで環境を準備します。
kibanaのDev toolsで確認することにします。
version: '3.9'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - node.master=true
      - node.data=true
      - xpack.security.enabled=false
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - esnet
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    networks:
      - esnet
volumes:
  esdata:
    driver: local
networks:
  esnet:
Dev toolsは、ここから使えます。


upsertとは(やりたいこと)
- documentが存在しない場合は、insertする
 - documentが存在する場合は、updateする
- ただし全置換ではない
 - 更新対象のkey:valueだけをPOSTしたら、そのkeyに対応するvalueだけがupdateされる
 - 存在しないkeyの場合は追加される
 
 
insert(指定したidのdocumentが存在しない場合)
documentは、bulk APIでrequestするものとします。
公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘index’ を指定するとよいようです。
実際のbulk APIでの例を示します。
POST _bulk
{"index":{"_index":"index1","_id":"1"}}
{"field":"value"}
responseは以下となりました。
{
  "took" : 2434,
  "errors" : false,
  "items" : [
    {
      "index" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}
こうすることで、indexが"index1"、idが"1"のdocumentがindexされます。
responseを確認してみましょう。
ちゃんとindexされていました。
# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value"
  }
}
ところが、公式を参照すると、こうも書かれています。
If the document exists, replaces the document and increments the version. The following line must contain the source data to be indexed.
要するに、後勝ちで、documentの全fieldがreplaceされてしまいます。
updateといえばそうかもしれませんが、必要なところだけ更新したいと思った時には、このままではうまく使えません。
事前に対象documentをgetしておいて、必要なところだけ差し替えたdocumentを作り、その後にPOSTしなければいけません。
アプリ側に余計な処理の作りこみが発生してしまいます。。。。
DB側でできる処理は、できるだけDB側に寄せたいです。
update(指定したidのdocumentが存在する場合)
公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘update’ を指定するとよいようです。
実際のbulk APIでの例を示します。
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1"} }
{"doc" : {"field" : "value2"} }
responseは、以下となりました。
{
  "took" : 19,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 2,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 1,
        "_primary_term" : 1,
        "status" : 200
      }
    }
  ]
}
indexされたはずの内容を確認してみましょう。
responseは以下の通りです。
key "field" の値が、ちゃんと "value2" となっていました。
# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value2"
  }
}
しかし、これでもやりたかったupsertはできません。
なぜなら、updateの場合は、対象documentがないとエラーになるようです。
つまり、updateするdocument idが存在しない場合、errorとなってしまいます。
やりたかったupsert(本命)
指定したidのdocumentが存在し、あと勝ちで部分的に更新したい場合です。
こちらが本命です。
結論からすると、こちらにやり方が書かれていました。
updateのオプションに、doc_as_upsert : true と指定しろと書かれています。
書き方のサンプルが、公式のここに書かれていました。
まず、既存のindexを削除してしまいます。
# DELETE index1
{
  "acknowledged" : true
}
upsertの挙動を確認するために、連続して以下3つのqueryをPOSTします。
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field" : "value"}, "doc_as_upsert" : true }
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2", "field5" : "value5"}, "doc_as_upsert" : true }
# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2-1"}, "doc_as_upsert" : true }
上記3つのPOSTの結果を確認してみます。
# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value",     # <- 1. insert: 最初に追加
    "field2" : "value2-1", # <- 3. update: value2 を value2-1 へ更新
    "field5" : "value5"    # <- 2. upsert: keyとvalueを追加
  }
}
意図通り、upsertが出来ています。
更新したいkey : valueだけをupdateすることも出来ました。