Iceberg problem notes
20220402
pyspark reading and writing Iceberg

    # coding: utf-8
    import findspark
    # Point findspark at the pyspark installation; on a server, prefer the
    # pyspark that ships with the Spark installation itself.
    findspark.init(r"D:\Python37\Lib\site-packages\pyspark")

    import os
    java8_location = r'D:\Java\jdk1.8.0_301/'  # set your own JDK path
    os.environ['JAVA_HOME'] = java8_location

    from pyspark.sql import SparkSession

    def get_spark():
        # Register a Hive-backed Iceberg catalog named "iceberg".
        spark = SparkSession.builder.getOrCreate()
        spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
        spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
        # For a different target address or cluster, copy that cluster's two
        # Hive config files into the local client's pyspark conf folder.
        return spark

    if __name__ == '__main__':
        spark = get_spark()
        pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")
        spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")
        pdf.show()
Setup:
1. Create a conf folder under the pyspark installation and place the two Hive config files used by Iceberg in it:
   hdfs-site.xml
   hive-site.xml
2. Put iceberg-spark3-runtime-0.13.1.jar into pyspark's jars folder (or pull it at startup, as sketched below).
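Instead of copying the runtime jar by hand, the same dependency can be resolved when the session starts. A minimal sketch, assuming the Maven coordinate org.apache.iceberg:iceberg-spark3-runtime:0.13.1 matches your Spark 3 build:

    from pyspark.sql import SparkSession

    # Sketch: let Spark resolve the Iceberg runtime from Maven instead of
    # dropping the jar into pyspark/jars by hand (coordinate is an assumption).
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark3-runtime:0.13.1")
        .getOrCreate()
    )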
20220315
Connecting Iceberg through Trino:

    import trino

    self.config_iceberg = {
        "host": "192.168.1.55",
        "port": 8881,
        "user": "root",
        "catalog": "iceberg",
        "schema": "ice_ods",
    }

    if connected_type == "iceberg":
        self.conn = trino.dbapi.connect(**self.config_iceberg)
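Once connected, queries go through the standard DB-API cursor. A minimal usage sketch, with a hypothetical table name:

    import trino

    conn = trino.dbapi.connect(host="192.168.1.55", port=8881, user="root",
                               catalog="iceberg", schema="ice_ods")
    cur = conn.cursor()
    cur.execute("select * from end_spec limit 10")  # hypothetical table name
    rows = cur.fetchall()
    print(rows)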
sink: write
source: read
The number of write partitions should match the Kafka topic's partition count; otherwise Spark falls back to the default 200 shuffle partitions, which is slow (see the sketch below).
Partitioning by day means only one directory is created per day.
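A minimal sketch of matching the shuffle parallelism to the topic, assuming a Kafka topic with 12 partitions:

    from pyspark.sql import SparkSession

    # Sketch: align shuffle parallelism with the Kafka topic's partition
    # count (12 is an assumed value) instead of the default 200.
    spark = (
        SparkSession.builder
        .config("spark.sql.shuffle.partitions", "12")
        .getOrCreate()
    )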
https://blog.csdn.net/xuronghao/article/details/106184831
Spark writing to Iceberg
partition is the specific partition column or transform.
hadoop_prod is the catalog name; tb is the database (namespace).
The warehouse can be created through either of two catalog types, hive or hadoop; a sketch of the hadoop variant follows.
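A minimal sketch of the hadoop-catalog variant, assuming a warehouse path on HDFS; hadoop_prod and tb follow the naming above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Sketch: a hadoop-type catalog keeps table metadata directly under a
    # warehouse path (the HDFS URL below is an assumption).
    spark.conf.set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.hadoop_prod.type", "hadoop")
    spark.conf.set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://namenode:8020/warehouse")

    # Day-granularity partitioning, so each day maps to a single directory.
    spark.sql("""
        CREATE TABLE IF NOT EXISTS hadoop_prod.tb.sample (
            id bigint,
            ts timestamp,
            data string)
        USING iceberg
        PARTITIONED BY (days(ts))
    """)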
    import numpy as np
    from loguru import logger  # assumption: any logger with .debug() works here
    from sqlalchemy import create_engine
    from tqdm import tqdm

    def save_to_db(data, database_type):
        """Save a DataFrame to the database in 1000-row batches.
        :param data: the data to save
        :return: None
        """
        if database_type == "production":
            trino_engine = create_engine("trino://root@192.168.1.55:8881/iceberg/ice_dwt")  # production
        else:
            trino_engine = create_engine("trino://root@192.168.40.11:8882/iceberg/ice_dwt")  # test
        times = int(np.ceil(data.shape[0] / 1000))
        for i in tqdm(range(times)):
            data.iloc[i * 1000 : (i + 1) * 1000, :].to_sql(
                name="dwt_dm_bi_b2b_customer_churn_wide",
                con=trino_engine,
                index=False,
                if_exists="append",
                schema="ice_dwt",
                method="multi",
            )
        logger.debug("Saved to database successfully")
Inserting into Iceberg this way works, although it is a little slow; a hypothetical call is sketched below.
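A hypothetical call, assuming a pandas DataFrame whose columns match the target table:

    import pandas as pd

    # Hypothetical frame; column names must match dwt_dm_bi_b2b_customer_churn_wide.
    df = pd.DataFrame({"customer_id": ["c1", "c2"], "churn_score": [0.12, 0.87]})
    save_to_db(df, "production")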
20220314
For Spark to write to Iceberg here, the alluxio-2.7.3-client.jar is required; it is found inside the zip package downloaded from Alluxio (one way to attach it is sketched below).
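One way to attach the client jar without copying it into pyspark/jars, a sketch assuming a local path:

    from pyspark.sql import SparkSession

    # Sketch: ship the Alluxio client jar with the session (path is an assumption).
    spark = (
        SparkSession.builder
        .config("spark.jars", "/opt/jars/alluxio-2.7.3-client.jar")
        .getOrCreate()
    )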
Spark reading and writing Iceberg had not yet been tested successfully at this point.