使用Logstash保持Elasticsearch与M

一、同步数据

Elasticsearch经常与关系型数据库协作,为业务提供搜索功能,而官方提供了Logstash中间件用于数据同步

Elasticsearch关联关系型数据库需要满足以下几个条件:

  • Elasticsearch 中的 _id字段必须设置为 MySQL 中的 "id" 字段,用于保证es中间件与关系型数据库建立映射关系。
    • 如果MySQL更新了某些数据,那么这些操作会导致数据同步。但是,这个同步操作会覆盖掉ES中的整个document,而不是某些特定字段。
  • 当向 MySQL 中插入或更新数据时,建议设置update-time字段作为更新的标识,以便可以捕获这个更新。

相关配置如下:

  • MySQL:8.0.24
  • Elasticsearch:7.15.2
  • Logstash:7.15.2:提供了ElasticSearch插件用于推送数据到ES中

二、解决删除操作无法同步

当Logstash使用ES插件推送MySQL数据时,只有insertupdate操作会进行同步,但是delete操作不会传播到Elasticsearch数据库中。

解决方案:

  1. 通过软删除处理。软删除的表一般会包含 is_deleted字段,可以在查询时对该字段进行过滤将相关文档排除在外。然后,通过定时任务从 MySQL 和 Elasticsearch 中删除该数据。
  2. 通过业务处理删除。在删除时,设置事务将删除同时传播到 MySQL、Elasticsearch中。但是,事务错误时可能会导致数据不一致。

软删除处理需要配置logstash.conf文件,例如:

input {
	jdbc {
		# jdbc_driver_library => "mysql-connector-java-8.0.27.jar"
		type => "jdbc"
		jdbc_connection_string => "jdbc:mysql://cos-mysql:3306/cosimcloud?characterEncoding=UTF-8&autoReconnect=true"
		jdbc_user => "root"
		jdbc_password => "xxx"
		jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
		statement => "SELECT fid AS id, fis_deleted as deleted FROM t1"
		connection_retry_attempts => "3"
		jdbc_validate_connection => "true"
		jdbc_validation_timeout => "600"
		jdbc_paging_enabled => "true"
		jdbc_page_size => "5000"
		# 设置定时任务
		schedule => "* * * * *"
		# 基于字段进行增量更新
		tracking_column => "fupdated_at"
		tracking_column_type => "timestamp"
		lowercase_column_names => false
	}
	
}

filter {
	if [deleted] {
        mutate {
            add_field => {
                "[@metadata][elasticsearch_action]" => "delete"
            }
        }
        mutate {
            remove_field => [ "deleted","@version","@timestamp" ]
        }
    } else {
        mutate {
            add_field => {
                "[@metadata][elasticsearch_action]" => "index"
            }
        }
        mutate {
            remove_field => [ "deleted","@version","@timestamp" ]
        }
    }
}


output {
	elasticsearch {
		hosts => "es01:9200"
		user => "elastic"
		password => "elastic"
		ecs_compatibility => disabled
		manage_template => true
		template_overwrite => true
		template => "/usr/share/logstash/template/logstash-template.json"
		template_name => "logstash-mysql"
		index => "logstash-fmu"
		pipeline => "fmu-tag"
		action => "%{[@metadata][elasticsearch_action]}"
		document_id => "%{id}"
	}
}
复制代码

需要配置:

  • 在input-jdbc-statement 中,需要查询软删除字段deleted
  • 在filter中,判断deleted并且使用mutate处理数据
  • output中elasticsearch的action字段
    • 配置action导致template无法生成索引(待处理)

三、相关文档

如何使用 Logstash 和 JDBC 确保 Elasticsearch 与关系型数据库保持同步