mysql 数据导入 elasticsearch

1.首先安装logstash
参考: https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

2.编写创建索引语句

curl -H "Content-Type: application/json" -X PUT 'localhost:9200/md_order' -d '
{
  "mappings": {
        "orders": {
            "properties": {
                "orderno": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "productid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "deptcode": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "gradecode": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "brandname": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "orderdate": {
                    "type": "date"
                },
                "brandcode": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "rowguid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "custmgrid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "deptname": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "gradename": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "pricetypename": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "qtydemand": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "customerid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "retailprice": {
                    "type": "float"
                },
                "pricetype": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "productname": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "qtyorder": {
                    "type": "integer_range"
                },
                "custmgrname": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                },
                "customername": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "ignore_above": 256,
                            "type": "keyword"
                        }
                    }
                }
            }
        }
  }
}
'

3.编写导入脚本

input {
    jdbc {
        #配置数据库连接 ip 和端口,以及数据库   
        jdbc_connection_string => "jdbc:mysql://localhost:3306/tobacco?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=CST"
        #配置数据库用户名
        jdbc_user => "root"
        #配置数据库密码
        jdbc_password => "123456"
        jdbc_validate_connection => true
        #驱动,各个数据库都有对应的驱动,需自己下载
        jdbc_driver_library => "./libs/mysql-connector-java-8.0.11.jar"
        #不同数据库有不同的 class 配置
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        #你的SQL的位置,当然,你的SQL也可以直接写在这里。
        statement => "SELECT * from md_order"
        #statement => SELECT * FROM tabeName t WHERE t.creat_time > :sql_last_value
        #statement_filepath => "/etc/logstash/statement_file.d/my_info.sql" 
        #数据类型,标明你属于那一方势力。单了ES哪里好给你安排不同的山头。 type => "my_info"
        #注意:外载的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句, 
        #如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        # 定时器 多久执行一次SQL,默认是一分钟
        # schedule => 分 时 天 月 年  
        # schedule =>  22     表示每天22点执行一次
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        #是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
        #此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
        use_column_value => true
        #如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
        tracking_column => orderdate
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        #record_last_run => true
        #们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :sql_last_value 取得就是该文件中的值
        #last_run_metadata_path => ".my_info"
        #是否将字段名称转小写。
        #这里有个小的提示,如果你这前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
        #因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
        #lowercase_column_names => false
    }
}
output {
    elasticsearch {
        index => "md_order"
        document_type => "orders"
        document_id => "%{rowguid}"
        hosts => "172.16.1.160"
    }
}

4.运行脚本

  logstash -f config-mysql/mysql.conf

关于Zeno Chen

本人涉及的领域较多,杂而不精 程序设计语言: Perl, Java, PHP, Python; 数据库系统: MySQL,Oracle; 偶尔做做电路板的开发,主攻STM32单片机
此条目发表在NoSQL分类目录。将固定链接加入收藏夹。