我的用例很簡單。我在輸入中有一個 Kafka,在 Elasticsearch 中有一些索引(主題名稱 === 索引名稱),其中索引名稱與我們在應用程式中使用的物體相同,例如“建筑物”、“汽車”、“公共汽車” (僅舉例)。
input {
kafka {
bootstrap_servers => "kafka:29092"
topics => ['cars', 'bus']
decorate_events => true
codec => 'json'
}
}
filter {
if [@metadata][kafka][topic] == 'cars' {
mutate {
convert => {
"car_id" => "integer"
}
add_field => {
'id' => "%{car_id}"
}
}
}
if [@metadata][kafka][topic] == 'bus' {
mutate {
convert => {
"bus_id" => "integer"
}
add_field => {
'id' => "%{bus_id}"
}
}
}
}
if [@metadata][kafka][topic] == 'cars' {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
doc_as_upsert => true
action => 'update'
document_id => '%{car_id}'
}
if '%{[isDelete]}' {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
action => 'delete'
document_id => '%{car_id}'
}
}
}
if [@metadata][kafka][topic] == 'cars' {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
doc_as_upsert => true
action => 'update'
document_id => '%{bus_id}'
}
if '%{[isDelete]}' {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
action => 'delete'
document_id => '%{bus_id}'
}
}
}
要從 Logstash 更新/洗掉檔案,我需要使用他們的 id。但你會明白,我不想為每個物體設定 50 個條件,我更喜歡分解。
我想重新使用我在過濾器部分中創建的“id”,在輸出中使用它在document_id中。
你知道我該怎么做嗎?
uj5u.com熱心網友回復:
你可以這樣做:
input {
kafka {
bootstrap_servers => "kafka:29092"
topics => ['cars', 'bus']
decorate_events => true
codec => 'json'
}
}
filter {
translate {
source => "[@metadata][topic]"
target => "[@metadata][id_field]"
dictionary => {
"cars" => "car_id"
"bus" => "bus_id"
}
fallback => "no_id"
}
ruby {
code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
}
}
output {
if '%{[isDelete]}' in [message] {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
action => 'delete'
document_id => '%{id}'
}
} else {
elasticsearch {
hosts => "elasticsearch:9200"
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][kafka][topic]}"
doc_as_upsert => true
action => 'update'
document_id => '%{id}'
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/481754.html
上一篇:嘗試在elasticsearch7.8.1中建立索引時,出現錯誤說“欄位”太大,必須是<=32766有解決辦法嗎?