文章十五:ElasticSearch 运用ingest加工索引数据
ingest简单介绍他是es中的独立的数据处理加工的模块等同于是轻量级的ETL(数据的抽取转换加载)类似于logstash,使用的是popeline的管道处理模型。应用场景数据写入数据更新构建大宽表索引重建共享处理常规应用实战创建ingestPUT _ingest/pipeline/shijian_test { description: 尝试创建ingest pipeline, processors: [ { set: { field: shijian, value: 小黑黑 } }, { remove: { field: age } } ] }虚拟执行命令POST _ingest/pipeline/shijian_test/_simulate { docs: [ { _index:chen, _source:{ name:lihua } } ] }ingest场景应用-数据写入真实场景中执行数据写入执行pipeLine进行数据的处理POST ingest_test/_doc?pipelineshijian_test { name:lisi, age:11, class:11 } PUT ingest_test/_doc/1?pipelineshijian_test { name:lihua, age:10 }使用这种方式进行数据的插入的话这个数据在执行的时候就会被ingest进行处理达到我们数据处理的作用。ingest场景应用-数据更新原始数据一开始就在我们的索引中但是需要对他进行处理使用ingest进行数据的更新POST ingest_test/_update_by_query?pipelineshijian_test { query: { match: { name: lihua } } }ingest场景应用-索引重建在使用reindex重建索引时指定pipeline这个字段POST _reindex { source: { index:kibana_sample_data_flights }, dest: { index: ingest_test, pipeline: shijian_test } }ingest访问索引meta元数据访问PUT _ingest/pipeline/shijian_test { description: 创建ingest, processors: [ { set: { field: _meta.index, value: {{_index}} } }, { set: { field: _meta.id, value: {{_id}} } }, { set: { field: _meta.timestamp, value: {{_ingest.timestamp}} } } ] }通过这种方式我们可以在数据的meta数据中拿到我们需要的数据。PUT index_test/_doc/1?pipelineshijian_test {name:lihua} GET index_test/_search { took: 466, timed_out: false, _shards: { total: 1, successful: 1, skipped: 0, failed: 0 }, hits: { total: { value: 1, relation: eq }, max_score: 1, hits: [ { _index: index_test, _id: 1, _score: 1, _source: { name: lihua, _meta: { index: index_test, id: 1, timestamp: 2026-04-29T13:19:55.617899914Z } } } ] } }ingest访问索引source源数据在创建的时候我们可以从原有的数据中获取到_source中的数据的语法如下PUT _ingest/pipeline/shijian_test { description: 创建ingest, processors: [ { set: { field: new_name, value: {{name}} } }, { set: { field: new_source_name, value: {{_source.name}} } } ] }高级实战应用if逻辑条件判断PUT _ingest/pipeline/shijian_test { description: 测试使用if语句, processors: [ { set: { if: ctx.age10, field: class, value: 3 } } ] }ignore_failure和on_failure属性创建ingest之后如果我们在命令中执行了实际上没有的字段或者是出现了错误的时候终端请求抛出异常下面的例子会展示出来这个问题在实际的开发中我们可以通过高级的属性来解决这个问题。ignore_failure默认是false,是否在执行数据处理的时候当前处理的字段出现错误时忽略这个错误如果忽略则继续执行on_failure在不忽略问题的时候如果出现问题的时候执行什么语句。PUT _ingest/pipeline/shijian_test { description: 测试使用if语句, processors: [ { set: { if: ctx.age10, field: class, value: 3 } }, { remove: { field: text, ignore_failure: false, on_failure: [ { remove:{ filed:name } } ] } } ] }但是在实际开发中出现问题直接报错还是很正常的事情可以帮助我们发现问题和解决问题。pipeline多管道执行我们在执行之前创建多个ingest管道之后使用多个管道执行任务#创建两个管道 PUT _ingest/pipeline/test_001 { description: 001号, processors: [ { set: { field: name, value: lihua } } ] } PUT _ingest/pipeline/test_002 { description: 002号, processors: [ { set: { field: age, value: 11 } } ] } #创建高级管道,绑定两个管道 PUT _ingest/pipeline/number_sum_pipeline { processors: [ { pipeline: { name: test_001 } }, { pipeline: { name: test_002 } } ] } #查询数据 GET index_test/_search #执行命令插入数据 PUT index_test/_doc/1?pipelinenumber_sum_pipeline { class:1 }scriptingest处理数据问题假设我们在实际的存储中存储的一个人的姓名但是是分开存储的这是我们使用脚本将他合并到一起使用这个例子展示一下应用。PUT _ingest/pipeline/name_ingest { processors: [ { script: { source: String first_name ctx.f_name; String last_name ctx.l_name; ctx.name [first_name,first_name,last_name] , lang: painless } } ] } PUT index_test/_doc/1?pipelinename_ingest { f_name:li, l_name:hua }执行之后进行查询时他的返回结果是{ took: 68, timed_out: false, _shards: { total: 1, successful: 1, skipped: 0, failed: 0 }, hits: { total: { value: 1, relation: eq }, max_score: 1, hits: [ { _index: index_test, _id: 1, _score: 1, _source: { f_name: li, name: [ li, li, hua ], l_name: hua } } ] } }
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2570819.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!