本文共 4718 字,大约阅读时间需要 15 分钟。
Logstash是ES下的一款开源软件,它能够同时 从多个来源采集数据、转换数据,然后将数据发送到Eleasticsearch中创建索引。
我这里是用于同步MySQL数据库和ES索引库的数据
配置mysql.conf(使Logstash连接数据库进行数据采集)mysql.conf文件分两部分 input、output
input:表示将数据库数据输入到Logstash中处理
output:表示将处理好的数据输出给ES索引库
input { stdin { } jdbc { #连接mysql语句 jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC" # the user we wish to excute our statement as#账号 jdbc_user => "root"#密码 jdbc_password => admin # the path to our downloaded jdbc driver #mysql jdbc 连接驱动jdbc_driver_library => "F:\elasticsearch\logstash-6.2.1/mysql-connector-java-5.0.8-bin.jar" # jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #要执行的sql文件 #statement_filepath => "/conf/course.sql" #每分钟要执行采集mysql数据的sql语句(当新添加的数据timestamp大于记录的最后一次sql_last_value时间则会选择该数据) #ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时 statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)" #定时配置(一分钟执行sql一次) schedule => "* * * * *" record_last_run => true #每次从mysql数据库采集的时间的文件储存位置 last_run_metadata_path => "F:/elasticsearch/logstash-6.2.1/config/logstash_metadata" }}output { elasticsearch { #ES的ip地址和端口 hosts => "localhost:9200" #hosts => ["localhost:9200","localhost:9202","localhost:9203"] #ES索引库名称 index => "xc_course" #要声明哪个属性作为索引的id document_id => "%{id}" #索引类型(表) document_type => "doc" #索引映射格式的模板文件(可配置多种映射类型,要注明该映射类型名称)(将mysql表数据转换成映射格式) template =>"F:/elasticsearch/logstash-6.2.1/config/xc_course_template.json" #指定模板文件中的模板名称 template_name =>"xc_course" template_overwrite =>"true" } stdout { #日志输出 codec => json_lines }}
jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
(每分钟)sql语句代表了当该数据的 timestamp 字段属性大于:sql_last_value 时,则会采集该条数据到ES索引库中
:sql_last_value 储存在
last_run_metadata_path => "F:/elasticsearch/logstash-6.2.1/config/logstash_metadata"
xc_course_template.json索引映射格式的模板文件
{ "mappings" : { "doc" : { "properties" : { "charge" : { "type" : "keyword" }, "description" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "end_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "expires" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "grade" : { "type" : "keyword" }, "id" : { "type" : "keyword" }, "mt" : { "type" : "keyword" }, "name" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "pic" : { "index" : false, "type" : "keyword" }, "price" : { "type" : "float" }, "price_old" : { "type" : "float" }, "pub_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "qq" : { "index" : false, "type" : "keyword" }, "st" : { "type" : "keyword" }, "start_time" : { "format" : "yyyy-MM-dd HH:mm:ss", "type" : "date" }, "status" : { "type" : "keyword" }, "studymodel" : { "type" : "keyword" }, "teachmode" : { "type" : "keyword" }, "teachplan" : { "analyzer" : "ik_max_word", "search_analyzer" : "ik_smart", "type" : "text" }, "users" : { "index" : false, "type" : "text" }, "valid" : { "type" : "keyword" } } } }, "template" : "xc_course" ##声明模板名称}
进入bin文件夹下打开窗口命令:logstash.bat -f …\config\mysql.conf
报错:加载不了主类异常,参考
浏览器输入:loaclhost:9600测试是否启动成功
浏览器输入:loaclhost:9100打开head插件查看数据,,表示成功将数据库数据引入到ES索引库中
转载地址:http://qbern.baihongyu.com/