本文共 79484 字,大约阅读时间需要 264 分钟。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*- import osimport csvimport xlrdfrom datetime import datetime, timedeltaimport copydef convert_xlsx_to_cvs(param_src_file_path, param_obj_file_path): """ 转换文件格式 :param param_src_file_path: 源文件路径 :param param_obj_file_path: 目标文件路径 :return: nothing """ for file in os.listdir(param_src_file_path): if file.split('.')[1] != "xlsx": print("文件过滤") continue workbook = xlrd.open_workbook("{0}{1}".format(param_src_file_path, file)) sheet_table = workbook.sheet_by_index(0) # file create file_name = os.path.splitext(file)[0] if '-' in file_name: file_name = file_name.split('-')[1] file_path = '{1}{0}.cvs'.format(file_name, param_obj_file_path) file_path = file_path.replace(' ', '') if os.path.exists(file_path): os.remove(file_path) # convert with open(file_path, 'w', encoding='utf-8') as inner_file: write = csv.writer(inner_file) for row_item in range(sheet_table.nrows): row_value = sheet_table.row_values(row_item) write.writerow(row_value)def clean_cvs_file_content(param_file_path): """ 清洗csv 文件内容 :param param_file_path: :return: """ for file in os.listdir(param_file_path): file_content = "" # 第一遍清理 with open("{0}{1}".format(param_file_path, file), 'r', encoding='utf-8') as inner_file: skip_flag = False for line in inner_file.readlines(): # 清理空行 if line == '\n': continue # 清理无数据的行 if line == ",,,,,,,\n": continue if line == ",,,,,,,,,\n": continue if line == ",,,,,,,,\n": continue if line == ",,,,,,\n": continue # 清理合并最后一行 if line.count(',') < 7: line = line.replace('\n', ' ') skip_flag = True else: if skip_flag: line = '\n' + line skip_flag = False file_content += line # 第二遍清理 file_content_copy = "" for line in file_content.split('\n'): line += '\n' # 清理合并异常行 if line.count(',') < 7: ctrl_index = file_content_copy.rfind('\n') file_content_copy = file_content_copy[0:ctrl_index] file_content_copy += line # 第三遍清洗 file_content_copy_third = "" for line in file_content_copy.split('\n'): line += '\n' # 替换特殊合并字符 line_split = 
line.split(',') join_line = "" for inner_line in line_split: if ("\"decimal" in inner_line) or ("\"number" in inner_line) or ('date_format' in inner_line) or ('concat' in inner_line): join_line += inner_line join_line += '@' elif '\n' in inner_line: join_line += inner_line else: join_line += inner_line join_line += ',' line = join_line file_content_copy_third += line with open("{0}{1}".format(param_file_path, file), 'w', encoding='utf-8') as inner_file: inner_file.write(file_content_copy_third)def analyze_file_key_content(param_file_path): """ 文件分析 :param param_file_path: 文件路径 :return: """ list_ret = [] for file in os.listdir(param_file_path): if file.split('.')[1] != "cvs": print("文件过滤") continue file_path = "{0}{1}".format(param_file_path, file) # 目标英文表名 target_table = None # 目标中文表名 target_table_desc = None # 源表名 src_table = None # 加載策略 load_strategy = None # 目标表主键 target_table_key = None # 分区字段 party_key = None with open(file_path, 'r', encoding='utf-8') as inner_file: for line in inner_file.readlines(): line_split = line.split(',') line_split_quotation = line.split('\"') if line_split[0] == "目标英文表名": target_table = line_split[1] if line_split[0] == "目标中文表名": target_table_desc = line_split[1] if line_split[0] == "源表名": src_table = line_split[1] if line_split[0] == "加载策略": load_strategy = line_split[1] if line_split[0] == "目标表主键": if len(line_split_quotation) > 1: target_table_key = line_split_quotation[1] else: target_table_key = line_split[1] if line_split[0] == "分区字段": if len(line_split_quotation) > 1: party_key = line_split_quotation[1] else: party_key = line_split[1] list_ret.append({"target_table": target_table, "target_table_desc": target_table_desc, "src_table": src_table, "load_strategy": load_strategy, "file_path": file_path, "target_table_key": target_table_key, "party_key": party_key}) return list_retdef analyze_file_content(param_key_content, param_target_file_path): """ 分析文件内容 :param param_key_content: 文件关键内容 :param param_target_file_path 目标路径 
:return: """ for file_key_info in param_key_content: load_strategy = file_key_info["load_strategy"] if load_strategy == "F1 - Full Overwrite": analyze_file_f1_strategy(file_key_info, param_target_file_path) elif load_strategy == "F2 - Update/Insert": analyze_file_f2_strategy(file_key_info, param_target_file_path) elif load_strategy == "F3 - Append": analyze_file_f3_strategy(file_key_info, param_target_file_path) analyze_file_f3_strategy_full(file_key_info, param_target_file_path)def analyze_file_annotate(file_key_info): """ 分析文件注釋 :param file_key_info: 文件關鍵内容 :return: """ param_file_path = file_key_info["file_path"] # 表的对应关系 with open(param_file_path, 'r', encoding='utf-8') as inner_file: line_content = inner_file.readlines() # 有效内容截取 start_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if line_split[0] == "修改记录": start_index = current_index + 2 current_index += 1 line_version_annotate = line_content[start_index:] return line_version_annotatedef analyze_file_f1_strategy(param_key_content, param_target_file_path): """ 策略F1的文件内容解析 :param param_key_content: 文件路径 :param param_target_file_path 目标文件 :return: """ load_strategy = param_key_content["load_strategy"] target_table = param_key_content["target_table"] target_table_desc = param_key_content["target_table_desc"] src_table = param_key_content["src_table"] param_file_path = param_key_content["file_path"] target_table_key = param_key_content["target_table_key"] party_key = param_key_content["party_key"] # 分区字段数据清洗 src_db = None # debug info print("\n******************************** 文件关键信息(开始) *************************************") print("文件路径:", param_file_path) print("目标英文表名:", target_table) print("目标中文表名:", target_table_desc) print("目标表主键:", target_table_key) print("源表名:", src_table) print("加载策略:", load_strategy) print("********************************* 文件关键信息(结束) ************************************\n") # 表的对应关系(不分 group 的情况下) with open(param_file_path, 'r', encoding='utf-8') as 
inner_file: # 原始内容 line_content = inner_file.readlines() # group 分组计数(最多检测10组) group_count = 0 for index in range(10): if "Group {0}".format(index) in ''.join(line_content): group_count += 1 print("分组数量:{0}".format(group_count)) # 注释信息 sql_annotate = "-- ************************************** Base Info ************************************** \n" sql_annotate += "-- Target Table English Name:{0} \n".format(target_table) sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc) sql_annotate += "-- Create Date:{0} \n".format(datetime.now()) # 待写入的语句信息 sql_all_write = "" # sql 头部内容 sql_content_pre = """set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;set hive.exec.max.dynamic.partitions.pernode = 1000;""" # 分组数大于等于1时 if group_count > 0: for index in range(group_count): # sql 头部内容 '''sql_content_pre = """set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;set hive.exec.max.dynamic.partitions.pernode = 1000;insert overwrite table data_lake.{0}select""".format(target_table)''' sql_content_join = """\nfrom """ # 组内容拆分 index += 1 group_name = "Group {0}".format(index) end_name = "修改记录" if index < group_count: end_name = "Group {0}".format(index + 1) print("开始处理: {0}".format(group_name)) start_index = 0 end_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if line_split[0] == group_name: start_index = current_index if line_split[0] == end_name: end_index = current_index current_index += 1 effect_lines = line_content[start_index:end_index] # 源表名 src_table = effect_lines[1].split(',')[1] print("源表名:{0}".format(src_table)) # 组内 sql info '''if index <= 1: if party_key.strip() == '': sql_content_pre = "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table) else: sql_content_pre = "insert overwrite table data_lake.{0} \n" \ "partition({1}) \n"\ "select \n".format(target_table, party_key) else: sql_content_pre = "union all\n" \ "select 
\n".format(target_table)''' if party_key.strip() == '': sql_content_pre = "set hive.exec.dynamic.partition=true;\n"\ "set hive.exec.dynamic.partition.mode=nonstrict;\n"\ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n"\ "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table) else: sql_content_pre = "set hive.exec.dynamic.partition=true;\n"\ "set hive.exec.dynamic.partition.mode=nonstrict;\n"\ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n"\ "insert overwrite table data_lake.{0} \n" \ "partition({1}) \n"\ "select \n".format(target_table, party_key) # 有效的数据内容 group_lines = effect_lines[4:] join_index = 0 for line in group_lines: if line.split(',')[0] == "关联条件": break join_index += 1 group_effect_lines = group_lines[:join_index] end_index = 0 for line in group_lines: if line.split(',')[0] == "条件语句(Where / Group By / Having)": break end_index += 1 connect_lines = group_lines[join_index + 2:end_index] # 组内容(调试) print(group_effect_lines) print(connect_lines) # 注释拼接 sql_content_pre = "-- ************************************************************************** \n" \ + sql_content_pre sql_content_pre = "-- Group {0}: {1} \n".format(index, target_table.replace('\n', '')) \ + sql_content_pre sql_content_pre = "\n-- ************************************************************************** \n" \ + sql_content_pre # 有效语句拼接 for line in group_effect_lines: line_split = line.split(',') # 源库 if not src_db: src_db = 'sdata_full' # 内容拼接 if line_split[0] == "" or line_split[1] == "": reflect_data = line_split[7].replace('\n', '') if reflect_data == "": reflect_data = "\'\'" if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \ reflect_data[0] != '\'': reflect_data = '\'' + reflect_data if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" if 'etl_dt' in line_split[4].lower(): reflect_data = 'current_timestamp()' if 'date_format' in reflect_data.lower(): reflect_data = 
reflect_data.replace('"','') reflect_data = reflect_data.replace('@',',') reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6]) '''if reflect_data == "${TX_DATE}": line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])''' sql_content_pre += " {0} as {1},\n".format(reflect_data, line_split[4]) else: if 'case' in line_split[7]: if line_split[7][0] == "\"": tmp_key_list = list(line_split[7]) tmp_key_list[line_split[7].rfind('"')] = '' tmp_key_list[line_split[7].find('"')] = '' tmp_key = ''.join(tmp_key_list) tmp_key = tmp_key.replace('\n', ' ') else: tmp_key = line_split[7].replace('\n', ' ') tmp_key = tmp_key.replace(u"’", "\'") if ' end' in tmp_key: tmp_key = "({0})".format(tmp_key) else: tmp_key = "({0} end )".format(tmp_key) tmp_key = tmp_key.replace('when ', ' when ') tmp_key = tmp_key.replace('then ', ' then ') tmp_key = tmp_key.replace('else ', ' else ') tmp_key = tmp_key.replace(' ', ' ') elif 'date_format' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') elif 'concat' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') else: tmp_key = line_split[1].lower().replace('\n', ' ') if line_split[3].lower().strip() != line_split[6].lower().strip(): '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \ or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \ or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \ or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \ or (('string' in line_split[3]) and ('date' in line_split[6])) \ and (line_split[3] != line_split[6]):''' line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') if ("decimal" in line_split[6]) and ('@' 
in line_split[6]): line_split[6] = "decimal(38,10)" tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6]) sql_content_pre += " {0} as {1},\n".format(tmp_key, line_split[4]) # 数据再次清洗 sql_content_pre_list = list(sql_content_pre) sql_content_pre_list[sql_content_pre.rfind(',')] = '' sql_content_pre = ''.join(sql_content_pre_list) # 关联语句拼接 if len(connect_lines) > 0: for line in connect_lines: line_split = line.split(',') if line_split[0] != '': sql_content_join += "{0}.{1} {2}\n{3} {4}.{5} {6}\non {7}".format(line_split[0], line_split[1], line_split[2], line_split[3], line_split[4], line_split[5], line_split[6], line_split[7] .replace('\n', '')) else: sql_content_join += "\n{0} {1}.{2} {3}\non {4}".format(line_split[3], line_split[4], line_split[5], line_split[6], line_split[7].replace('\n', '')) else: sql_content_join += "{0}.{1}".format(src_db, src_table) # 注释语句拼接 sql_annotate += "-- Source table:{0}.{1} --\n".format(src_db, src_table) sql_all_write += sql_content_pre sql_all_write += sql_content_join '''if index == group_count: sql_all_write += "\n;\n"''' sql_all_write += "\n;\n" else: # sql 头部内容 if party_key.strip() == '': sql_content_pre = """set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;set hive.exec.max.dynamic.partitions.pernode = 1000;insert overwrite table data_lake.{0} select""".format(target_table) else: sql_content_pre = """set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;set hive.exec.max.dynamic.partitions.pernode = 1000;insert overwrite table data_lake.{0} partition({1})select""".format(target_table, party_key) sql_content_join = """\nfrom """ # 有效内容截取 start_index = 0 end_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if line_split[0] == "源库名": start_index = current_index + 1 if line_split[0] == "关联条件": end_index = current_index current_index = current_index + 1 line_content_sql = line_content[start_index:end_index] # 注释拼接 sql_content_pre = 
"-- ************************************************************************** \n" \ + sql_content_pre sql_content_pre = "-- Group 1: {0} \n".format(target_table) + sql_content_pre sql_content_pre = "\n-- ************************************************************************** \n" \ + sql_content_pre # 有效语句拼接 for line in line_content_sql: line_split = line.split(',') # 源库 if not src_db: src_db = 'sdata_full' # 内容拼接 if line_split[0] == "" or line_split[1] == "": reflect_data = line_split[7].replace('\n', '') if reflect_data == "": reflect_data = "\'\'" if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \ reflect_data[0] != '\'': reflect_data = '\'' + reflect_data if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" if 'etl_dt' in line_split[4].lower(): reflect_data = 'current_timestamp()' if 'date_format' in reflect_data.lower(): reflect_data = reflect_data.replace('"','') reflect_data = reflect_data.replace('@',',') reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6]) '''if reflect_data == "${TX_DATE}": line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])''' sql_content_pre += " {0} as {1},\n".format(reflect_data, line_split[4]) else: if 'case' in line_split[7]: if line_split[7][0] == "\"": tmp_key_list = list(line_split[7]) tmp_key_list[line_split[7].rfind('"')] = '' tmp_key_list[line_split[7].find('"')] = '' tmp_key = ''.join(tmp_key_list) tmp_key = tmp_key.replace('\n', ' ') else: tmp_key = line_split[7].replace('\n', ' ') tmp_key = tmp_key.replace(u"’", "\'") if ' end' in tmp_key: tmp_key = "({0})".format(tmp_key) else: tmp_key = "({0} end )".format(tmp_key) tmp_key = tmp_key.replace('when ', ' when ') tmp_key = tmp_key.replace('then ', ' then ') tmp_key = tmp_key.replace('else ', ' else ') tmp_key = tmp_key.replace(' ', ' ') elif 'date_format' in 
line_split[7].lower(): tmp_key = "\'" + line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') elif 'concat' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') else: tmp_key = line_split[1].lower().replace('\n', ' ') if line_split[3].lower().strip() != line_split[6].lower().strip(): '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \ or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \ or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \ or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \ or (('string' in line_split[3]) and ('date' in line_split[6])) \ and (line_split[3] != line_split[6]):''' line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6]) sql_content_pre += " {0} as {1},\n".format(tmp_key, line_split[4]) # 数据再次清洗 sql_content_pre_list = list(sql_content_pre) sql_content_pre_list[sql_content_pre.rfind(',')] = '' sql_content_pre = ''.join(sql_content_pre_list) # 表关联关系 start_index = 0 end_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if line_split[0] == "左库": start_index = current_index + 1 if line_split[0] == "条件语句(Where / Group By / Having)": end_index = current_index current_index = current_index + 1 line_content_sql = line_content[start_index:end_index] # 没有关联条件时 if len(line_content_sql) > 0: for line in line_content_sql: line_split = line.split(',') if line_split[0] != '': sql_content_join += "{0}.{1} {2}\n{3} {4}.{5} {6}\non {7}".format(line_split[0], line_split[1], line_split[2], line_split[3], line_split[4], line_split[5], line_split[6], line_split[7].replace('\n', '')) else: sql_content_join += "\n{0} {1}.{2} {3}\non 
{4}".format(line_split[3], line_split[4], line_split[5], line_split[6], line_split[7].replace('\n', '')) else: sql_content_join += "{0}.{1}".format(src_db, src_table) sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table) sql_all_write += sql_content_pre sql_all_write += sql_content_join sql_all_write += "\n;\n" # 注释完善 sql_annotate += "-- *********************************** Partiton Column ********************************** \n" sql_annotate += "-- ETL Frequency: Daily --\n" sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy) sql_annotate += "-- *********************************** Revision History ********************************* \n" sql_annotate += "-- Date Revised Revised by Revision Note \n" for line in analyze_file_annotate(param_key_content): line_split = line.split(',') name = "" date = "" desc = "" print(line_split) if len(line_split) >= 4: name = line_split[0] try: date = datetime.strptime("1900-01-01", '%Y-%m-%d') + timedelta(float(line_split[1]) - 2) except Exception as error: print(error) date = line_split[1] desc = line_split[3] sql_annotate += "-- {0} {1} {2} \n".format( name, date, desc ) sql_annotate += "\n\n" sql_all_write = sql_annotate + sql_all_write # 回写文件 with open("{0}{1}.sql".format(param_target_file_path, target_table), 'a+', encoding='utf-8') as tmp_file: tmp_file.write(sql_all_write)def analyze_file_f2_strategy(param_key_content, param_target_file_path): """ 策略F2的文件内容解析 :param param_key_content: 文件路径 :param param_target_file_path 目标文件 :return: """ # 信息准备 load_strategy = param_key_content["load_strategy"] target_table = param_key_content["target_table"] target_table_desc = param_key_content["target_table_desc"] src_table = param_key_content["src_table"] param_file_path = param_key_content["file_path"] target_table_key = param_key_content["target_table_key"] party_key = param_key_content["party_key"] src_db = None effect_lines = [] map_table_union_key = {} map_table_union_key1={} # 逗号转换 target_table_key = 
target_table_key.replace(',', ',') target_table_key = target_table_key.split(',') # debug info print("\n******************************** 文件关键信息(开始) *************************************") print("文件路径:", param_file_path) print("目标英文表名:", target_table) print("目标中文表名:", target_table_desc) print("目标表主键:", target_table_key) print("源表名:", src_table) print("加载策略:", load_strategy) print("********************************* 文件关键信息(结束) ************************************\n") # 表的对应关系(不分 group 的情况下) with open(param_file_path, 'r', encoding='utf-8') as inner_file: # 原始内容 line_content = inner_file.readlines() # group 分组计数(最多检测10组) group_count = 0 for index in range(10): if "Group {0}".format(index) in ''.join(line_content): group_count += 1 print("分组数量:{0}".format(group_count)) # 注释信息 sql_annotate = "-- ************************************** Base Info ************************************** \n" sql_annotate += "-- Target Table English Name:{0} --\n".format(target_table) sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc) sql_annotate += "-- Create Date:{0} \n".format(datetime.now()) # 待写入的语句信息 sql_all_write = "" # 分组数大于等于1时 if group_count > 0: for index in range(group_count): # sql 头部固定信息 if party_key.strip() == '': sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table) else: sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "partition({1})\n" \ "select \n".format(target_table, party_key) '''if load_strategy == "F2 - Update/Insert": sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 
1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "partition({1})\n" \ "select \n".format(target_table, party_key) else: sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table)''' # 关联信息 sql_content_join = """\nfrom """ # 组内容拆分 index += 1 group_name = "Group {0}".format(index) end_name = "修改记录" if index < group_count: end_name = "Group {0}".format(index + 1) print("开始处理: {0}".format(group_name)) start_index = 0 end_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if line_split[0] == group_name: start_index = current_index if line_split[0] == end_name: end_index = current_index current_index += 1 effect_lines = line_content[start_index:end_index] # 源表名 src_table = effect_lines[1].split(',')[1] print("源表名:{0}".format(src_table)) # 有效的数据内容 group_lines = effect_lines[4:] join_index = 0 for line in group_lines: if line.split(',')[0] == "关联条件": break join_index += 1 group_effect_lines = group_lines[:join_index] end_index = 0 for line in group_lines: if line.split(',')[0] == "条件语句(Where / Group By / Having)": break end_index += 1 connect_lines = group_lines[join_index + 2:end_index] # 组内容 print(group_effect_lines) print(connect_lines) # 注释拼接 sql_content_pre = "-- ************************************************************************** \n" \ + sql_content_pre sql_content_pre = "-- Group {0}: {1} \n".format(index, target_table) + sql_content_pre sql_content_pre = "\n-- ************************************************************************** \n" \ + sql_content_pre # 第一遍遍历 语句内容拼接 for line in group_effect_lines: line_split = line.split(',') # 源库名 if not src_db: src_db = line_split[0] # 数据内容清洗 if line_split[7] == "current_date\n": line_split[7] = "current_date()" # 内容拼接 if line_split[0] == "" or line_split[1] == "": reflect_data = 
line_split[7].replace('\n', '') if reflect_data == "": reflect_data = "\'\'" if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \ reflect_data[0] != '\'': reflect_data = '\'' + reflect_data if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" if 'etl_dt' in line_split[4].lower(): reflect_data = 'current_timestamp()' if 'date_format' in reflect_data.lower(): reflect_data = reflect_data.replace('"','') reflect_data = reflect_data.replace('@',',') reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6]) '''if reflect_data == "${TX_DATE}": line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])''' sql_content_pre += " {0} as {1},\n".format(reflect_data, line_split[4]) else: if 'case' in line_split[7]: if line_split[7][0] == "\"": tmp_key_list = list(line_split[7]) tmp_key_list[line_split[7].rfind('"')] = '' tmp_key_list[line_split[7].find('"')] = '' tmp_key = ''.join(tmp_key_list) tmp_key = tmp_key.replace('\n', ' ') else: tmp_key = line_split[7].replace('\n', ' ') tmp_key = tmp_key.replace(u"’", "\'") if ' end' in tmp_key: tmp_key = "({0})".format(tmp_key) else: tmp_key = "({0} end )".format(tmp_key) tmp_key = tmp_key.replace('when ', ' when ') tmp_key = tmp_key.replace('then ', ' then ') tmp_key = tmp_key.replace('else ', ' else ') tmp_key = tmp_key.replace(' ', ' ') elif 'date_format' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') elif 'concat' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') else: tmp_key = line_split[1].lower().replace('\n', ' ') if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" if line_split[3].lower().strip() != 
line_split[6].lower().strip(): '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \ or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \ or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \ or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \ or (('string' in line_split[3]) and ('date' in line_split[6])) \ and (line_split[3] != line_split[6]):''' line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6]) sql_content_pre += " {0} as {1},\n".format(tmp_key, line_split[4]) # 关联主键 target_item = line_split[4].strip().replace('\n', '') if target_item in target_table_key: map_table_union_key[target_item] = line_split[1].strip().replace('\n', '') #关联主要信息 if target_item in target_table_key: line_split[7] = line_split[7].replace('\'','') map_table_union_key1[target_item] = line_split[7].strip().replace('\n', '') # 数据再次清洗 sql_content_pre_list = list(sql_content_pre) sql_content_pre_list[sql_content_pre.rfind(',')] = '' sql_content_pre = ''.join(sql_content_pre_list) # 第二遍遍历 表关联关系 sql_content_join += " {0}.{1}\n".format(src_db, src_table) sql_content_join += "\nwhere data_dt = concat(substring('${TX_DATE}',1,4),substring('${TX_DATE}',6," \ "2),substring('${TX_DATE}',9,2))" \ "\nunion all" \ "\nselect \n" for line in group_effect_lines: line_split = line.split(',') tmp_key = line_split[4] sql_content_join += " {0},\n".format(tmp_key) # 表关联关系数据清洗 sql_content_join_list = list(sql_content_join) sql_content_join_list[sql_content_join.rfind(',')] = '' sql_content_join = ''.join(sql_content_join_list) # 表的关联关系 print("表关联映射关系:" + str(map_table_union_key)) join_str = "" for join_item in map_table_union_key: if map_table_union_key[join_item]!='': print(join_item) join_str += "a.{0} = b.{1}\n and ".format(join_item, 
map_table_union_key[join_item]) else: print(join_item) join_str += "a.{0} = '{1}'\n and ".format(join_item, map_table_union_key1[join_item]) join_str = join_str[:join_str.rfind('\n and ')] # 关联关系拼接 sql_content_join += "from data_lake.{0} a\n".format(target_table) sql_content_join += """ \nwhere not exists ( select 1 from {0}.{1} b where {2} and b.data_dt = concat(substring('{3}',1,4),substring('{3}',6,2),substring('{3}',9,2)) )""".format(src_db, src_table, join_str, "${TX_DATE}") # 源表名 sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table) # 语句拼接 sql_all_write += sql_content_pre sql_all_write += sql_content_join sql_all_write += "\n;\n" else: # sql content if party_key.strip() == '': sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table) else: sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "partition({1})\n" \ "select \n".format(target_table, party_key) '''if load_strategy == "F2 - Update/Insert": sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "partition({1})\n" \ "select \n".format(target_table, party_key) else: sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \ "set hive.exec.dynamic.partition.mode=nonstrict;\n" \ "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \ "insert overwrite table data_lake.{0} \n" \ "select \n".format(target_table)''' # 关联信息 sql_content_join = """\nfrom """ # 有效信息截取 start_index = 0 end_index = 0 current_index = 0 for line in line_content: line_split = line.split(',') if 
line_split[0] == "源库名": start_index = current_index + 1 if line_split[0] == "关联条件": end_index = current_index current_index = current_index + 1 effect_lines = line_content[start_index:end_index] # 注释拼接 sql_content_pre = "-- ************************************************************************** \n" \ + sql_content_pre sql_content_pre = "-- Group 1: {0} \n".format(target_table) + sql_content_pre sql_content_pre = "\n-- ************************************************************************** \n" \ + sql_content_pre # 第一遍遍历 语句内容拼接 for line in effect_lines: line_split = line.split(',') # 源库名 if not src_db: src_db = line_split[0] # 数据内容清洗 if line_split[7] == "current_date\n" or line_split[7] == "current_date": line_split[7] = "current_date()" # 内容拼接 if line_split[0] == "" or line_split[1] == "": reflect_data = line_split[7].replace('\n', '') if reflect_data == "": reflect_data = "\'\'" if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \ reflect_data[0] != '\'': reflect_data = '\'' + reflect_data if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" if 'etl_dt' in line_split[4].lower(): reflect_data = 'current_timestamp()' if 'date_format' in reflect_data.lower(): reflect_data = reflect_data.replace('"','') reflect_data = reflect_data.replace('@',',') reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6]) '''if reflect_data == "${TX_DATE}": line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])''' sql_content_pre += " {0} as {1},\n".format(reflect_data, line_split[4]) else: if 'case' in line_split[7]: if line_split[7][0] == "\"": tmp_key_list = list(line_split[7]) tmp_key_list[line_split[7].rfind('"')] = '' tmp_key_list[line_split[7].find('"')] = '' tmp_key = ''.join(tmp_key_list) tmp_key = tmp_key.replace('\n', ' ') else: tmp_key = line_split[7].replace('\n', ' 
') tmp_key = tmp_key.replace(u"’", "\'") if ' end' in tmp_key: tmp_key = "({0})".format(tmp_key) else: tmp_key = "({0} end )".format(tmp_key) tmp_key = tmp_key.replace('when ', ' when ') tmp_key = tmp_key.replace('then ', ' then ') tmp_key = tmp_key.replace('else ', ' else ') tmp_key = tmp_key.replace(' ', ' ') elif 'date_format' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') elif 'concat' in line_split[7].lower(): tmp_key = line_split[7].replace('"','') tmp_key = tmp_key.replace('@',',') tmp_key = tmp_key.replace('\n','') else: tmp_key = line_split[1].lower().replace('\n', ' ') if line_split[3].lower().strip() != line_split[6].lower().strip(): '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \ or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \ or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \ or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \ or (('string' in line_split[3]) and ('date' in line_split[6])) \ and (line_split[3] != line_split[6]):''' line_split[6] = line_split[6].replace('@',',') line_split[6] = line_split[6].replace('"','') if ("decimal" in line_split[6]) and ('@' in line_split[6]): line_split[6] = "decimal(38,10)" tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6]) sql_content_pre += " {0} as {1},\n".format(tmp_key, line_split[4]) # 关联主键 target_item = line_split[4].strip().replace('\n', '') if target_item in target_table_key: map_table_union_key[target_item] = line_split[1].strip().replace('\n', '') #关联主要信息 if target_item in target_table_key: line_split[7] = line_split[7].replace('\'','') map_table_union_key1[target_item] = line_split[7].strip().replace('\n', '') # 数据再次清洗 sql_content_pre_list = list(sql_content_pre) sql_content_pre_list[sql_content_pre.rfind(',')] = '' sql_content_pre = ''.join(sql_content_pre_list) # 第二遍遍历 表关联关系 sql_content_join += " 
{0}.{1}\n".format(src_db, src_table) sql_content_join += "\nwhere data_dt = concat(substring('${TX_DATE}',1,4),substring('${TX_DATE}',6,2)," \ "substring('${TX_DATE}',9,2))" \ "\nunion all" \ "\nselect \n" for line in effect_lines: line_split = line.split(',') tmp_key = line_split[4] sql_content_join += " {0},\n".format(tmp_key) # 表关联关系数据清洗 sql_content_join_list = list(sql_content_join) sql_content_join_list[sql_content_join.rfind(',')] = '' sql_content_join = ''.join(sql_content_join_list) # 表的关联关系 print("表关联映射关系:" + str(map_table_union_key)) join_str = "" for join_item in map_table_union_key: if map_table_union_key[join_item]!='': print(join_item) join_str += "a.{0} = b.{1}\n and ".format(join_item, map_table_union_key[join_item]) else: print(join_item) print(map_table_union_key1[join_item]) join_str += "a.{0} = b.{1}\n and ".format(join_item, map_table_union_key1[join_item]) join_str = join_str[:join_str.rfind('\n and ')] # 关联关系拼接 sql_content_join += "from data_lake.{0} a\n".format(target_table) sql_content_join += """ \nwhere not exists ( select 1 from {0}.{1} b where {2} and b.data_dt = concat(substring('{3}',1,4),substring('{3}',6,2),substring('{3}',9,2)) )""".format(src_db, src_table, join_str, "${TX_DATE}") # 源表名 sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table) # 语句拼接 sql_all_write += sql_content_pre sql_all_write += sql_content_join sql_all_write += "\n;\n" # 注释完善 sql_annotate += "-- *********************************** Partiton Column ********************************** \n" sql_annotate += "-- ETL Frequency: Daily \n" sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy) sql_annotate += "-- *********************************** Revision History ********************************* \n" sql_annotate += "-- Date Revised Revised by Revision Note \n" for line in analyze_file_annotate(param_key_content): line_split = line.split(',') name = "" date = "" desc = "" print(line_split) if len(line_split) >= 4: name = line_split[0] try: date = 
def analyze_file_f3_strategy(param_key_content, param_target_file_path):
    """Route an F3-strategy mapping file to the F1 or F2 SQL generator.

    The cleaned CSV named by ``param_key_content["file_path"]`` is scanned for
    the header row whose first cell is "源库名".  The first cell of the row
    right after it decides the route: ``"sdata_full"`` goes to
    ``analyze_file_f1_strategy``, anything else to ``analyze_file_f2_strategy``.
    Only the first header row is considered.

    The caller's dict is not modified: a deep copy is passed on, and any
    ``data_dt`` entry is stripped from the copy's ``party_key`` first.

    :param param_key_content: key info of the mapping file; reads the
        ``file_path`` and ``party_key`` entries
    :param param_target_file_path: output directory prefix, forwarded as-is
    :return: None
    """
    param_file_path = param_key_content["file_path"]
    param_key_f3_content = copy.deepcopy(param_key_content)

    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        line_content = inner_file.readlines()

    for current_index, line in enumerate(line_content):
        if line.split(',')[0] != "源库名":
            continue

        # The row following the header is the first column-mapping row.
        first_line = line_content[current_index + 1].split(',')

        party_key = param_key_f3_content["party_key"]
        if 'data_dt' in party_key.lower().strip():
            # Drop data_dt together with whichever neighbouring comma it has.
            party_key = party_key.lower().strip()
            for token in ('data_dt,', ',data_dt', 'data_dt'):
                party_key = party_key.replace(token, '')
            param_key_f3_content["party_key"] = party_key

        if first_line[0] == "sdata_full":
            analyze_file_f1_strategy(param_key_f3_content, param_target_file_path)
        else:
            analyze_file_f2_strategy(param_key_f3_content, param_target_file_path)
        break


def _render_constant_column(fields):
    """Return the ``<expr> as <target>,`` line for a column with no source column.

    ``fields`` is one comma-split mapping row.  The expression comes from
    ``fields[7]``, the target column name from ``fields[4]`` and the target
    type from ``fields[6]``.
    """
    raw_value = fields[7]
    if '$' in raw_value:
        # Values containing '$' (e.g. ${TX_DATE}) are wrapped in double quotes.
        value = "\"{0}\"".format(raw_value).replace('\n', '')
    else:
        value = raw_value.replace('\n', '')

    if '-' in value:
        # Keep only the part before the first '-' and emit it as a quoted literal.
        value = value.split('-')[0].lower()
        for quote in ('\'', '\"', '’', '‘'):
            value = value.replace(quote, '')
        value = "\'{0}\'".format(value)

    if value != '':
        # Repair a literal that has a closing quote but no opening one.
        if ('-' not in value) and (' ' not in value) and value[-1] == '\'' and value[0] != '\'':
            value = '\'' + value

    value = value.replace('\n', '').strip()
    if value == "current_date":
        value = "current_date()"
    if value == "":
        value = "\'\'"

    target_type = fields[6]
    if ("decimal" in target_type) and ('@' in target_type):
        # Fix: the grouped code path used to write this to an unrelated,
        # stale list, so the normalised type never reached the cast below.
        target_type = "decimal(38,10)"

    if 'etl_dt' in fields[4].lower():
        value = 'current_timestamp()'

    if 'date_format' in value.lower():
        # '@' is replaced by ',' here; presumably it stands in for commas
        # inside a CSV cell — confirm against the cleaning step.
        value = value.replace('"', '').replace('@', ',')
        value = "cast({0} as {1})".format(value, target_type)

    return " {0} as {1},\n".format(value, fields[4])


def _render_mapped_column(fields):
    """Return the ``<expr> as <target>,`` line for a column fed from a source column.

    ``fields[7]`` may hold a ``case``/``date_format``/``concat`` expression;
    otherwise the lower-cased source column ``fields[1]`` is used.  When the
    source type ``fields[3]`` and target type ``fields[6]`` differ, the
    expression is wrapped in a ``cast``.
    """
    rule = fields[7]
    if 'case' in rule:
        if rule[0] == "\"":
            # Strip only the outermost pair of double quotes.
            chars = list(rule)
            chars[rule.rfind('"')] = ''
            chars[rule.find('"')] = ''
            expr = ''.join(chars).replace('\n', ' ')
        else:
            expr = rule.replace('\n', ' ')
        expr = expr.replace(u"’", "\'")
        if ' end' in expr:
            expr = "({0})".format(expr)
        else:
            expr = "({0} end )".format(expr)
        expr = expr.replace('when ', ' when ')
        expr = expr.replace('then ', ' then ')
        expr = expr.replace('else ', ' else ')
        # NOTE(review): as written this replace is a no-op; it presumably
        # collapsed doubled spaces before whitespace was lost — confirm.
        expr = expr.replace(' ', ' ')
    elif 'date_format' in rule.lower() or 'concat' in rule.lower():
        expr = rule.replace('"', '').replace('@', ',').replace('\n', '')
    else:
        expr = fields[1].lower().replace('\n', ' ')

    # NOTE(review): the cast is assumed to apply to every mapped column, not
    # only to the plain-column case — confirm against the original layout.
    if fields[3].lower().strip() != fields[6].lower().strip():
        target_type = fields[6].replace('@', ',').replace('"', '')
        expr = "cast({0} as {1})".format(expr, target_type)

    return " {0} as {1},\n".format(expr, fields[4])


def _build_select_sql(group_label, target_table, party_key, column_lines, src_db):
    """Build the banner, Hive settings, ``insert overwrite`` and select list.

    :param group_label: number shown in the ``-- Group N`` banner
    :param target_table: target table name (without the ``data_lake.`` prefix)
    :param party_key: partition column list; blank means no partition clause
    :param column_lines: raw CSV rows, one per target column
    :param src_db: source database found so far, or a falsy value
    :return: ``(sql_text, src_db)``; ``src_db`` is taken from the first cell
        of a row whenever the incoming value is still falsy
    """
    banner = "-- ************************************************************************** \n"
    preamble = "set hive.exec.dynamic.partition=true;\n" \
               "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
               "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
               "insert overwrite table data_lake.{0} \n".format(target_table)
    if party_key.strip() != '':
        preamble += "partition({0}) \n".format(party_key)
    preamble += "select \n"

    pieces = [
        "\n" + banner,
        "-- Group {0}: {1} \n".format(group_label, target_table),
        banner,
        preamble,
    ]

    for line in column_lines:
        fields = line.split(',')
        if not src_db:
            src_db = fields[0]
        if fields[0] == "" or fields[1] == "":
            pieces.append(_render_constant_column(fields))
        else:
            pieces.append(_render_mapped_column(fields))

    select_sql = ''.join(pieces)
    # Drop the comma after the last column.  Checking the suffix (rather than
    # deleting whatever str.rfind(',') points at) keeps the header intact
    # when there are no column rows at all.
    if select_sql.endswith(",\n"):
        select_sql = select_sql[:-2] + "\n"
    return select_sql, src_db


def _render_revision_history(revision_lines):
    """Format revision rows as ``-- name date desc`` SQL comment lines.

    Each row is comma-split and printed for debugging.  Rows with fewer than
    four cells produce an empty entry.  The second cell is converted with
    ``1900-01-01 + (value - 2) days`` (presumably an Excel serial date);
    if that conversion fails the raw cell text is used instead.
    """
    rendered = []
    for line in revision_lines:
        cells = line.split(',')
        name = ""
        date = ""
        desc = ""
        print(cells)
        if len(cells) >= 4:
            name = cells[0]
            try:
                date = datetime.strptime("1900-01-01", '%Y-%m-%d') + timedelta(float(cells[1]) - 2)
            except (ValueError, OverflowError) as error:
                print(error)
                date = cells[1]
            desc = cells[3]
        rendered.append("-- {0} {1} {2} \n".format(name, date, desc))
    return ''.join(rendered)


def analyze_file_f3_strategy_full(param_key_content, param_target_file_path):
    """Generate the ``<target_table>_full`` insert-overwrite script (F3 strategy).

    Reads the cleaned CSV at ``param_key_content["file_path"]``, builds one
    ``insert overwrite ... select ... from <src_db>.<src_table>;`` statement
    per "Group N" section (or a single one when the file has no groups),
    prefixes a comment header, and appends the result to
    ``<param_target_file_path><target_table>.sql``.

    Side effect: ``param_key_content`` is modified in place — ``target_table``
    gets the ``_full`` suffix and ``target_table_desc`` the ``-历史累全量``
    suffix.

    :param param_key_content: key info of the mapping file; reads
        ``load_strategy``, ``target_table``, ``target_table_desc``,
        ``src_table``, ``file_path``, ``target_table_key`` and ``party_key``
    :param param_target_file_path: output directory prefix for the .sql file
    :return: None
    """
    # The full-history table gets its own name and description.
    param_key_content["target_table"] = param_key_content["target_table"] + '_full'
    param_key_content["target_table_desc"] = param_key_content["target_table_desc"] + '-历史累全量'

    load_strategy = param_key_content["load_strategy"]
    target_table = param_key_content["target_table"]
    target_table_desc = param_key_content["target_table_desc"]
    src_table = param_key_content["src_table"]
    param_file_path = param_key_content["file_path"]
    target_table_key = param_key_content["target_table_key"]
    party_key = param_key_content["party_key"]
    src_db = None
    # NOTE(review): the source applied party_key.replace(',', ',') inside the
    # group loop, a no-op as written; it is omitted here.

    # debug info
    print("\n******************************** 文件关键信息(开始) *************************************")
    print("文件路径:", param_file_path)
    print("目标英文表名:", target_table)
    print("目标中文表名:", target_table_desc)
    print("目标表主键:", target_table_key)
    print("源表名:", src_table)
    print("分区字段:", party_key)
    print("加载策略:", load_strategy)
    print("********************************* 文件关键信息(结束) ************************************\n")

    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        line_content = inner_file.readlines()

    # Count how many of the markers "Group 0" .. "Group 9" occur in the file.
    whole_text = ''.join(line_content)
    group_count = sum(1 for index in range(10) if "Group {0}".format(index) in whole_text)
    print("分组数量:{0}".format(group_count))

    # Comment header of the generated script.
    sql_annotate = "-- ************************************** Base Info ************************************** \n"
    sql_annotate += "-- Target Table English Name:{0} \n".format(target_table)
    sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc)
    sql_annotate += "-- Create Date:{0} \n".format(datetime.now())

    # Statements to be written.
    sql_all_write = ""

    if group_count > 0:
        for index in range(1, group_count + 1):
            group_name = "Group {0}".format(index)
            # A group ends at the next group marker, or at the revision
            # section for the last group.
            end_name = "Group {0}".format(index + 1) if index < group_count else "修改记录"
            print("开始处理: {0}".format(group_name))

            start_index = 0
            end_index = 0
            for position, line in enumerate(line_content):
                first_cell = line.split(',')[0]
                if first_cell == group_name:
                    start_index = position
                if first_cell == end_name:
                    end_index = position
            effect_lines = line_content[start_index:end_index]

            # The source table name is the second cell of the group's second row.
            src_table = effect_lines[1].split(',')[1]
            print("源表名:{0}".format(src_table))

            # Column rows start after four leading rows and stop at "关联条件".
            column_lines = effect_lines[4:]
            for join_index, line in enumerate(column_lines):
                if line.split(',')[0] == "关联条件":
                    column_lines = column_lines[:join_index]
                    break

            select_sql, src_db = _build_select_sql(index, target_table, party_key, column_lines, src_db)
            sql_annotate += "-- Source table {2}:{0}.{1} \n".format(src_db, src_table, index)
            sql_all_write += select_sql
            sql_all_write += "\nfrom {0}.{1}".format(src_db, src_table)
            sql_all_write += "\n;\n"
    else:
        # No groups: column rows lie between the "源库名" header and "关联条件".
        start_index = 0
        end_index = 0
        for position, line in enumerate(line_content):
            first_cell = line.split(',')[0]
            if first_cell == "源库名":
                start_index = position + 1
            if first_cell == "关联条件":
                end_index = position
        column_lines = line_content[start_index:end_index]

        select_sql, src_db = _build_select_sql(1, target_table, party_key, column_lines, src_db)
        sql_annotate += "-- Source table {2}:{0}.{1} \n".format(src_db, src_table, "1")
        sql_all_write += select_sql
        sql_all_write += "\nfrom {0}.{1}".format(src_db, src_table)
        sql_all_write += "\n;\n"

    # Finish the comment header.
    sql_annotate += "-- *********************************** Partiton Column ********************************** \n"
    sql_annotate += "-- ETL Frequency: Daily \n"
    sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy)
    sql_annotate += "-- *********************************** Revision History ********************************* \n"
    sql_annotate += "-- Date Revised Revised by Revision Note \n"
    sql_annotate += _render_revision_history(analyze_file_annotate(param_key_content))
    sql_annotate += "\n\n"
    sql_all_write = sql_annotate + sql_all_write

    # Append to the target script (mode 'a+' keeps any existing content).
    with open("{0}{1}.sql".format(param_target_file_path, target_table), 'a+', encoding='utf-8') as save_file:
        save_file.write(sql_all_write)


if __name__ == "__main__":
    print("开始转换处理工作流")

    # Input, intermediate and output directories.
    param_scr_file_path = "./raw_data/"
    param_mid_file_path = "./csv_data/"
    param_object_file_path = "./sql_data/"

    # Make sure the result directories exist.
    os.makedirs(param_mid_file_path, exist_ok=True)
    os.makedirs(param_object_file_path, exist_ok=True)

    # Convert the workbooks to csv files.
    convert_xlsx_to_cvs(param_scr_file_path, param_mid_file_path)

    # Clean the csv content.
    clean_cvs_file_content(param_mid_file_path)

    # Extract the key info of every file.
    key_info = analyze_file_key_content(param_mid_file_path)

    # Generate the SQL scripts.
    analyze_file_content(key_info, param_object_file_path)
    print("结束转换处理工作流")
转载地址:http://npoen.baihongyu.com/