Replicating PostgreSQL data with Flink
Install Flink; this walkthrough uses version 1.13.6.
Download the connector JAR matching your version from https://mvnrepository.com/
For MySQL, download the MySQL CDC connector JAR; for PostgreSQL, download the PostgreSQL CDC connector JAR.
Note: pick the JAR according to both the data-source type and its version number; a mismatched version causes startup errors and data that fails to synchronize.
Start Flink by running ./start-cluster.sh
Once Flink is up, open http://localhost:8081 and the built-in Flink web UI should appear.
With the environment ready, launch the SQL client with ./sql-client.sh
Database replication can then be performed with Flink SQL.
After entering the client, the Flink SQL prompt is displayed.
Example: PostgreSQL 11.5 at localhost, database postgres, schema public; replicate table test1 to test1_1.
Create the database:
CREATE DATABASE data_syn;
Table definitions:
CREATE TABLE "public"."test1" (
  "id" int4 NOT NULL,
  "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL
);
ALTER TABLE "public"."test1" ADD CONSTRAINT "test1_pkey" PRIMARY KEY ("id");
--------------------------------------------------------------------------------
CREATE TABLE "public"."test1_1" (
  "id" int4 NOT NULL,
  "name" varchar(50) COLLATE "pg_catalog"."default" NOT NULL
);
ALTER TABLE "public"."test1_1" ADD CONSTRAINT "test1_copy2_pkey" PRIMARY KEY ("id");
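Optionally, seed the source table with a couple of rows so there is something to replicate (the values below are purely illustrative):
-- Illustrative sample rows for the source table
INSERT INTO "public"."test1" (id, name) VALUES (1, 'alice');
INSERT INTO "public"."test1" (id, name) VALUES (2, 'bob');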
---------------------------------- Flink SQL ----------------------------------
CREATE TABLE pgsql_source (
  id int,
  name STRING
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '127.0.0.1',
  'port' = '5432',
  'username' = 'postgres',
  'password' = '123456',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'debezium.snapshot.mode' = 'never',
  'decoding.plugin.name' = 'pgoutput',
  'debezium.slot.name' = 'test3',
  'table-name' = 'test1'
);
CREATE TABLE sink_sql (
  id int,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
  'table-name' = 'test1_1',
  'username' = 'postgres',
  'password' = '123456'
);
insert into sink_sql select id,name from pgsql_source;
Once this statement is submitted, inserts into test1 are synchronized to test1_1 incrementally.
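A quick way to check the sync, assuming the job is running (the row values are illustrative):
-- On the source database, insert a new row into test1
INSERT INTO "public"."test1" (id, name) VALUES (3, 'carol');
-- Shortly afterwards the row should show up in the target table
SELECT * FROM "public"."test1_1";
-- The replication slot declared via debezium.slot.name should also be visible
SELECT slot_name, plugin, active FROM pg_replication_slots WHERE slot_name = 'test3';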
However, when existing rows are updated or deleted, the job fails with: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE public.test1 REPLICA IDENTITY FULL;'.
This happens because PostgreSQL's default replica identity records only the primary key, so the old values of the other columns are not written to the WAL. Rebuilding a primary key impacts the business and needs to be planned for an idle window, because while the primary key is being rebuilt the primary database cannot process DELETE or UPDATE on that table; in that situation you can temporarily switch the replica identity to a unique index as a stand-in for the primary key to reduce the impact, and switch back once the primary key rebuild is complete.
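For reference, pointing the replica identity at a unique index instead would look roughly like this (the index name is hypothetical, and the index must be unique and built on NOT NULL columns):
-- Hypothetical: use an existing unique index as a temporary replica identity
ALTER TABLE public.test1 REPLICA IDENTITY USING INDEX test1_name_uidx;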
For this example we simply run the following in the PostgreSQL command line:
ALTER TABLE public.test1 REPLICA IDENTITY FULL;
With that, inserts, updates, and deletes on test1 are all replicated to test1_1.
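To double-check, you can confirm the replica identity and exercise an update (the new value is illustrative):
-- 'f' in relreplident means the replica identity is FULL
select relreplident from pg_class where relname='test1';
-- Update a row in the source; the change should propagate to test1_1
UPDATE "public"."test1" SET name = 'alice_new' WHERE id = 1;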
package org.example;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class WordSourceFromPsql {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
        // Assemble the source DDL (the same postgres-cdc definition used in the SQL client above)
        String sourceDDL =
                "CREATE TABLE pgsql_source (\n" +
                "  id int,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'postgres-cdc',\n" +
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '5432',\n" +
                "  'username' = 'postgres',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'postgres',\n" +
                "  'schema-name' = 'public',\n" +
                "  'debezium.snapshot.mode' = 'never',\n" +
                "  'decoding.plugin.name' = 'pgoutput',\n" +
                // replication slot name
                "  'debezium.slot.name' = 'test3',\n" +
                "  'table-name' = 'test7'\n" +
                ")";
        // Register the CDC source table
        tableEnvironment.executeSql(sourceDDL);
        // The JDBC sink table and the INSERT INTO statement from the section above
        // would be registered and submitted the same way, e.g.:
        // tableEnvironment.executeSql(sinkDDL);
        // tableEnvironment.executeSql("insert into sink_sql select id, name from pgsql_source");
    }
}
The Flink dependency can be added in Maven so the job runs directly from the IDE:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
The job can also be packaged as a JAR and submitted to the Flink cluster.
The example project currently implements data replication for:
pg -> elasticsearch
pg -> mysql
pg -> pg (single table to single table, multiple tables to single table)
The remote repository is:
https://gitlab.xpaas.lenovo.com/prc_customer_mdm/prc-customer-mdm-flink.git (master branch)
---------------------------------- Creating a dedicated PostgreSQL user for the replication slot ----------------------------------
First, log in to the PostgreSQL database; you can use a GUI tool or the command line.
-- Create the data-sync database
CREATE DATABASE database_syn;
-- Create a new user
CREATE USER <user_name> WITH PASSWORD '<password>';
-- Grant the user the replication privilege
ALTER ROLE <user_name> replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE database_syn to <user_name>;
-- Grant the user SELECT on all existing tables in schema public
GRANT SELECT ON ALL TABLES IN SCHEMA public TO <user_name>;
-- Publish the tables that should be synchronized
CREATE PUBLICATION data_syn FOR TABLE <table_name>;
-- List which tables have been published
select * from pg_publication_tables;
-- Grant the user read/write privileges
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to <user_name>;
After the steps above you have a user that can drive the replication slot and replicate CRUD operations.
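For example, with a hypothetical user named flink_cdc and the test1 table from earlier, the sequence would be:
-- All names and the password below are hypothetical examples
CREATE USER flink_cdc WITH PASSWORD 'flink_cdc_password';
ALTER ROLE flink_cdc replication;
grant CONNECT ON DATABASE database_syn to flink_cdc;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink_cdc;
CREATE PUBLICATION data_syn FOR TABLE test1;
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to flink_cdc;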
Below are some commonly used PostgreSQL commands for this setup.
-- Create a new user
CREATE USER ODPS_ETL WITH PASSWORD 'odpsETL@2021';
-- Grant the user the replication privilege
ALTER ROLE ODPS_ETL replication;
-- Allow the user to connect to the database
grant CONNECT ON DATABASE test to ODPS_ETL;
-- Set the publish-all-tables flag on existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- List which tables have been published
select * from pg_publication_tables;
-- Grant SELECT on a single table
grant select on TABLE aa to ODPS_ETL;
-- Grant a user read/write privileges
grant select,insert,update,delete ON ALL TABLES IN SCHEMA public to bd_test;
-- Grant SELECT on all existing tables in schema public
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- Grant SELECT on tables created in the future in schema public
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- Change the replica identity so that old values are recorded for UPDATE and DELETE
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- Check the replica identity
select relreplident from pg_class where relname='test0425';
-- Check replication slot usage
SELECT * FROM pg_replication_slots;
-- Drop a replication slot
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- Count current connections per user
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- Set the user's maximum number of connections
alter role odps_etl connection limit 200;
Once this is done, the user's privileges can be inspected with a GUI tool.
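If you prefer SQL to a GUI tool, the grants and the replication attribute can also be checked from the system catalogs, for example:
-- Table-level grants held by the user (unquoted role names are stored in lower case)
SELECT grantee, table_schema, table_name, privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'odps_etl';
-- Whether the role has the REPLICATION attribute
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'odps_etl';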