06、Java Logstash_Kibana-LogStash+MySQL+Elasticsearch 实现数据增量导入(双写一致)

LogStash+MySQL+Elasticsearch 实现数据增量导入(双写一致)

  • 1.环境准备

  • 1.1 上传数据库驱动

  • 1.2 准备数据库表格

  • 2.编写 LogStash 配置文件

  • 3.安装 logstash-input-jdbc 插件

  • 4.启动测试

  • 原有系统中,如果使用了缓存应用,全文搜索服务等额外数据存储,则在代码实现中,要保证双写一致,即写数据库的同时,把数据的变量同步到其他存储中

  • 如果使用 LogStash,则可以实现数据的增量导入。

  • 思路:写数据到数据库,LogStash 监听数据库中数据的变化,把增量数据读取,并保存到 ES 中

1.环境准备

在开始之前,我们需要准备好环境,包括安装好LogStash和Elasticsearch,确保它们能够正常运行。

1.1 上传数据库驱动

LogStash需要使用JDBC与MySQL数据库进行通信,但它本身并不包含数据库驱动,因此我们需要手动提供MySQL的JDBC驱动包。

  • 对于Logstash 5.x以及6.3.*以下版本,你可以将驱动上传到LogStash安装目录下的任意位置。
  • 在Logstash 6.8.4版本中,驱动包的上传位置要求为:$LOGSTASH_HOME/logstash-core/lib/jars/

确保驱动上传至正确的位置,以便LogStash可以正确地连接到MySQL数据库。

1.2 准备数据库表格

接下来,我们需要准备实际的数据库表格。假设我们在电商系统中有一个商品表tb_item,表结构如下所示:

CREATE TABLE `tb_item` (
  `id` bigint(20) NOT NULL COMMENT '商品id,同时也是商品编号',
  `title` varchar(100) NOT NULL COMMENT '商品标题',
  `sell_point` varchar(500) DEFAULT NULL COMMENT '商品卖点',
  `price` bigint(20) NOT NULL COMMENT '商品价格,单位为:分',
  `num` int(10) NOT NULL COMMENT '库存数量',
  `barcode` varchar(30) DEFAULT NULL COMMENT '商品条形码',
  `image` varchar(500) DEFAULT NULL COMMENT '商品图片',
  `cid` bigint(10) NOT NULL COMMENT '所属类目,叶子类目',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '商品状态,1-正常,2-下架,3-删除',
  `created` datetime NOT NULL COMMENT '创建时间',
  `updated` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `cid` (`cid`),
  KEY `status` (`status`),
  KEY `updated` (`updated`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='商品表';

我们将使用updated字段作为增量导入的定位字段,通过对比这个字段的值来判断哪些数据是最新的,并需要导入到Elasticsearch中。

2.编写 LogStash 配置文件

$LOGSTASH_HOME/config/目录下创建一个名为ego-items-db2es.conf的配置文件,使用以下配置来设置LogStash如何从MySQL读取数据并将其写入到Elasticsearch。

input {
  jdbc {
    # MySQL连接配置
    jdbc_connection_string => "jdbc:mysql://192.168.1.2:3306/ego?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
    jdbc_user => "root"
    jdbc_password => "root"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    # 分页和定时配置
    jdbc_paging_enabled => true
    jdbc_page_size => "2000"
    jdbc_default_timezone => "Asia/Shanghai"

    # SQL查询和增量导入配置
    statement => "SELECT id, title, sell_point, price, image, updated FROM tb_item WHERE updated >= :sql_last_value ORDER BY updated ASC"
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "updated"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./ego-items-db2es-last-value"
    clean_run => false
    lowercase_column_names => false
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.89.140:9200", "http://192.168.89.141:9200"]
    index => "ego-items-index"
    action => "index"
    document_id => "%{id}"
  }
}

3.安装 logstash-input-jdbc 插件

在LogStash 6.3.x和5.x版本中,logstash-input-jdbc插件通常默认安装。如果你使用的是6.8.4版本的LogStash,你需要手动安装此插件:

$LOGSTASH_HOME/bin/logstash-plugin install logstash-input-jdbc

4.启动测试

一切配置就绪后,我们可以启动LogStash来测试是否可以成功将数据从MySQL同步到Elasticsearch。

启动LogStash的命令如下:

bin/logstash -f config/ego-items-db2es.conf

启动后,LogStash会根据配置文件中定义的schedule定期执行SQL语句,读取MySQL中变更的数据,并将增量数据同步到Elasticsearch中,从而实现双写一致性。

通过使用LogStash来同步MySQL数据到Elasticsearch,我们可以避免在应用程序代码中进行双写操作,简化系统架构,同时保证数据的实时性和一致性。