Integrating Spring Boot with PostgreSQL and HBase so the two can interact
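This project is a small test: it reads certain fields from PostgreSQL and turns them into column families of an HBase table. PostgreSQL and HBase are already running on a cloud server in Docker with the relevant ports opened, and the Windows machine connects to them through a secure tunnel.
The project uses JPA to map entities to the PostgreSQL database.
First, add the required dependencies. Judging by the imports in the code, these are Spring Data JPA, the PostgreSQL JDBC driver, the HBase client, and Lombok.
The corresponding YAML configuration: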
hbase:
  zookeeper:
    quorum: xxxx
    property:
      clientPort: 2181
zookeeper:
  znode:
    parent: /zkData
spring.datasource:
  url: jdbc:postgresql://localhost:5432/db1
  username: xxxx
  password: xxxx
spring.jpa:
  database: postgresql
  properties.hibernate.dialect: org.hibernate.dialect.PostgreSQL9Dialect
  hibernate.ddl-auto: update
  show-sql: false
logging.level:
  root: info

With this in place, development can begin:
An example entity (it maps to the PostgreSQL table device_type):
Entity code:

package com.nevt.db.repository.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

import javax.persistence.*;
import java.io.Serializable;

/**
 * (DeviceType) entity class
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
@Data
@Entity
@Table(name = "device_type")
@JsonIgnoreProperties(ignoreUnknown = true)
@EntityListeners(AuditingEntityListener.class)
public class DeviceType implements Serializable {

    private static final long serialVersionUID = 106469502944492174L;

    @Id
    @Column(name = "id")
    private Integer id;

    @Column(name = "name")
    private String name;

    // Name of the HBase column family derived from this row
    @Column(name = "column_family")
    private String columnFamily;

    @Column(name = "data_station_type_id")
    private Integer dataStationTypeId;
}

The data access layer only needs to extend the interfaces that JPA provides:
package com.nevt.db.repository;

import com.nevt.db.repository.entity.DeviceType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

/**
 * (DeviceType) table data access layer
 *
 * @author makejava
 * @since 2020-12-28 15:50:04
 */
public interface DeviceTypeRepository
        extends JpaRepository<DeviceType, Integer>, JpaSpecificationExecutor<DeviceType> {
}

The HBaseConfig class builds the HBase connection from the YAML values:
package com.nevt.configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

@Configuration
public class HBaseConfig {

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;

    @Value("${zookeeper.znode.parent}")
    private String znodeParent;

    @Bean
    public Connection hbaseConnection() throws IOException {
        System.out.println("creating HBase bean");
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        // Only the ZooKeeper quorum is applied here. clientPort and znodeParent are
        // injected above but never set on the configuration, so HBase's defaults are
        // used; to apply them, set "hbase.zookeeper.property.clientPort" and
        // "zookeeper.znode.parent" in the same way.
        configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
        return ConnectionFactory.createConnection(configuration);
    }
}

The core PostgreSQL and HBase classes are as follows:
PostgreSQL:
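The DBService class itself is not reproduced in this post; all that is visible is that HBaseService calls dbService.getColumnFamily(dataStationType) and receives a List<String>. The following is a minimal sketch of what that lookup might do, assuming it selects the device_type rows whose data_station_type_id matches and collects their column_family values. DeviceTypeRow and ColumnFamilyLookup are hypothetical names used only for illustration; a real service would presumably load the rows through DeviceTypeRepository.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the DeviceType entity; only the two fields used here. */
final class DeviceTypeRow {
    final String columnFamily;
    final Integer dataStationTypeId;

    DeviceTypeRow(String columnFamily, Integer dataStationTypeId) {
        this.columnFamily = columnFamily;
        this.dataStationTypeId = dataStationTypeId;
    }
}

/** Hypothetical helper; an assumption about what getColumnFamily could look like. */
final class ColumnFamilyLookup {
    private ColumnFamilyLookup() {}

    /**
     * Returns the distinct, non-blank column_family values of the rows whose
     * data_station_type_id equals dataStationType, in first-seen order.
     */
    static List<String> getColumnFamily(List<DeviceTypeRow> rows, int dataStationType) {
        return rows.stream()
                .filter(r -> r.dataStationTypeId != null
                        && r.dataStationTypeId == dataStationType)
                .map(r -> r.columnFamily)
                .filter(cf -> cf != null && !cf.trim().isEmpty())
                .distinct()
                .collect(Collectors.toList());
    }
}
```

Dropping null or blank names and duplicates before the list reaches HBase keeps the later table and column family calls from failing on invalid or repeated family names.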
HBase (this is where the interaction with PostgreSQL happens):
package com.nevt.service;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/**
 * @Auther: gzq
 * @Date: 2021/1/7 - 01 - 07 - 16:27
 * @Description: com.nevt.service
 */
@Component
@EnableScheduling
public class HBaseService {

    @Autowired
    private Connection hbaseConnection;

    @Autowired
    private DBService dbService;

    /**
     * Prepares the HBase table that hydrogen production plant data is written to.
     * Table RowKey = <data_source_id>:<timestamp>
     *
     * @param tableName       name of the table to add column families to
     * @param dataStationType value of the corresponding field in the PostgreSQL database
     */
    public void writeHydrogenFactory(String tableName, int dataStationType) throws IOException {
        List<String> columnFamily = dbService.getColumnFamily(dataStationType);
        System.out.println(columnFamily);
        // Admin is Closeable, so release it once the schema changes are done
        try (Admin admin = hbaseConnection.getAdmin()) {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                ifTableExist(columnFamily, admin, tableName);
            } else {
                ifTableNotExist(columnFamily, admin, tableName);
            }
        }
    }

    private void ifTableExist(List<String> columnFamily, Admin admin, String tableName) {
        System.out.println("Table Exist!");
        // The table already exists, so add each column family to it
        for (String column : columnFamily) {
            HColumnDescriptor newFamily = new HColumnDescriptor(Bytes.toBytes(column));
            // Why try/catch: a family may already have been added earlier, in which case
            // addColumn throws. Handling the exception here lets the loop continue with
            // the remaining families; rethrowing would stop them from being added.
            try {
                admin.addColumn(TableName.valueOf(tableName), newFamily);
                System.out.println("ColumnFamily has been added: " + column);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void ifTableNotExist(List<String> columnFamily, Admin admin, String tableName) throws IOException {
        System.out.println("Table Not Exist!");
        // Create the table with all column families in a single call
        HTableDescriptor tableCreate = new HTableDescriptor(TableName.valueOf(tableName));
        for (String column : columnFamily) {
            System.out.println(column);
            tableCreate.addFamily(new HColumnDescriptor(Bytes.toBytes(column)));
        }
        admin.createTable(tableCreate);
        System.out.println("Table and columnFamily have been established!");
    }
}

Test code:
package com.nevt;

import com.nevt.service.HBaseService;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.io.IOException;

@SpringBootTest
class HBaseTest {

    @Resource
    HBaseService hbaseService;

    @Test
    void testWrite() throws IOException {
        hbaseService.writeHydrogenFactory("data:hydrogen_station_data", 10002);
        // hbaseService.writeHydrogenFactory("data:hydrogen_vehicle_data", 10003);
        // hbaseService.writeHydrogenFactory("data:test2", 10003);
    }
}

Check the data in HBase:
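Also check whether the column families were added successfully:
Check the data in PostgreSQL:
The two match.
This completes the interaction between PostgreSQL and HBase.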
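The ifTableExist method in HBaseService relies on catching the exception that addColumn throws when a family already exists. One alternative, sketched here as an assumption rather than as the author's approach, is to compute which families are actually missing and call addColumn only for those. ColumnFamilyDiff and missingFamilies are hypothetical names; in a real setup the set of existing names would be read from the table's descriptor.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the original project. */
final class ColumnFamilyDiff {
    private ColumnFamilyDiff() {}

    /**
     * Returns the names in wanted that are not in existing,
     * without duplicates and in their original order.
     */
    static List<String> missingFamilies(Set<String> existing, List<String> wanted) {
        // LinkedHashSet drops duplicates while keeping insertion order
        Set<String> missing = new LinkedHashSet<>(wanted);
        missing.removeAll(existing);
        return new ArrayList<>(missing);
    }
}
```

With this approach an IOException from addColumn signals a real failure instead of an expected duplicate, so it no longer needs to be swallowed inside the loop.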
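In addition, here are a few HBase shell commands:
(1) Delete a table
Disable it first, then drop it:
disable 'table_name'
drop 'table_name'
(2) Delete a column family
alter 'table_name', 'delete' => 'column_family'
(3) View the details of a table
desc 'table_name'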