PostgreSQL和Excel的数据合并
生活随笔
收集整理的這篇文章主要介紹了
PostgreSQL和Excel的数据合并
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
用于將Excel中的數據和PG數據庫中的數據進行聯合處理,涉及:
- 數據庫訪問
- Excel文件訪問
- 字典和列表的使用
- 文件讀寫
- 異常處理
Python3完整源碼如下:
# -*- coding: utf-8 -*-
"""Merge device-asset rows from Excel workbooks with a PostgreSQL table.

For every input workbook the script matches spreadsheet rows against the
devices already stored in the database (by facility name + device name) and
writes three SQL scripts (delete / insert / update) plus an Excel sheet that
lists the rows for which no serial number could be generated.  Nothing is
written to the database itself; the generated SQL files are the output.

NOTE(review): this source was recovered from a scraped web page that had
injected pinyin annotations into the Chinese text.  The annotations were
stripped from the string literals below; confirm that the literals
(header name, dictionary description, category names) use the same script
(Simplified vs. Traditional) as the real spreadsheets and database rows.
"""
import xlrd
import xlwt
# import pandas as pd
import psycopg2
# import openpyxl

# Lookup tables loaded from the database by read_from_db().
# Each maps a display name to the id / value / code stored in the database.
g_facility_dict = {}
g_category_dict = {}
g_provider_dict = {}
g_model_dict = {}
g_specs_dict = {}
g_level_dict = {}
g_facility_code_dict = {}
g_category_code_dict = {}

# Devices read from the database: dicts with keys "id", "name", "f_name".
g_device_asset_in_db = []
# Devices read from the Excel file currently being processed.
g_device_asset_in_xml = []

# Generated SQL statements and the rows that could not be processed.
g_device_to_delete = []
g_device_to_insert = []
g_device_to_update = []
g_device_error = []

# Facility currently being numbered and the running device order inside it.
g_cache_facility_name = ""
g_cache_device_order_in_facility = 1

# Highest device id handed out so far.
g_max_dev_id = 0

# Source and target tables.
G_TABLE_SOURCE = "amp_device_asset"
G_TABLE_TARGET = "amp_device_asset"

# Value of the first column in a spreadsheet header row; such rows are skipped.
_HEADER_FACILITY_NAME = "設施名稱"

# Dictionary items (value, display_value) of one named system dictionary.
_DICT_ITEM_SQL = (
    "SELECT a.value, a.display_value from sys_dict_item a "
    "left join sys_dict_def b on a.dict_id = b.id "
    "where b.name = %s order by a.value;"
)


def make_clean():
    """Reset the per-file working state before processing the next workbook.

    The database device list and ``g_max_dev_id`` are deliberately left
    untouched: they carry over from one workbook to the next.
    """
    global g_device_asset_in_xml
    global g_device_to_delete
    global g_device_to_insert
    global g_device_to_update
    global g_device_error
    global g_cache_facility_name
    global g_cache_device_order_in_facility
    g_device_asset_in_xml = []
    g_device_to_delete = []
    g_device_to_insert = []
    g_device_to_update = []
    g_device_error = []
    g_cache_facility_name = ""
    g_cache_device_order_in_facility = 1


def connect_db():
    """Open the PostgreSQL connection and return ``(connection, cursor)``."""
    t_connection = psycopg2.connect(
        database="my_dbname",
        user="my_username",
        password="my_passwd",
        host="my_ipaddr",
        port="5432",
    )
    t_cursor = t_connection.cursor()
    return t_connection, t_cursor


def disconnect_db(connection, cursor):
    """Commit the current transaction, then close the cursor and connection."""
    connection.commit()
    cursor.close()
    connection.close()


def _load_lookup(cursor, sql, target, params=None):
    """Run *sql* and store every ``(value, name)`` row as ``target[name] = value``."""
    cursor.execute(sql, params)
    for row in cursor.fetchall():
        target[row[1]] = row[0]


def read_from_db(cursor):
    """Load all lookup dictionaries and the existing device list.

    The module-level dictionaries and ``g_device_asset_in_db`` are filled in
    place, so calling this more than once appends the devices again.
    """
    # name -> id lookups
    _load_lookup(cursor, "SELECT id,name FROM amp_facility_asset;", g_facility_dict)
    _load_lookup(cursor, "SELECT id,name FROM amp_category_define;", g_category_dict)
    _load_lookup(cursor, "SELECT id,name FROM amp_service_provider;", g_provider_dict)

    # display_value -> value lookups from the system dictionaries
    _load_lookup(cursor, _DICT_ITEM_SQL, g_model_dict, ("EQ_MODEL",))
    _load_lookup(cursor, _DICT_ITEM_SQL, g_specs_dict, ("EQ_SPECS",))
    _load_lookup(cursor, _DICT_ITEM_SQL, g_level_dict, ("EQ_LEVEL",))

    # facility name -> first character of the facility-type code
    _load_lookup(
        cursor,
        "select substr(di.code,1,1), f.name from amp_facility_asset f "
        "left join sys_dict_item di on f.facility_type = di.value "
        "left join sys_dict_def dd on dd.id = di.dict_id "
        "where f.facility_type in (1,2,3,4,5) and dd.description = '設施分類';",
        g_facility_code_dict,
    )

    # category name -> category code
    _load_lookup(
        cursor,
        "select category_code,name from amp_category_define "
        "where category_type = 240001 and pid > 0;",
        g_category_code_dict,
    )

    # Existing devices together with the name of the facility they belong to.
    cursor.execute(
        f"SELECT a.id,a.name as name,f.name as f_name FROM {G_TABLE_SOURCE} a "
        "left join amp_facility_asset f on a.belong_facility = f.id "
        "order by f.name, a.name, a.id;"
    )
    for row in cursor.fetchall():
        g_device_asset_in_db.append({"id": row[0], "name": row[1], "f_name": row[2]})


def get_dev_cnt_under_facility(cursor, f_name):
    """Return how many devices the database holds for the facility *f_name*.

    The facility name comes from a spreadsheet, so it is passed as a bound
    parameter instead of being formatted into the SQL text.
    """
    cursor.execute(
        "SELECT count(*) FROM amp_device_asset where belong_facility = "
        "(select id from amp_facility_asset where name = %s);",
        (f_name,),
    )
    row = cursor.fetchone()
    return row[0] if row else 0


def _clean_cell(value):
    """Convert a cell value to text, trimmed and without embedded newlines."""
    return str(value).strip().replace("\n", "")


def _normalize_kv(text):
    """Spell the kilovolt unit consistently as ``kV``."""
    return text.replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV")


def _strip_float_suffix(text):
    """Drop a trailing ``.0`` left over from numeric cells read as floats."""
    return text[:-2] if text.endswith(".0") else text


def read_xml_file(file_name, flag, fill=False):
    """Read the first sheet of the workbook *file_name* into a list of dicts.

    Despite the name, the input is an Excel workbook opened with ``xlrd``.

    Args:
        file_name: path of the workbook.
        flag: tag stored with every row under the key ``"flag"``.
        fill: when true, column 18 (power scope) is not read and every row
            gets an empty ``power_scope``.

    Returns:
        One dict per sheet row (header rows included).  The key order matters:
        write_xml_file() writes the values positionally.
    """
    handle = xlrd.open_workbook(file_name)
    table = handle.sheet_by_index(0)
    print(f"文件名:{file_name}")
    print(f"總行數:{table.nrows}")
    print(f"總列數:{table.ncols}")

    facility_name = table.col_values(0)
    facility_code = table.col_values(1)
    col_2 = table.col_values(2)
    device_name = table.col_values(3)
    category_name = table.col_values(4)
    device_code = table.col_values(5)
    dev_level = table.col_values(6)
    dispatch_no = table.col_values(7)
    specs_name = table.col_values(8)
    model_name = table.col_values(9)
    provider_name = table.col_values(10)
    build_date = table.col_values(11)
    col_12 = table.col_values(12)
    col_13 = table.col_values(13)
    col_14 = table.col_values(14)
    col_15 = table.col_values(15)
    col_16 = table.col_values(16)
    col_17 = table.col_values(17)
    if fill:
        power_scope = ["" for _ in range(len(facility_name))]
    else:
        power_scope = table.col_values(18)
    col_19 = table.col_values(19)
    col_20 = table.col_values(20)

    list_dict = []
    for i in range(table.nrows):
        # A row without a device name falls back to its category name.
        if device_name[i] is None or not str(device_name[i]).strip():
            if len(str(category_name[i])) > 0:
                device_name[i] = category_name[i]
            else:
                print(f"{file_name} L: {i}")
        list_dict.append({
            "facility_name": _normalize_kv(_clean_cell(facility_name[i])),
            "facility_code": _clean_cell(facility_code[i]),
            "col_2": _clean_cell(col_2[i]),
            "device_name": _clean_cell(device_name[i]),
            "category_name": _clean_cell(category_name[i])
            .replace("主變壓器", "變壓器")
            .replace("GIS設備", "GIS"),
            "device_code": _clean_cell(device_code[i]),
            "dev_level": _clean_cell(dev_level[i]),
            "dispatch_no": _strip_float_suffix(_clean_cell(dispatch_no[i])),
            "specs_name": _clean_cell(specs_name[i]),
            "model_name": _clean_cell(model_name[i]),
            "provider_name": _clean_cell(provider_name[i]),
            "build_date": _clean_cell(build_date[i]),
            "col_12": _clean_cell(col_12[i]),
            "col_13": _clean_cell(col_13[i]),
            "col_14": _clean_cell(col_14[i]),
            "col_15": _clean_cell(col_15[i]),
            "col_16": _clean_cell(col_16[i]),
            "col_17": _clean_cell(col_17[i]),
            "power_scope": _normalize_kv(_clean_cell(power_scope[i])),
            "col_19": _clean_cell(col_19[i]),
            "col_20": _clean_cell(col_20[i]),
            "flag": flag,
        })
    return list_dict


def fetch_from_dict(item, key_name, value_type, dict_name=None):
    """Return the value of ``item[key_name]`` prepared for the SQL text.

    Args:
        item: one row dict produced by read_xml_file().
        key_name: key to read from *item*.
        value_type: ``"str"`` returns the text unchanged; ``"num"`` translates
            it through *dict_name*; any other value yields ``"null"``.
        dict_name: lookup table used for ``"num"``.

    Returns:
        The text, the looked-up value, or the string ``"null"`` when the text
        has no entry in *dict_name*.

    Raises:
        KeyError: *key_name* is not present in *item*.
        ValueError: ``"num"`` was requested without a lookup table.
    """
    current_item_str = item[key_name]
    if value_type == "str":
        return current_item_str
    if value_type == "num":
        if dict_name is None:
            raise ValueError(f"a lookup dictionary is required to resolve '{key_name}'")
        return dict_name.get(current_item_str, "null")
    return "null"


def generate_sn(cursor, item):
    """Build the serial number for *item*.

    The running number restarts from the database count + 1 whenever the
    facility changes, so rows must arrive grouped by facility.  The counter
    advances even when the serial number turns out to be invalid.

    Raises:
        Exception: the category or facility code is unknown; the row is
            annotated in ``col_20`` and appended to ``g_device_error`` first.
    """
    global g_cache_facility_name
    global g_cache_device_order_in_facility

    var_category = fetch_from_dict(item, "category_name", "num", g_category_code_dict)
    var_facility = fetch_from_dict(item, "facility_name", "num", g_facility_code_dict)

    current_facility_name = item["facility_name"]
    if current_facility_name != g_cache_facility_name:
        g_cache_facility_name = current_facility_name
        g_cache_device_order_in_facility = (
            get_dev_cnt_under_facility(cursor, current_facility_name) + 1
        )
    else:
        g_cache_device_order_in_facility += 1

    sn = f"電-{var_category}-{g_cache_device_order_in_facility}-{item['flag']}{var_facility}01"
    if "null" not in sn:
        return sn

    print("[Error] when makeup sn=%s,\t 文件=%s\t 設施名稱=%s\t 設備名稱=%s"
          % (sn, item['flag'], item['facility_name'], item['device_name']))
    if var_category is None or var_category == 'null':
        item["col_20"] = "字段 '設備類別' 在數據庫中未找到定義"
    if var_facility is None or var_facility == 'null':
        item["col_20"] = "字段 '設施名稱' 在數據庫中未找到定義"
    g_device_error.append(item)
    raise Exception("[Error] when makeup sn: err=%s,\t file=%s\t facility=%s\t device=%s"
                    % (sn, item['flag'], item['facility_name'], item['device_name']))


def _sql_quote(value):
    """Escape *value* for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


def process_data(cursor):
    """Compare the spreadsheet rows with the database and build the SQL.

    Returns:
        The delete statements followed by the insert statements.  The same
        statements are also left in ``g_device_to_delete`` and
        ``g_device_to_insert``.
    """
    global g_max_dev_id

    # Pass 1: copy the database id onto every spreadsheet row that has the
    # same facility name and device name.  A matched database device is taken
    # out of the list so that it is neither matched twice nor deleted below.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == _HEADER_FACILITY_NAME:
            continue
        for dev_in_db in g_device_asset_in_db:
            same_device = (
                dev_in_xml["device_name"] is not None
                and dev_in_db["name"] is not None
                and dev_in_xml["device_name"] == dev_in_db["name"]
            )
            same_facility = (
                dev_in_xml["facility_name"] is not None
                and dev_in_db["f_name"] is not None
                and dev_in_xml["facility_name"] == dev_in_db["f_name"]
            )
            if same_device and same_facility:
                dev_in_xml["id"] = dev_in_db["id"]
                # Safe although we are iterating the list: we leave the loop
                # immediately after the removal.
                g_device_asset_in_db.remove(dev_in_db)
                if dev_in_db["id"] > g_max_dev_id:
                    g_max_dev_id = dev_in_db["id"]
                break

    device_sql_list = []

    # Pass 2: whatever is still in the database list has no spreadsheet row.
    for dev_in_db in g_device_asset_in_db:
        g_device_to_delete.append(
            f"delete from {G_TABLE_TARGET} where id = {dev_in_db['id']};"
        )
    device_sql_list.extend(g_device_to_delete)

    # Pass 3: emit an INSERT with a freshly allocated id for every row.
    # NOTE(review): the update branch for rows matched in pass 1 was disabled
    # in the original (kept below for reference), so matched rows are inserted
    # under a new id as well - confirm this is intended.
    #
    #     if 'id' in dev_in_xml:
    #         sql_str = f"update {G_TABLE_TARGET} set " \
    #                   f"dispatch_no = '{dev_in_xml['dispatch_no']}', " \
    #                   f"power_scope_text='{dev_in_xml['power_scope']}' " \
    #                   f"where id = {dev_in_xml['id']}; "
    #         g_device_to_update.append(sql_str)
    #         device_sql_list.extend(g_device_to_update)
    #     else:
    #         ... insert as below ...
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == _HEADER_FACILITY_NAME:
            continue

        # The id is consumed even if building the statement fails below.
        g_max_dev_id += 1
        dev_in_xml["id"] = g_max_dev_id
        try:
            var_device = fetch_from_dict(dev_in_xml, "device_name", "str")
            var_qr = fetch_from_dict(dev_in_xml, "device_code", "str")
            var_facility = fetch_from_dict(dev_in_xml, "facility_name", "num", g_facility_dict)
            var_provider = fetch_from_dict(dev_in_xml, "provider_name", "num", g_provider_dict)
            var_category = fetch_from_dict(dev_in_xml, "category_name", "num", g_category_dict)
            var_model = fetch_from_dict(dev_in_xml, "model_name", "num", g_model_dict)
            var_specs = fetch_from_dict(dev_in_xml, "specs_name", "num", g_specs_dict)
            var_dispatch = fetch_from_dict(dev_in_xml, "dispatch_no", "str")
            var_power = fetch_from_dict(dev_in_xml, "power_scope", "str")
            var_level = fetch_from_dict(dev_in_xml, "dev_level", "num", g_level_dict)
            var_build = fetch_from_dict(dev_in_xml, "build_date", "str")
            var_sn = generate_sn(cursor, dev_in_xml)
            sn_text = f"{var_sn}@{dev_in_xml['facility_code']}"
            # Free text from the spreadsheet is quoted so that an apostrophe
            # cannot break (or extend) the generated statement.
            sql_str = (
                f"insert into {G_TABLE_TARGET}(id, name, belong_facility, sn, mng_user_id, live_state, "
                f"model, specs, category, manufacture, dispatch_no, power_scope_text, "
                f"level, build_date, qr_code) values"
                f"({g_max_dev_id},'{_sql_quote(var_device)}',{var_facility},'{_sql_quote(sn_text)}',1,1,"
                f"{var_model},{var_specs},{var_category},{var_provider},"
                f"'{_sql_quote(var_dispatch)}','{_sql_quote(var_power)}',"
                f"{var_level},'{_sql_quote(var_build)}','{_sql_quote(var_qr)}');"
            )
        except Exception as err:
            # generate_sn() has already recorded the row in g_device_error;
            # report the problem and carry on with the next row.
            print(err)
            continue
        g_device_to_insert.append(sql_str)

    device_sql_list.extend(g_device_to_insert)
    return device_sql_list


def write_sql_file(file_name, str_list):
    """Write every entry of *str_list* on its own line to *file_name* (UTF-8).

    I/O errors are reported on stdout and not re-raised.
    """
    try:
        with open(file_name, 'w', encoding='utf-8') as sql_file:
            for sql_str in str_list:
                sql_file.write(str(sql_str) + "\n")
    except IOError as err:
        print(f'[Error] when write sql to file: {err}')


def write_xml_file(file_name, str_list=None):
    """Write the row dicts in *str_list* to the Excel workbook *file_name*.

    The first row holds the column titles.  Each following row holds the
    values of one dict in insertion order, without the last two entries
    (for rows coming from process_data() these are ``flag`` and ``id``).
    """
    rows = [] if str_list is None else str_list
    handle = xlwt.Workbook()
    sheet = handle.add_sheet('error_data')
    title = ['設施名稱', '設施編碼', '設施類別', '設備名稱', '設備類別', '設備編號', '設備評級',
             '調度號', '設備規格', '設備型號', '制造廠家', '投運日期', '出廠編號', '資產所屬',
             '電源接引', '接入方式', '供電方式', '設備分類', '供電范圍', '主要參數', '備注']
    for col, caption in enumerate(title):
        sheet.write(0, col, caption)
    for row_no, row in enumerate(rows, start=1):
        for col, cell in enumerate(list(row.values())[:-2]):
            sheet.write(row_no, col, cell)
    handle.save(file_name)


# Disabled pandas-based exporter, kept for reference (the pandas import above
# is commented out as well):
#
# def export_excel(export):
#     pf = pd.DataFrame(list(export))
#     order = ['file', 'facility_name', 'device_name', 'sn']
#     pf = pf[order]
#     columns_map = {'file': '文件', 'facility_name': '設施名稱',
#                    'device_name': '設備名稱', 'sn': 'sn'}
#     pf.rename(columns=columns_map, inplace=True)
#     file_path = pd.ExcelWriter('error.xlsx')
#     pf.fillna(' ', inplace=True)
#     pf.to_excel(file_path, encoding='utf-8', index=False)
#     file_path.save()


def print_hi(name):
    """Print a greeting (left over from the IDE project template)."""
    print(f'Hi, {name}')


def process_file(file_name, cursor, flag):
    """Process one workbook and write its SQL scripts and error sheet.

    Output goes to ``output/sql/{flag}-{delete,insert,update}.sql`` and
    ``output/errors/{flag}-errors.xls``.
    """
    global g_device_asset_in_xml
    make_clean()
    g_device_asset_in_xml = read_xml_file(file_name, flag)
    # generate_sn() numbers devices per facility, so group the rows first.
    g_device_asset_in_xml = sorted(g_device_asset_in_xml, key=lambda r: r['facility_name'])
    process_data(cursor)
    write_sql_file(f"output/sql/{flag}-delete.sql", g_device_to_delete)
    write_sql_file(f"output/sql/{flag}-insert.sql", g_device_to_insert)
    write_sql_file(f"output/sql/{flag}-update.sql", g_device_to_update)
    write_xml_file(f"output/errors/{flag}-errors.xls", g_device_error)


def main():
    """Load the database state and process the second batch of workbooks."""
    global g_max_dev_id

    g_conn, g_curs = connect_db()
    try:
        read_from_db(g_curs)
        print(f"find {len(g_device_asset_in_db)} device in db")

        # First batch (disabled):
        #     g_max_dev_id = 0
        #     process_file('input/S-P1.xls', g_curs, 'S')
        #     process_file('input/N-P1.xls', g_curs, 'N')
        #     process_file('input/D-P1.xls', g_curs, 'D')
        #     process_file('input/L-P1.xls', g_curs, 'L')

        # Second batch: continue numbering after the ids of the first batch.
        g_max_dev_id = 9404
        process_file('input/feedback/S-P2.xls', g_curs, 'S')
        process_file('input/feedback/N-P2.xls', g_curs, 'N')
        process_file('input/feedback/D-P2.xls', g_curs, 'D')
        # The original path was 'input/feedback/L-P2' without an extension,
        # unlike its three siblings.
        process_file('input/feedback/L-P2.xls', g_curs, 'L')
    finally:
        # Release the connection even when a workbook fails to process.
        disconnect_db(g_conn, g_curs)


if __name__ == '__main__':
    main()
以上是生活随笔為你收集整理的PostgreSQL和Excel的数据合并的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: w10官网下载怎么激活 如何激活Wind
- 下一篇: Flask开发实践