如何使用MaxCompute Spark读写阿里云Hbase
背景
Spark on MaxCompute可以訪問位于阿里云VPC內的實例(例如ECS、HBase、RDS),默認MaxCompute底層網絡和外網是隔離的,Spark on MaxCompute提供了一種方案通過配置spark.hadoop.odps.cupid.vpc.domain.list來訪問阿里云的vpc網絡環境的Hbase。Hbase標準版和增強版的配置不同,本文通過訪問阿里云的標準版和增強版的Hbase簡單的描述需要加的配置。
Hbase標準版
環境準備
Hbase的網絡環境是存在vpc下的,所以我們首先要添加安全組開放端口2181、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
設置對應vpc的安全組
找到對應的vpc id然后添加安全組設置端口
添加Hbase的白名單
在hbase的白名單添加
創建Hbase表
create 'test','cf'編寫Spark程序
需要的Hbase依賴
編寫代碼
object App {def main(args: Array[String]) {val spark = SparkSession.builder().appName("HbaseTest").config("spark.sql.catalogImplementation", "odps").config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api").config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api").getOrCreate()val sc = spark.sparkContextval config = HBaseConfiguration.create()val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);val jobConf = new JobConf(config)jobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")try{import spark._spark.sql("select '7', 88 ").rdd.map(row => {val name= row(0).asInstanceOf[String]val id = row(1).asInstanceOf[Integer]val put = new Put(Bytes.toBytes(id))put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))(new ImmutableBytesWritable, put)}).saveAsHadoopDataset(jobConf)} finally {sc.stop()}} }提交到DataWorks
由于大于50m通過odps客戶端提交
進入數據開發新建spark節點
添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
這里的hbase域名需要hbase所有的機器,少一臺可能會造成網絡不通
Hbase增強版
環境準備
Hbase增強版的端口是30020、10600、16020.同時Hbase有白名單限制我們需要把對應的MaxCompute的IP加入到Hbase的白名單。
設置對應vpc的安全組
找到對應的vpc id然后添加安全組設置端口
添加Hbase的白名單
創建Hbase表?
create 'test','cf'編寫Spark程序
需要的Hbase依賴,引用的包必須是阿里云增強版的依賴
編寫代碼
object McToHbase {def main(args: Array[String]) {val spark = SparkSession.builder().appName("spark_sql_ddl").config("spark.sql.catalogImplementation", "odps").config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api").config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api").getOrCreate()val sc = spark.sparkContexttry{spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>val config = HBaseConfiguration.create()// 集群的連接地址(VPC內網地址)在控制臺頁面的數據庫連接界面獲得config.set("hbase.zookeeper.quorum", ":30020");import spark._// xml_template.comment.hbaseue.username_password.defaultconfig.set("hbase.client.username", "");config.set("hbase.client.password", "");val tableName = TableName.valueOf( "test")val conn = ConnectionFactory.createConnection(config)val table = conn.getTable(tableName);val puts = new util.ArrayList[Put]()iter.foreach(row => {val id = row(0).asInstanceOf[String]val name = row(1).asInstanceOf[String]val put = new Put(Bytes.toBytes(id))put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))puts.add(put)table.put(puts)})}} finally {sc.stop()}} }注意
hbase clinet會報org.apache.spark.SparkException: Task not serializable
原因是spark會把序列化對象以將其發送給其他的worker
解決方案
提交到DataWorks
由于大于50m通過odps客戶端提交
進入數據開發新建spark節點
添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
注意:
1.這個里需要添加增強版java api訪問地址,這里必須采用ip的形式。ip通過直接ping該地址獲取,這里的ip是172.16.0.10添加端口16000
2.這里的hbase域名需要hbase所有的機器,少一臺可能會造成網絡不通
大家如果對MaxCompute有更多咨詢或者建議,歡迎掃碼加入 MaxCompute開發者社區釘釘群,或點擊鏈接?申請加入。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的如何使用MaxCompute Spark读写阿里云Hbase的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深源恒际医疗票据OCR落地九省市 服务范
- 下一篇: 祝贺!两位 Apache Flink P