首页数据库PostgreSQL 数据同步到ES 搭建操作

PostgreSQL 数据同步到ES 搭建操作

时间2024-02-29 17:37:03发布访客分类数据库浏览867
导读:收集整理的这篇文章主要介绍了PostgreSQL 数据同步到ES 搭建操作,觉得挺不错的,现在分享给大家,也给大家做个参考。 安装python 和dev 开发包[root@rtm2 Pa...
收集整理的这篇文章主要介绍了PostgreSQL 数据同步到ES 搭建操作,觉得挺不错的,现在分享给大家,也给大家做个参考。

安装python 和dev 开发包

[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm准备中...       ################################# [100%]正在升级/安装... 1:python-devel-2.7.5-58.el7  ################################# [100%][root@rtm2 Packages]# ls

安装 multicorn

[root@rtm2 multicorn-1.3.5]# makePython version is 2.7gcc -Wall -Wmissing-PRototyPEs -Wpointer-arITh -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgSQL-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.cgcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.cgcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.cgcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.cgcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic.//preflight-check.shcp sql/multicorn.sql sql/multicorn--1.3.5.sql[root@rtm2 multicorn-1.3.5]# make installPython version is 2.7...

安装pg-es-fdw-master

[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master[root@rtm2 pg-es-fdw-master]# lsdemo.sh dite LICENSE README.md SETUP.py[root@rtm2 pg-es-fdw-master]# python setup.py buildrunning buildrunning build_pycreating buildcreating build/libcreating build/lib/ditecopying dite/__init__.py ->
     build/lib/dite[root@rtm2 pg-es-fdw-master]# python setup.py installrunning installrunning bdist_eggrunning egg_infocreating dite.egg-infowriting dite.egg-info/PKG-INFO

安装插件 multicorn

[postgres@rtm2 ~]$ psqlpsql (10.3)Type "help" for help.postgres=# select * From pg_extension;
     extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition---------+----------+--------------+----------------+------------+-----------+-------------- plpgsql |  10 |   11 | f    | 1.0  |   |(1 row)postgres=# CREATE EXTENSION multicorn;
    CREATE EXTENSIONpostgres=# psqlpostgres=# select * from pg_extension;
     extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition-----------+----------+--------------+----------------+------------+-----------+-------------- plpgsql |  10 |   11 | f    | 1.0  |   | multicorn |  10 |   2200 | t    | 1.3.5  |   |(2 rows)postgres=# CREATE SERVER multicorn_es FOReiGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticseArchFDW');
    CREATE SERVERpostgres=#

es

[root@rtm2 config]# vi elasticsearch.yMLnode.name: "es-node1"network.host: 192.168.31.121discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/Sysctl.confvm.max_map_count=262144sysctl -p[root@rtm2 config]# vi /etc/security/limits.conf# End of file root soft nofile 65536root hard nofile 65536root soft nproc 4096root hard nproc 4096~

启动es

[root@rtm2 bin]# lselasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.batelasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat[root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id Bigint,age bigint) SERVER multicorn_es OPTIONS (hosttest(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
    CREATE FOREIGN TABLEtest=#

创建触发器和外部表

test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$test$# BEGINtest$# INSERT INTO pp_es (id, age) VALUEStest$# (NEW.id, NEW.age);
    test$# RETURN NEW;
    test$# END;
    test$# $def$ LANGUAGE plpgsql;
    CREATE FUNCTIONtest=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR each ROW EXECUTE PROCEDURE index_pp();
    CREATE TRIGGERtest=#

新增数据测试

test=# insert into pp (id,age) values (1,11);
    INSERT 0 1test=# select * from pp;
     id | age----+----- 1 | 11(1 row)test=#

检查es数据

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&
pretty'{
 "took" : 104, "timed_out" : false, "_shards" : {
 "total" : 5, "successful" : 5, "failed" : 0 }
, "hits" : {
 "total" : 2, "max_score" : 1.0, "hits" : [ {
  "_index" : "es-node1",  "_type" : "pp",  "_id" : "1",  "_score" : 1.0,  "_source":{
"age": "11"}
 }
, {
  "_index" : "es-node1",  "_type" : "pp",  "_id" : "2",  "_score" : 1.0,  "_source":{
"age": "22"}
 }
 ] }
}
    [root@rtm2 ~]#

创建更新触发器

test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$BEGINUPDATE pp_es SETid = NEW.id,age = NEW.agewhere id =NEW.id;
    RETURN NEW;
    END;
    $def$ LANGUAGE plpgsql;
    CREATE FUNCTIONtest=# ^Ctest=#test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCTtest(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();
    CREATE TRIGGERtest=#

更新表数据

test=# select * from pp;
     id | age----+----- 1 | 11 2 | 22 3 | 22(3 rows)test=# update pp a set a.age = 33 where a.id = 3;
    ERROR: column "a" of relation "pp" does not existLINE 1: update pp a set a.age = 33 where a.id = 3;
          ^test=# update pp set age = 33 where id = 3;
    UPDATE 1test=# select * from pp;
     id | age----+----- 1 | 11 2 | 22 3 | 33(3 rows)test=#

es查询变更

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&
pretty'{
 "took" : 4, "timed_out" : false, "_shards" : {
 "total" : 5, "successful" : 5, "failed" : 0 }
, "hits" : {
 "total" : 3, "max_score" : 1.0, "hits" : [ {
  "_index" : "es-node1",  "_type" : "pp",  "_id" : "1",  "_score" : 1.0,  "_source":{
"age": "11"}
 }
, {
  "_index" : "es-node1",  "_type" : "pp",  "_id" : "2",  "_score" : 1.0,  "_source":{
"age": "22"}
 }
, {
  "_index" : "es-node1",  "_type" : "pp",  "_id" : "3",  "_score" : 1.0,  "_source":{
"age": "33"}
 }
 ] }
}
    [root@rtm2 ~]# 

补充:LOGstash同步pgsql数据到Elasticsearch

一、对于logstash的配置我就不在多说,主要是三部分,input、filter、output的配置

二、配置步骤

1、input配置

input {
 stdin {
 }
 jdbc {
      jdbc_connection_string =>
     "jdbc:postgresql://127.0.0.1:5432/world"  jdbc_user =>
     "postgres"  jdbc_password =>
     "zhang123"  jdbc_driver_library =>
     "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"  jdbc_driver_class =>
     "org.postgresql.Driver"  jdbc_paging_enabled =>
     "true"  jdbc_page_size =>
     "300000"  use_column_value =>
     "true"  tracking_column =>
     "id"  statement_filepath =>
     "D:\logstash-6.4.0\bin\pgsql\jdbc.sql" schedule =>
     "* * * * *" type =>
     "jdbc" jdbc_default_timezone =>
"Asia/Shanghai" }
}
    

2、filter配置

filter {
 JSON {
      source =>
     "message"  remove_field =>
 ["message"] }
}
    

3、output 配置,就是elasticsearch的基本配置

output {
 elasticsearch {
      hosts =>
     ["localhost:9200"]  index =>
     "test_out" template =>
     "D:\logstash-6.4.0\bin\pgsql\es-template.json" template_name =>
     "t-statistic-out-logstash" template_overwrite =>
     true document_type =>
     "out"  document_id =>
 "%{
id}
" }
 stdout {
      codec =>
 json_lines }
}
    

以上就是整个logstash 的jdbc.conf

4、es-template.json的配置

{
 "template" : "t-statistis-out-template",  "order":1, "settings": {
   "index": {
    "refresh_interval": "5s"   }
  }
, "mappings": {
   "_default_": {
 "_all" : {
"enabled":false}
,     "dynamic_templates": [     {
     "message_field" : {
     "match" : "message",     "match_mapping_type" : "string",     "mapping" : {
 "type" : "string", "index" : "not_analyzed" }
     }
    }
, {
     "string_fields" : {
     "match" : "*",     "match_mapping_type" : "string",     "mapping" : {
 "type" : "string", "index" : "not_analyzed" }
     }
    }
    ],    "properties": {
     "@timestamp": {
      "type": "date"     }
,     "@version": {
      "type": "keyword"     }
,       "id": {
      "type": "keyword"     }
,  "name": {
      "type": "keyword"     }
,  "pp": {
      "type": "keyword"     }
      }
   }
  }
,  "aliases": {
}
 }
    

最后就是就是下载好pgsql的连接驱动,这个官网可以下载;配置好自己的数据库表格的数据

启动命令:进入到logstash的bin目录下,自己的logstash配置都是放在bin的pgsql这个目录下面(这个自己随意创建位置都可以)

logstash.bat -f ./pgsql/jdbc.conf

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。

您可能感兴趣的文章:@H_269_126@
  • 用python简单实现mysql数据同步到ElasticSearch的教程
  • 基于python操作ES实例详解
  • 使用Python操作Elasticsearch数据索引的教程
  • Python操作ES的方式及与Mysql数据同步过程示例

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!


若转载请注明出处: PostgreSQL 数据同步到ES 搭建操作
本文地址: https://pptw.com/jishu/633174.html
基于PostgreSQL pg PostgreSQL使用MySQL外表的步骤详解(mysql

游客 回复需填写必要信息