ElasticSearchで、documentをupsertする
19
- 10月
2021
Posted By : boomin
ElasticSearchで、documentをupsertする
Advertisements

TL; DR

  • indexとdocument idの組み合わせてデータを管理し、不要なデータはindex単位で削除していく
    • 公式で使用されている通り、時系列データを1日単位でindexを作る
    • 古い不要なindexを1日単位で削除できるようにする
    • index単位の処理は高速
  • document idもElasticSearchに任せ、こちらで指定しないほうが性能的に良い

なので、つまり

  • document idをユーザ側で指定することは非推奨
    • どこかに書いてあったはずですが失念しました。。。
  • その指定したdocument idのdocumentをupdateすることは非推奨
  • 当然、upsertも非推奨

でも、必要に迫らせてやらざるを得ない場合もあります。
これをどうやるか?というお話です。

確認環境

docker-composeで環境を準備します。
kibanaのDev toolsで確認することにします。

version: '3.9'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - node.master=true
      - node.data=true
      - xpack.security.enabled=false
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - esnet

  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.1
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    networks:
      - esnet

volumes:
  esdata:
    driver: local

networks:
  esnet:

Dev toolsは、ここから使えます。

upsertとは(やりたいこと)

  • documentが存在しない場合は、insertする
  • documentが存在する場合は、updateする
    • ただし全置換ではない
    • 更新対象のkey:valueだけをPOSTしたら、そのkeyに対応するvalueだけがupdateされる
    • 存在しないkeyの場合は追加される

insert(指定したidのdocumentが存在しない場合)

documentは、bulk APIでrequestするものとします。

公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘index’ を指定するとよいようです。

実際のbulk APIでの例を示します。

POST _bulk
{"index":{"_index":"index1","_id":"1"}}
{"field":"value"}

responseは以下となりました。

{
  "took" : 2434,
  "errors" : false,
  "items" : [
    {
      "index" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

こうすることで、indexが"index1"、idが"1"のdocumentがindexされます。
responseを確認してみましょう。
ちゃんとindexされていました。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value"
  }
}

ところが、公式を参照すると、こうも書かれています。

If the document exists, replaces the document and increments the version. The following line must contain the source data to be indexed.

要するに、後勝ちで、documentの全fieldがreplaceされてしまいます。
updateといえばそうかもしれませんが、必要なところだけ更新したいと思った時には、このままではうまく使えません。

事前に対象documentをgetしておいて、必要なところだけ差し替えたdocumentを作り、その後にPOSTしなければいけません。 アプリ側に余計な処理の作りこみが発生してしまいます。。。。
DB側でできる処理は、できるだけDB側に寄せたいです。

update(指定したidのdocumentが存在する場合)

公式のここを見ると、bulk APIでPOSTする際、request bodyに、 ‘_op_type’: ‘update’ を指定するとよいようです。

実際のbulk APIでの例を示します。

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1"} }
{"doc" : {"field" : "value2"} }

responseは、以下となりました。

{
  "took" : 19,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "index1",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 2,
        "result" : "updated",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 1,
        "_primary_term" : 1,
        "status" : 200
      }
    }
  ]
}

indexされたはずの内容を確認してみましょう。
responseは以下の通りです。
key "field" の値が、ちゃんと "value2" となっていました。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value2"
  }
}

しかし、これでもやりたかったupsertはできません。
なぜなら、updateの場合は、対象documentがないとエラーになるようです。
つまり、updateするdocument idが存在しない場合、errorとなってしまいます。

やりたかったupsert(本命)

指定したidのdocumentが存在し、あと勝ちで部分的に更新したい場合です。
こちらが本命です。

結論からすると、こちらにやり方が書かれていました
updateのオプションに、doc_as_upsert : true と指定しろと書かれています。
書き方のサンプルが、公式のここに書かれていました

まず、既存のindexを削除してしまいます。

# DELETE index1
{
  "acknowledged" : true
}

upsertの挙動を確認するために、連続して以下3つのqueryをPOSTします。

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field" : "value"}, "doc_as_upsert" : true }

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2", "field5" : "value5"}, "doc_as_upsert" : true }

# POST _bulk
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{"doc" : {"field2" : "value2-1"}, "doc_as_upsert" : true }

上記3つのPOSTの結果を確認してみます。

# GET index1/_doc/1
{
  "_index" : "index1",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "field" : "value",     # <- 1. insert: 最初に追加
    "field2" : "value2-1", # <- 3. update: value2 を value2-1 へ更新
    "field5" : "value5"    # <- 2. upsert: keyとvalueを追加
  }
}

意図通り、upsertが出来ています。
更新したいkey : valueだけをupdateすることも出来ました。

Advertisements

コメントを残す