Automatically generating Hive SQL scripts with Python
Published: 2019-05-23
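
Before the full script, it may help to see its core idea in isolation: turn a column mapping into a Hive `insert overwrite ... select` statement, casting a column only when the source and target types differ. The block below is a minimal sketch of that idea, not the script itself. The function name `build_insert_overwrite`, its parameters, and the sample table and column names are assumptions made for illustration; only the `data_lake` target database and the `sdata_full` source database come from the script.

```python
def build_insert_overwrite(target_table, columns, src_db, src_table, partition_key=""):
    """Build a Hive INSERT OVERWRITE statement from a column mapping.

    columns is a list of (source_expr, source_type, target_column, target_type)
    tuples. All names here are hypothetical; this is a simplified sketch.
    """
    select_items = []
    for source_expr, source_type, target_column, target_type in columns:
        expr = source_expr
        # Cast only when the source and target types differ.
        if source_type.strip().lower() != target_type.strip().lower():
            expr = "cast({0} as {1})".format(expr, target_type)
        select_items.append("    {0}    as {1}".format(expr, target_column))

    header = "insert overwrite table data_lake.{0}\n".format(target_table)
    # A partition clause is emitted only when a partition key is given.
    if partition_key.strip():
        header += "partition({0})\n".format(partition_key)
    return "{0}select\n{1}\nfrom {2}.{3}\n;\n".format(
        header, ",\n".join(select_items), src_db, src_table)


# Hypothetical sample mapping, for illustration only.
sql = build_insert_overwrite(
    "dim_customer",
    [("cust_id", "bigint", "customer_id", "bigint"),
     ("amount", "varchar(20)", "amount", "decimal(38,10)")],
    "sdata_full", "customer", partition_key="etl_dt")
print(sql)
```

Joining the select items with `",\n"` avoids the trailing-comma cleanup step that the full script performs after concatenating every column line.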


# -*- coding: utf-8 -*-
import os
import csv
import xlrd
from datetime import datetime, timedelta
import copy


def convert_xlsx_to_cvs(param_src_file_path, param_obj_file_path):
    """
    Convert every .xlsx file in a directory to a CSV file (written with a .cvs extension).
    :param param_src_file_path: source directory path (with trailing separator)
    :param param_obj_file_path: target directory path (with trailing separator)
    :return: nothing
    """
    for file in os.listdir(param_src_file_path):
        if file.split('.')[1] != "xlsx":
            print("File skipped")
            continue
        # Note: xlrd 2.0 and later no longer read .xlsx files; this call needs xlrd < 2.0.
        workbook = xlrd.open_workbook("{0}{1}".format(param_src_file_path, file))
        sheet_table = workbook.sheet_by_index(0)
        # Build the output file name
        file_name = os.path.splitext(file)[0]
        if '-' in file_name:
            file_name = file_name.split('-')[1]
        file_path = '{1}{0}.cvs'.format(file_name, param_obj_file_path)
        file_path = file_path.replace(' ', '')
        if os.path.exists(file_path):
            os.remove(file_path)
        # Convert: write each sheet row as one CSV row
        with open(file_path, 'w', encoding='utf-8') as inner_file:
            write = csv.writer(inner_file)
            for row_item in range(sheet_table.nrows):
                row_value = sheet_table.row_values(row_item)
                write.writerow(row_value)


def clean_cvs_file_content(param_file_path):
    """
    Clean the content of the CSV files in a directory, in place.
    :param param_file_path: directory path (with trailing separator)
    :return:
    """
    for file in os.listdir(param_file_path):
        file_content = ""
        # First cleaning pass
        with open("{0}{1}".format(param_file_path, file), 'r', encoding='utf-8') as inner_file:
            skip_flag = False
            for line in inner_file.readlines():
                # Drop empty lines
                if line == '\n':
                    continue
                # Drop rows that contain no data
                if line == ",,,,,,,\n":
                    continue
                if line == ",,,,,,,,,\n":
                    continue
                if line == ",,,,,,,,\n":
                    continue
                if line == ",,,,,,\n":
                    continue
                # Merge a row that was split across lines (fewer than 7 commas)
                if line.count(',') < 7:
                    line = line.replace('\n', ' ')
                    skip_flag = True
                else:
                    if skip_flag:
                        line = '\n' + line
                        skip_flag = False
                file_content += line
        # Second cleaning pass
        file_content_copy = ""
        for line in file_content.split('\n'):
            line += '\n'
            # Merge abnormal rows into the previous row
            if line.count(',') < 7:
                ctrl_index = file_content_copy.rfind('\n')
                file_content_copy = file_content_copy[0:ctrl_index]
            file_content_copy += line
        # Third cleaning pass
        file_content_copy_third = ""
        for line in file_content_copy.split('\n'):
            line += '\n'
            # Replace the commas inside quoted expressions with '@'
            line_split = line.split(',')
            join_line = ""
            for inner_line in line_split:
                if (("\"decimal" in inner_line) or ("\"number" in inner_line)
                        or ('date_format' in inner_line) or ('concat' in inner_line)):
                    join_line += inner_line
                    join_line += '@'
                elif '\n' in inner_line:
                    join_line += inner_line
                else:
                    join_line += inner_line
                    join_line += ','
            line = join_line
            file_content_copy_third += line
        with open("{0}{1}".format(param_file_path, file), 'w', encoding='utf-8') as inner_file:
            inner_file.write(file_content_copy_third)


def analyze_file_key_content(param_file_path):
    """
    Extract the key information from each cleaned file.
    :param param_file_path: directory path (with trailing separator)
    :return: list of dicts, one per file
    """
    list_ret = []
    for file in os.listdir(param_file_path):
        if file.split('.')[1] != "cvs":
            print("File skipped")
            continue
        file_path = "{0}{1}".format(param_file_path, file)
        # Target table name (English)
        target_table = None
        # Target table name (Chinese)
        target_table_desc = None
        # Source table name
        src_table = None
        # Load strategy
        load_strategy = None
        # Primary key of the target table
        target_table_key = None
        # Partition field
        party_key = None
        with open(file_path, 'r', encoding='utf-8') as inner_file:
            for line in inner_file.readlines():
                line_split = line.split(',')
                line_split_quotation = line.split('\"')
                # The row labels below are matched against the spreadsheet, so they stay in Chinese.
                if line_split[0] == "目标英文表名":  # target table name (English)
                    target_table = line_split[1]
                if line_split[0] == "目标中文表名":  # target table name (Chinese)
                    target_table_desc = line_split[1]
                if line_split[0] == "源表名":  # source table name
                    src_table = line_split[1]
                if line_split[0] == "加载策略":  # load strategy
                    load_strategy = line_split[1]
                if line_split[0] == "目标表主键":  # primary key of the target table
                    if len(line_split_quotation) > 1:
                        target_table_key = line_split_quotation[1]
                    else:
                        target_table_key = line_split[1]
                if line_split[0] == "分区字段":  # partition field
                    if len(line_split_quotation) > 1:
                        party_key = line_split_quotation[1]
                    else:
                        party_key = line_split[1]
        list_ret.append({"target_table": target_table,
                         "target_table_desc": target_table_desc,
                         "src_table": src_table,
                         "load_strategy": load_strategy,
                         "file_path": file_path,
                         "target_table_key": target_table_key,
                         "party_key": party_key})
    return list_ret


def analyze_file_content(param_key_content, param_target_file_path):
    """
    Dispatch each file to the parser for its load strategy.
    :param param_key_content: key information of the files
    :param param_target_file_path: target directory path
    :return:
    """
    for file_key_info in param_key_content:
        load_strategy = file_key_info["load_strategy"]
        if load_strategy == "F1 - Full Overwrite":
            analyze_file_f1_strategy(file_key_info, param_target_file_path)
        elif load_strategy == "F2 - Update/Insert":
            analyze_file_f2_strategy(file_key_info, param_target_file_path)
        elif load_strategy == "F3 - Append":
            analyze_file_f3_strategy(file_key_info, param_target_file_path)
            analyze_file_f3_strategy_full(file_key_info, param_target_file_path)


def analyze_file_annotate(file_key_info):
    """
    Extract the revision-history lines of a file.
    :param file_key_info: key information of the file
    :return: list of revision-history lines
    """
    param_file_path = file_key_info["file_path"]
    # Table mapping
    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        line_content = inner_file.readlines()
        # Slice out the effective content
        start_index = 0
        current_index = 0
        for line in line_content:
            line_split = line.split(',')
            if line_split[0] == "修改记录":  # revision history
                start_index = current_index + 2
            current_index += 1
        line_version_annotate = line_content[start_index:]
    return line_version_annotate


def analyze_file_f1_strategy(param_key_content, param_target_file_path):
    """
    Parse the file content for strategy F1 (full overwrite).
    :param param_key_content: key information of the file
    :param param_target_file_path: target directory path
    :return:
    """
    load_strategy = param_key_content["load_strategy"]
    target_table = param_key_content["target_table"]
    target_table_desc = param_key_content["target_table_desc"]
    src_table = param_key_content["src_table"]
    param_file_path = param_key_content["file_path"]
    target_table_key = param_key_content["target_table_key"]
    party_key = param_key_content["party_key"]
    # Source database (defaulted later)
    src_db = None
    # debug info
    print("\n******************************** Key file info (start) *************************************")
    print("File path:", param_file_path)
    print("Target table name (English):", target_table)
    print("Target table name (Chinese):", target_table_desc)
    print("Target table primary key:", target_table_key)
    print("Source table name:", src_table)
    print("Load strategy:", load_strategy)
    print("********************************* Key file info (end) ************************************\n")
    # Table mapping (when the file is not split into groups)
    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        # Raw content
        line_content = inner_file.readlines()
        # Count the groups (at most 10 are detected)
        group_count = 0
        for index in range(10):
            if "Group {0}".format(index) in ''.join(line_content):
                group_count += 1
        print("Group count: {0}".format(group_count))
        # Annotation header
        sql_annotate = "-- ************************************** Base Info ************************************** \n"
        sql_annotate += "-- Target Table English Name:{0} \n".format(target_table)
        sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc)
        sql_annotate += "-- Create Date:{0} \n".format(datetime.now())
        # SQL text to be written
        sql_all_write = ""
        # SQL header
        sql_content_pre = """set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode = 1000;
"""
        # One or more groups
        if group_count > 0:
            for index in range(group_count):
                # Disabled earlier version of the SQL header:
                # sql_content_pre = """set hive.exec.dynamic.partition=true;
                # set hive.exec.dynamic.partition.mode=nonstrict;
                # set hive.exec.max.dynamic.partitions.pernode = 1000;
                # insert overwrite table data_lake.{0}
                # select""".format(target_table)
                sql_content_join = """\nfrom """
                # Split out the content of this group
                index += 1
                group_name = "Group {0}".format(index)
                end_name = "修改记录"  # revision history
                if index < group_count:
                    end_name = "Group {0}".format(index + 1)
                print("Processing: {0}".format(group_name))
                start_index = 0
                end_index = 0
                current_index = 0
                for line in line_content:
                    line_split = line.split(',')
                    if line_split[0] == group_name:
                        start_index = current_index
                    if line_split[0] == end_name:
                        end_index = current_index
                    current_index += 1
                effect_lines = line_content[start_index:end_index]
                # Source table name
                src_table = effect_lines[1].split(',')[1]
                print("Source table name: {0}".format(src_table))
                # Disabled earlier version that joined the groups with "union all":
                # if index <= 1:
                #     if party_key.strip() == '':
                #         sql_content_pre = "insert overwrite table data_lake.{0} \n" \
                #                           "select \n".format(target_table)
                #     else:
                #         sql_content_pre = "insert overwrite table data_lake.{0} \n" \
                #                           "partition({1}) \n" \
                #                           "select \n".format(target_table, party_key)
                # else:
                #     sql_content_pre = "union all\n" \
                #                       "select \n".format(target_table)
                # SQL header for this group
                if party_key.strip() == '':
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                                      "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                                      "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                                      "insert overwrite table data_lake.{0} \n" \
                                      "select \n".format(target_table)
                else:
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                                      "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                                      "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                                      "insert overwrite table data_lake.{0} \n" \
                                      "partition({1}) \n" \
                                      "select \n".format(target_table, party_key)
                # Effective data rows
                group_lines = effect_lines[4:]
                join_index = 0
                for line in group_lines:
                    if line.split(',')[0] == "关联条件":  # join conditions
                        break
                    join_index += 1
                group_effect_lines = group_lines[:join_index]
                end_index = 0
                for line in group_lines:
                    if line.split(',')[0] == "条件语句(Where / Group By / Having)":  # condition clause
                        break
                    end_index += 1
                connect_lines = group_lines[join_index + 2:end_index]
                # Group content (debug)
                print(group_effect_lines)
                print(connect_lines)
                # Prepend the comment banner
                sql_content_pre = "-- ************************************************************************** \n" \
                                  + sql_content_pre
                sql_content_pre = "--   Group {0}: {1} \n".format(index, target_table.replace('\n', '')) \
                                  + sql_content_pre
                sql_content_pre = "\n-- ************************************************************************** \n" \
                                  + sql_content_pre
                # Build the select list
                for line in group_effect_lines:
                    line_split = line.split(',')
                    # Source database
                    if not src_db:
                        src_db = 'sdata_full'
                    # Rows without a source column map a literal or expression
                    if line_split[0] == "" or line_split[1] == "":
                        reflect_data = line_split[7].replace('\n', '')
                        if reflect_data == "":
                            reflect_data = "\'\'"
                        if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \
                                reflect_data[0] != '\'':
                            reflect_data = '\'' + reflect_data
                        if ("decimal" in line_split[6]) and ('@' in line_split[6]):
                            line_split[6] = "decimal(38,10)"
                        if 'etl_dt' in line_split[4].lower():
                            reflect_data = 'current_timestamp()'
                        if 'date_format' in reflect_data.lower():
                            reflect_data = reflect_data.replace('"', '')
                            reflect_data = reflect_data.replace('@', ',')
                            reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6])
                        # Disabled handling of the ${TX_DATE} placeholder:
                        # if reflect_data == "${TX_DATE}":
                        #     line_split[6] = line_split[6].replace('@', ',')
                        #     line_split[6] = line_split[6].replace('"', '')
                        #     reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])
                        sql_content_pre += "    {0}    as {1},\n".format(reflect_data, line_split[4])
                    else:
                        if 'case' in line_split[7]:
                            if line_split[7][0] == "\"":
                                # Strip the first and last double quote
                                tmp_key_list = list(line_split[7])
                                tmp_key_list[line_split[7].rfind('"')] = ''
                                tmp_key_list[line_split[7].find('"')] = ''
                                tmp_key = ''.join(tmp_key_list)
                                tmp_key = tmp_key.replace('\n', ' ')
                            else:
                                tmp_key = line_split[7].replace('\n', ' ')
                            tmp_key = tmp_key.replace(u"’", "\'")
                            if ' end' in tmp_key:
                                tmp_key = "({0})".format(tmp_key)
                            else:
                                tmp_key = "({0} end )".format(tmp_key)
                            tmp_key = tmp_key.replace('when ', ' when ')
                            tmp_key = tmp_key.replace('then ', ' then ')
                            tmp_key = tmp_key.replace('else ', ' else ')
                            tmp_key = tmp_key.replace('  ', ' ')
                        elif 'date_format' in line_split[7].lower():
                            tmp_key = line_split[7].replace('"', '')
                            tmp_key = tmp_key.replace('@', ',')
                            tmp_key = tmp_key.replace('\n', '')
                        elif 'concat' in line_split[7].lower():
                            tmp_key = line_split[7].replace('"', '')
                            tmp_key = tmp_key.replace('@', ',')
                            tmp_key = tmp_key.replace('\n', '')
                        else:
                            tmp_key = line_split[1].lower().replace('\n', ' ')
                        # Cast when the source and target types differ
                        if line_split[3].lower().strip() != line_split[6].lower().strip():
                            # Disabled earlier, narrower condition:
                            # (line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3]
                            #  or (('varchar' in line_split[3]) and ('decimal' in line_split[6]))
                            #  or (('bigint' in line_split[3]) and ('varchar' in line_split[6])))
                            #  or (('string' in line_split[3]) and ('timestamp' in line_split[6]))
                            #  or (('string' in line_split[3]) and ('date' in line_split[6]))
                            #  and (line_split[3] != line_split[6]):
                            line_split[6] = line_split[6].replace('@', ',')
                            line_split[6] = line_split[6].replace('"', '')
                            if ("decimal" in line_split[6]) and ('@' in line_split[6]):
                                line_split[6] = "decimal(38,10)"
                            tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6])
                        sql_content_pre += "    {0}    as {1},\n".format(tmp_key, line_split[4])
                # Clean up: remove the trailing comma of the select list
                sql_content_pre_list = list(sql_content_pre)
                sql_content_pre_list[sql_content_pre.rfind(',')] = ''
                sql_content_pre = ''.join(sql_content_pre_list)
                # Build the join clause
                if len(connect_lines) > 0:
                    for line in connect_lines:
                        line_split = line.split(',')
                        if line_split[0] != '':
                            sql_content_join += "{0}.{1} {2}\n{3} {4}.{5} {6}\non {7}".format(
                                line_split[0], line_split[1], line_split[2], line_split[3],
                                line_split[4], line_split[5], line_split[6],
                                line_split[7].replace('\n', ''))
                        else:
                            sql_content_join += "\n{0} {1}.{2} {3}\non {4}".format(
                                line_split[3], line_split[4], line_split[5], line_split[6],
                                line_split[7].replace('\n', ''))
                else:
                    sql_content_join += "{0}.{1}".format(src_db, src_table)
                # Extend the annotation header
                sql_annotate += "-- Source table:{0}.{1} --\n".format(src_db, src_table)
                sql_all_write += sql_content_pre
                sql_all_write += sql_content_join
                # Disabled: terminate the statement only after the last group
                # if index == group_count:
                #     sql_all_write += "\n;\n"
                sql_all_write += "\n;\n"
        else:
            # SQL header
            if party_key.strip() == '':
                sql_content_pre = """set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode = 1000;
insert overwrite table data_lake.{0}
select
""".format(target_table)
            else:
                sql_content_pre = """set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode = 1000;
insert overwrite table data_lake.{0}
partition({1})
select
""".format(target_table, party_key)
            sql_content_join = """\nfrom """
            # Slice out the effective content
            start_index = 0
            end_index = 0
            current_index = 0
            for line in line_content:
                line_split = line.split(',')
                if line_split[0] == "源库名":  # source database name
                    start_index = current_index + 1
                if line_split[0] == "关联条件":  # join conditions
                    end_index = current_index
                current_index = current_index + 1
            line_content_sql = line_content[start_index:end_index]
            # Prepend the comment banner
            sql_content_pre = "-- ************************************************************************** \n" \
                              + sql_content_pre
            sql_content_pre = "--   Group 1: {0} \n".format(target_table) + sql_content_pre
            sql_content_pre = "\n-- ************************************************************************** \n" \
                              + sql_content_pre
            # Build the select list
            for line in line_content_sql:
                line_split = line.split(',')
                # Source database
                if not src_db:
                    src_db = 'sdata_full'
                # Rows without a source column map a literal or expression
                if line_split[0] == "" or line_split[1] == "":
                    reflect_data = line_split[7].replace('\n', '')
                    if reflect_data == "":
                        reflect_data = "\'\'"
                    if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \
                            reflect_data[0] != '\'':
                        reflect_data = '\'' + reflect_data
                    if ("decimal" in line_split[6]) and ('@' in line_split[6]):
                        line_split[6] = "decimal(38,10)"
                    if 'etl_dt' in line_split[4].lower():
                        reflect_data = 'current_timestamp()'
                    if 'date_format' in reflect_data.lower():
                        reflect_data = reflect_data.replace('"', '')
                        reflect_data = reflect_data.replace('@', ',')
                        reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6])
                    # Disabled handling of the ${TX_DATE} placeholder:
                    # if reflect_data == "${TX_DATE}":
                    #     line_split[6] = line_split[6].replace('@', ',')
                    #     line_split[6] = line_split[6].replace('"', '')
                    #     reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])
                    sql_content_pre += "    {0}    as {1},\n".format(reflect_data, line_split[4])
                else:
                    if 'case' in line_split[7]:
                        if line_split[7][0] == "\"":
                            # Strip the first and last double quote
                            tmp_key_list = list(line_split[7])
                            tmp_key_list[line_split[7].rfind('"')] = ''
                            tmp_key_list[line_split[7].find('"')] = ''
                            tmp_key = ''.join(tmp_key_list)
                            tmp_key = tmp_key.replace('\n', ' ')
                        else:
                            tmp_key = line_split[7].replace('\n', ' ')
                        tmp_key = tmp_key.replace(u"’", "\'")
                        if ' end' in tmp_key:
                            tmp_key = "({0})".format(tmp_key)
                        else:
                            tmp_key = "({0} end )".format(tmp_key)
                        tmp_key = tmp_key.replace('when ', ' when ')
                        tmp_key = tmp_key.replace('then ', ' then ')
                        tmp_key = tmp_key.replace('else ', ' else ')
                        tmp_key = tmp_key.replace('  ', ' ')
                    elif 'date_format' in line_split[7].lower():
                        tmp_key = "\'" + line_split[7].replace('"', '')
                        tmp_key = tmp_key.replace('@', ',')
                        tmp_key = tmp_key.replace('\n', '')
                    elif 'concat' in line_split[7].lower():
                        tmp_key = line_split[7].replace('"', '')
                        tmp_key = tmp_key.replace('@', ',')
                        tmp_key = tmp_key.replace('\n', '')
                    else:
                        tmp_key = line_split[1].lower().replace('\n', ' ')
                    # Cast when the source and target types differ
                    if line_split[3].lower().strip() != line_split[6].lower().strip():
                        # Disabled earlier, narrower condition:
                        # (line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3]
                        #  or (('varchar' in line_split[3]) and ('decimal' in line_split[6]))
                        #  or (('bigint' in line_split[3]) and ('varchar' in line_split[6])))
                        #  or (('string' in line_split[3]) and ('timestamp' in line_split[6]))
                        #  or (('string' in line_split[3]) and ('date' in line_split[6]))
                        #  and (line_split[3] != line_split[6]):
                        line_split[6] = line_split[6].replace('@', ',')
                        line_split[6] = line_split[6].replace('"', '')
                        if ("decimal" in line_split[6]) and ('@' in line_split[6]):
                            line_split[6] = "decimal(38,10)"
                        tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6])
                    sql_content_pre += "    {0}    as {1},\n".format(tmp_key, line_split[4])
            # Clean up: remove the trailing comma of the select list
            sql_content_pre_list = list(sql_content_pre)
            sql_content_pre_list[sql_content_pre.rfind(',')] = ''
            sql_content_pre = ''.join(sql_content_pre_list)
            # Table join relations
            start_index = 0
            end_index = 0
            current_index = 0
            for line in line_content:
                line_split = line.split(',')
                if line_split[0] == "左库":  # left database
                    start_index = current_index + 1
                if line_split[0] == "条件语句(Where / Group By / Having)":  # condition clause
                    end_index = current_index
                current_index = current_index + 1
            line_content_sql = line_content[start_index:end_index]
            # With join rows, build the join clause; without any, select from the source table directly
            if len(line_content_sql) > 0:
                for line in line_content_sql:
                    line_split = line.split(',')
                    if line_split[0] != '':
                        sql_content_join += "{0}.{1} {2}\n{3} {4}.{5} {6}\non {7}".format(
                            line_split[0], line_split[1], line_split[2], line_split[3],
                            line_split[4], line_split[5], line_split[6],
                            line_split[7].replace('\n', ''))
                    else:
                        sql_content_join += "\n{0} {1}.{2} {3}\non {4}".format(
                            line_split[3], line_split[4], line_split[5], line_split[6],
                            line_split[7].replace('\n', ''))
            else:
                sql_content_join += "{0}.{1}".format(src_db, src_table)
            sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table)
            sql_all_write += sql_content_pre
            sql_all_write += sql_content_join
            sql_all_write += "\n;\n"
        # Complete the annotation header
        sql_annotate += "-- *********************************** Partiton Column ********************************** \n"
        sql_annotate += "-- ETL Frequency: Daily --\n"
        sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy)
        sql_annotate += "-- *********************************** Revision History ********************************* \n"
        sql_annotate += "-- Date Revised    Revised by     Revision Note \n"
        for line in analyze_file_annotate(param_key_content):
            line_split = line.split(',')
            name = ""
            date = ""
            desc = ""
            print(line_split)
            if len(line_split) >= 4:
                name = line_split[0]
                try:
                    # Convert an Excel serial day number to a date
                    date = datetime.strptime("1900-01-01", '%Y-%m-%d') + timedelta(float(line_split[1]) - 2)
                except Exception as error:
                    print(error)
                    date = line_split[1]
                desc = line_split[3]
            sql_annotate += "-- {0}  {1}  {2} \n".format(
                name,
                date,
                desc
            )
        sql_annotate += "\n\n"
        sql_all_write = sql_annotate + sql_all_write
        # Write the result to the target file
        with open("{0}{1}.sql".format(param_target_file_path, target_table), 'a+', encoding='utf-8') as tmp_file:
            tmp_file.write(sql_all_write)


def analyze_file_f2_strategy(param_key_content, param_target_file_path):
    """
    Parse the file content for strategy F2 (update/insert).
    :param param_key_content: key information of the file
    :param param_target_file_path: target directory path
    :return:
    """
    # Prepare the key information
    load_strategy = param_key_content["load_strategy"]
    target_table = param_key_content["target_table"]
    target_table_desc = param_key_content["target_table_desc"]
    src_table = param_key_content["src_table"]
    param_file_path = param_key_content["file_path"]
    target_table_key = param_key_content["target_table_key"]
    party_key = param_key_content["party_key"]
    src_db = None
    effect_lines = []
    map_table_union_key = {}
    map_table_union_key1 = {}
    # Comma conversion: the page shows replace(',', ','), a no-op; the first argument
    # was presumably the full-width comma ',' and is restored here on that assumption.
    target_table_key = target_table_key.replace(',', ',')
    target_table_key = target_table_key.split(',')
    # debug info
    print("\n******************************** Key file info (start) *************************************")
    print("File path:", param_file_path)
    print("Target table name (English):", target_table)
    print("Target table name (Chinese):", target_table_desc)
    print("Target table primary key:", target_table_key)
    print("Source table name:", src_table)
    print("Load strategy:", load_strategy)
    print("********************************* Key file info (end) ************************************\n")
    # Table mapping (when the file is not split into groups)
    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        # Raw content
        line_content = inner_file.readlines()
        # Count the groups (at most 10 are detected)
        group_count = 0
        for index in range(10):
            if "Group {0}".format(index) in ''.join(line_content):
                group_count += 1
        print("Group count: {0}".format(group_count))
        # Annotation header
        sql_annotate = "-- ************************************** Base Info ************************************** \n"
        sql_annotate += "-- Target Table English Name:{0} --\n".format(target_table)
        sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc)
        sql_annotate += "-- Create Date:{0} \n".format(datetime.now())
        # SQL text to be written
        sql_all_write = ""
        # One or more groups
        if group_count > 0:
            for index in range(group_count):
                # Fixed SQL header
                if party_key.strip() == '':
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                                      "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                                      "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                                      "insert overwrite table data_lake.{0} \n" \
                                      "select \n".format(target_table)
                else:
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                                      "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                                      "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                                      "insert overwrite table data_lake.{0} \n" \
                                      "partition({1})\n" \
                                      "select \n".format(target_table, party_key)
                # Disabled earlier version that chose the header by load strategy:
                # if load_strategy == "F2 - Update/Insert":
                #     sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                #                       "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                #                       "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                #                       "insert overwrite table data_lake.{0} \n" \
                #                       "partition({1})\n" \
                #                       "select \n".format(target_table, party_key)
                # else:
                #     sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                #                       "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                #                       "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                #                       "insert overwrite table data_lake.{0} \n" \
                #                       "select \n".format(target_table)
                # Join clause
                sql_content_join = """\nfrom """
                # Split out the content of this group
                index += 1
                group_name = "Group {0}".format(index)
                end_name = "修改记录"  # revision history
                if index < group_count:
                    end_name = "Group {0}".format(index + 1)
                print("Processing: {0}".format(group_name))
                start_index = 0
                end_index = 0
                current_index = 0
                for line in line_content:
                    line_split = line.split(',')
                    if line_split[0] == group_name:
                        start_index = current_index
                    if line_split[0] == end_name:
                        end_index = current_index
                    current_index += 1
                effect_lines = line_content[start_index:end_index]
                # Source table name
                src_table = effect_lines[1].split(',')[1]
                print("Source table name: {0}".format(src_table))
                # Effective data rows
                group_lines = effect_lines[4:]
                join_index = 0
                for line in group_lines:
                    if line.split(',')[0] == "关联条件":  # join conditions
                        break
                    join_index += 1
                group_effect_lines = group_lines[:join_index]
                end_index = 0
                for line in group_lines:
                    if line.split(',')[0] == "条件语句(Where / Group By / Having)":  # condition clause
                        break
                    end_index += 1
                connect_lines = group_lines[join_index + 2:end_index]
                # Group content (debug)
                print(group_effect_lines)
                print(connect_lines)
                # Prepend the comment banner
sql_content_pre = "-- ************************************************************************** \n" \                                  + sql_content_pre                sql_content_pre = "--   Group {0}: {1} \n".format(index, target_table) + sql_content_pre                sql_content_pre = "\n-- ************************************************************************** \n" \                                  + sql_content_pre                # 第一遍遍历 语句内容拼接                for line in group_effect_lines:                    line_split = line.split(',')                    # 源库名                    if not src_db:                        src_db = line_split[0]                    # 数据内容清洗                    if line_split[7] == "current_date\n":                        line_split[7] = "current_date()"                    # 内容拼接                    if line_split[0] == "" or line_split[1] == "":                        reflect_data = line_split[7].replace('\n', '')                        if reflect_data == "":                            reflect_data = "\'\'"                        if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \                                reflect_data[0] != '\'':                            reflect_data = '\'' + reflect_data                                                if ("decimal" in line_split[6]) and ('@' in line_split[6]):                            line_split[6] = "decimal(38,10)"                        if 'etl_dt' in line_split[4].lower():                            reflect_data = 'current_timestamp()'                                                if 'date_format' in reflect_data.lower():                            reflect_data = reflect_data.replace('"','')                            reflect_data = reflect_data.replace('@',',')                            reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6])                        '''if reflect_data == "${TX_DATE}":                            
line_split[6] = line_split[6].replace('@',',')                            line_split[6] = line_split[6].replace('"','')                            reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])'''                        sql_content_pre += "    {0}    as {1},\n".format(reflect_data, line_split[4])                    else:                        if 'case' in line_split[7]:                            if line_split[7][0] == "\"":                                tmp_key_list = list(line_split[7])                                tmp_key_list[line_split[7].rfind('"')] = ''                                tmp_key_list[line_split[7].find('"')] = ''                                tmp_key = ''.join(tmp_key_list)                                tmp_key = tmp_key.replace('\n', ' ')                            else:                                tmp_key = line_split[7].replace('\n', ' ')                            tmp_key = tmp_key.replace(u"’", "\'")                            if ' end' in tmp_key:                                tmp_key = "({0})".format(tmp_key)                            else:                                tmp_key = "({0} end )".format(tmp_key)                            tmp_key = tmp_key.replace('when ', ' when ')                            tmp_key = tmp_key.replace('then ', ' then ')                            tmp_key = tmp_key.replace('else ', ' else ')                            tmp_key = tmp_key.replace('  ', ' ')                        elif 'date_format' in line_split[7].lower():                            tmp_key = line_split[7].replace('"','')                            tmp_key = tmp_key.replace('@',',')                            tmp_key = tmp_key.replace('\n','')                        elif 'concat' in line_split[7].lower():                            tmp_key = line_split[7].replace('"','')                            tmp_key = tmp_key.replace('@',',')                            tmp_key = tmp_key.replace('\n','')                  
      else:                            tmp_key = line_split[1].lower().replace('\n', ' ')                                                 if ("decimal" in line_split[6]) and ('@' in line_split[6]):                            line_split[6] = "decimal(38,10)"                                                if line_split[3].lower().strip() != line_split[6].lower().strip():                            '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \                                or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \                                or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \                                or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \                                or (('string' in line_split[3]) and ('date' in line_split[6])) \                                and (line_split[3] != line_split[6]):'''                                                        line_split[6] = line_split[6].replace('@',',')                            line_split[6] = line_split[6].replace('"','')                                                        if ("decimal" in line_split[6]) and ('@' in line_split[6]):                                line_split[6] = "decimal(38,10)"                                                            tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6])                        sql_content_pre += "    {0}    as {1},\n".format(tmp_key, line_split[4])                    # 关联主键                    target_item = line_split[4].strip().replace('\n', '')                    if target_item in target_table_key:                        map_table_union_key[target_item] = line_split[1].strip().replace('\n', '')                    #关联主要信息                    if target_item in target_table_key:                        line_split[7] = line_split[7].replace('\'','')                        map_table_union_key1[target_item] = 
line_split[7].strip().replace('\n', '')                # 数据再次清洗                sql_content_pre_list = list(sql_content_pre)                sql_content_pre_list[sql_content_pre.rfind(',')] = ''                sql_content_pre = ''.join(sql_content_pre_list)                # 第二遍遍历 表关联关系                sql_content_join += " {0}.{1}\n".format(src_db, src_table)                sql_content_join += "\nwhere data_dt = concat(substring('${TX_DATE}',1,4),substring('${TX_DATE}',6," \                                    "2),substring('${TX_DATE}',9,2))" \                                    "\nunion all" \                                    "\nselect \n"                for line in group_effect_lines:                    line_split = line.split(',')                    tmp_key = line_split[4]                    sql_content_join += "    {0},\n".format(tmp_key)                # 表关联关系数据清洗                sql_content_join_list = list(sql_content_join)                sql_content_join_list[sql_content_join.rfind(',')] = ''                sql_content_join = ''.join(sql_content_join_list)                # 表的关联关系                print("表关联映射关系:" + str(map_table_union_key))                join_str = ""                for join_item in map_table_union_key:                 if map_table_union_key[join_item]!='':                   print(join_item)                   join_str += "a.{0} = b.{1}\n    and ".format(join_item, map_table_union_key[join_item])                 else:                   print(join_item)                   join_str += "a.{0} = '{1}'\n    and ".format(join_item, map_table_union_key1[join_item])                join_str = join_str[:join_str.rfind('\n    and ')]                # 关联关系拼接                sql_content_join += "from data_lake.{0} a\n".format(target_table)                sql_content_join += """ \nwhere not exists (    select          1      from {0}.{1}   b    where {2}    and b.data_dt = concat(substring('{3}',1,4),substring('{3}',6,2),substring('{3}',9,2))    
)""".format(src_db,                src_table,                join_str,                "${TX_DATE}")                # 源表名                sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table)                # 语句拼接                sql_all_write += sql_content_pre                sql_all_write += sql_content_join                sql_all_write += "\n;\n"        else:            # sql content            if party_key.strip() == '':                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                                  "set hive.exec.dynamic.partition.mode=nonstrict;\n" \                                  "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                                  "insert overwrite table data_lake.{0} \n" \                                  "select \n".format(target_table)            else:                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                                  "set hive.exec.dynamic.partition.mode=nonstrict;\n" \                                  "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                                  "insert overwrite table data_lake.{0} \n" \                                  "partition({1})\n" \                                  "select \n".format(target_table, party_key)            '''if load_strategy == "F2 - Update/Insert":                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                                  "set hive.exec.dynamic.partition.mode=nonstrict;\n" \                                  "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                                  "insert overwrite table data_lake.{0} \n" \                                  "partition({1})\n" \                                  "select \n".format(target_table, party_key)            else:                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                                  "set 
hive.exec.dynamic.partition.mode=nonstrict;\n" \                                  "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                                  "insert overwrite table data_lake.{0} \n" \                                  "select \n".format(target_table)'''            # 关联信息            sql_content_join = """\nfrom """            # 有效信息截取            start_index = 0            end_index = 0            current_index = 0            for line in line_content:                line_split = line.split(',')                if line_split[0] == "源库名":                    start_index = current_index + 1                if line_split[0] == "关联条件":                    end_index = current_index                current_index = current_index + 1                effect_lines = line_content[start_index:end_index]            # 注释拼接            sql_content_pre = "-- ************************************************************************** \n" \                              + sql_content_pre            sql_content_pre = "--   Group 1: {0} \n".format(target_table) + sql_content_pre            sql_content_pre = "\n-- ************************************************************************** \n" \                              + sql_content_pre            # 第一遍遍历 语句内容拼接            for line in effect_lines:                line_split = line.split(',')                # 源库名                if not src_db:                    src_db = line_split[0]                # 数据内容清洗                if line_split[7] == "current_date\n" or line_split[7] == "current_date":                    line_split[7] = "current_date()"                # 内容拼接                if line_split[0] == "" or line_split[1] == "":                    reflect_data = line_split[7].replace('\n', '')                    if reflect_data == "":                        reflect_data = "\'\'"                    if ('-' not in reflect_data) and (' ' not in reflect_data) and reflect_data[-1] == '\'' and \                           
 reflect_data[0] != '\'':                        reflect_data = '\'' + reflect_data                                        if ("decimal" in line_split[6]) and ('@' in line_split[6]):                        line_split[6] = "decimal(38,10)"                    if 'etl_dt' in line_split[4].lower():                        reflect_data = 'current_timestamp()'                                        if 'date_format' in reflect_data.lower():                            reflect_data = reflect_data.replace('"','')                            reflect_data = reflect_data.replace('@',',')                            reflect_data = "cast({0} as {1})".format(reflect_data, line_split[6])                    '''if reflect_data == "${TX_DATE}":                        line_split[6] = line_split[6].replace('@',',')                        line_split[6] = line_split[6].replace('"','')                        reflect_data = "cast('{0}' as {1})".format(reflect_data, line_split[6])'''                    sql_content_pre += "    {0}    as {1},\n".format(reflect_data, line_split[4])                else:                    if 'case' in line_split[7]:                        if line_split[7][0] == "\"":                            tmp_key_list = list(line_split[7])                            tmp_key_list[line_split[7].rfind('"')] = ''                            tmp_key_list[line_split[7].find('"')] = ''                            tmp_key = ''.join(tmp_key_list)                            tmp_key = tmp_key.replace('\n', ' ')                        else:                            tmp_key = line_split[7].replace('\n', ' ')                        tmp_key = tmp_key.replace(u"’", "\'")                        if ' end' in tmp_key:                            tmp_key = "({0})".format(tmp_key)                        else:                            tmp_key = "({0} end )".format(tmp_key)                        tmp_key = tmp_key.replace('when ', ' when ')                        tmp_key = tmp_key.replace('then ', 
' then ')                        tmp_key = tmp_key.replace('else ', ' else ')                        tmp_key = tmp_key.replace('  ', ' ')                    elif 'date_format' in line_split[7].lower():                        tmp_key = line_split[7].replace('"','')                        tmp_key = tmp_key.replace('@',',')                        tmp_key = tmp_key.replace('\n','')                    elif 'concat' in line_split[7].lower():                        tmp_key = line_split[7].replace('"','')                        tmp_key = tmp_key.replace('@',',')                        tmp_key = tmp_key.replace('\n','')                    else:                        tmp_key = line_split[1].lower().replace('\n', ' ')                                                 if line_split[3].lower().strip() != line_split[6].lower().strip():                        '''(line_split[3] == 'tinyint' or line_split[3] == 'int' or 'number' in line_split[3] \                        or (('varchar' in line_split[3]) and ('decimal' in line_split[6])) \                        or (('bigint' in line_split[3]) and ('varchar' in line_split[6]))) \                        or (('string' in line_split[3]) and ('timestamp' in line_split[6])) \                        or (('string' in line_split[3]) and ('date' in line_split[6])) \                            and (line_split[3] != line_split[6]):'''                        line_split[6] = line_split[6].replace('@',',')                        line_split[6] = line_split[6].replace('"','')                        if ("decimal" in line_split[6]) and ('@' in line_split[6]):                            line_split[6] = "decimal(38,10)"                        tmp_key = "cast({0} as {1})".format(tmp_key, line_split[6])                    sql_content_pre += "    {0}    as {1},\n".format(tmp_key, line_split[4])                # 关联主键                target_item = line_split[4].strip().replace('\n', '')                if target_item in target_table_key:                    
map_table_union_key[target_item] = line_split[1].strip().replace('\n', '')                #关联主要信息                if target_item in target_table_key:                    line_split[7] = line_split[7].replace('\'','')                    map_table_union_key1[target_item] = line_split[7].strip().replace('\n', '')            # 数据再次清洗            sql_content_pre_list = list(sql_content_pre)            sql_content_pre_list[sql_content_pre.rfind(',')] = ''            sql_content_pre = ''.join(sql_content_pre_list)            # 第二遍遍历 表关联关系            sql_content_join += " {0}.{1}\n".format(src_db, src_table)            sql_content_join += "\nwhere data_dt = concat(substring('${TX_DATE}',1,4),substring('${TX_DATE}',6,2)," \                                "substring('${TX_DATE}',9,2))" \                                "\nunion all" \                                "\nselect \n"            for line in effect_lines:                line_split = line.split(',')                tmp_key = line_split[4]                sql_content_join += "    {0},\n".format(tmp_key)            # 表关联关系数据清洗            sql_content_join_list = list(sql_content_join)            sql_content_join_list[sql_content_join.rfind(',')] = ''            sql_content_join = ''.join(sql_content_join_list)            # 表的关联关系            print("表关联映射关系:" + str(map_table_union_key))            join_str = ""            for join_item in map_table_union_key:                if map_table_union_key[join_item]!='':                  print(join_item)                  join_str += "a.{0} = b.{1}\n    and ".format(join_item, map_table_union_key[join_item])                else:                  print(join_item)                  print(map_table_union_key1[join_item])                  join_str += "a.{0} = b.{1}\n    and ".format(join_item, map_table_union_key1[join_item])            join_str = join_str[:join_str.rfind('\n    and ')]            # 关联关系拼接            sql_content_join += "from data_lake.{0} a\n".format(target_table)           
            sql_content_join += """ \nwhere not exists (
    select
        1
    from {0}.{1}   b
    where {2}
    and b.data_dt = concat(substring('{3}',1,4),substring('{3}',6,2),substring('{3}',9,2))
    )""".format(src_db,
                src_table,
                join_str,
                "${TX_DATE}")
            # Source table name
            sql_annotate += "-- Source table:{0}.{1} \n".format(src_db, src_table)
            # Assemble the statement
            sql_all_write += sql_content_pre
            sql_all_write += sql_content_join
            sql_all_write += "\n;\n"
    # Complete the header comment block
    sql_annotate += "-- *********************************** Partition Column ********************************** \n"
    sql_annotate += "-- ETL Frequency: Daily \n"
    sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy)
    sql_annotate += "-- *********************************** Revision History ********************************* \n"
    sql_annotate += "-- Date Revised    Revised by     Revision Note \n"
    for line in analyze_file_annotate(param_key_content):
        line_split = line.split(',')
        name = ""
        date = ""
        desc = ""
        print(line_split)
        if len(line_split) >= 4:
            name = line_split[0]
            try:
                # The date cell arrives as an Excel serial number; convert it to a datetime
                date = datetime.strptime("1900-01-01", '%Y-%m-%d') + timedelta(float(line_split[1]) - 2)
            except Exception as error:
                # Not a number: keep the raw cell text
                print(error)
                date = line_split[1]
            desc = line_split[3]
        sql_annotate += "-- {0}  {1}  {2} \n".format(
            name,
            date,
            desc
        )
    sql_annotate += "\n\n"
    sql_all_write = sql_annotate + sql_all_write
    # Write the generated SQL back to the target file
    with open("{0}{1}.sql".format(param_target_file_path, target_table), 'a+', encoding='utf-8') as inner_file:
        inner_file.write(sql_all_write)


def analyze_file_f3_strategy(param_key_content, param_target_file_path):
    """
    Parse the file content for load strategy F3
    :param param_key_content: dict of key file information (file path, table names, keys, load strategy)
    :param param_target_file_path: path prefix for the generated .sql file
    :return:
    """
    param_file_path =
param_key_content["file_path"]

    # Work on a copy so the caller's dict is left untouched
    param_key_f3_content = copy.deepcopy(param_key_content)

    with open(param_file_path, 'r', encoding='utf-8') as inner_file:
        line_content = inner_file.readlines()
        # Locate the mapping rows
        current_index = 0
        for line in line_content:
            line_split = line.split(',')
            # "源库名" (source database name) is the first cell of the mapping header row
            if line_split[0] == "源库名":
                first_line = line_content[current_index + 1].split(',')
                # Drop data_dt from the partition key list
                if 'data_dt' in param_key_f3_content["party_key"].lower().strip():
                    param_key_f3_content["party_key"] = param_key_f3_content["party_key"].lower().strip().replace('data_dt,','')
                    param_key_f3_content["party_key"] = param_key_f3_content["party_key"].replace(',data_dt','')
                    param_key_f3_content["party_key"] = param_key_f3_content["party_key"].replace('data_dt','')
                # Dispatch on the source database of the first mapping row
                if first_line[0] == "sdata_full":
                    #param_key_f3_content["load_strategy"] = "F1 - Full Overwrite"
                    #param_key_f3_content["party_key"] = ""
                    analyze_file_f1_strategy(param_key_f3_content, param_target_file_path)
                else:
                    #param_key_f3_content["load_strategy"]= "F2 - Update/Insert"
                    #param_key_f3_content["party_key"] = ""
                    analyze_file_f2_strategy(param_key_f3_content, param_target_file_path)
                break
            current_index = current_index + 1


def analyze_file_f3_strategy_full(param_key_content, param_target_file_path):
    """
    Parse the file content for the additional "_full" table generated under load strategy F3
    :param param_key_content: dict of key file information (file path, table names, keys, load strategy)
    :param param_target_file_path: path prefix for the generated .sql file
    :return:
    """

    # Under strategy F3 an extra full table is generated (note: this modifies param_key_content in place)
    param_key_content["target_table"] = param_key_content["target_table"] + '_full'
    param_key_content["target_table_desc"] = param_key_content["target_table_desc"] + '-历史累全量'

    load_strategy = param_key_content["load_strategy"]
    target_table = param_key_content["target_table"]
    target_table_desc =
param_key_content["target_table_desc"]    src_table = param_key_content["src_table"]    param_file_path = param_key_content["file_path"]    target_table_key = param_key_content["target_table_key"]    party_key = param_key_content["party_key"]    src_db = None    # debug info    print("\n******************************** 文件关键信息(开始) *************************************")    print("文件路径:", param_file_path)    print("目标英文表名:", target_table)    print("目标中文表名:", target_table_desc)    print("目标表主键:", target_table_key)    print("源表名:", src_table)    print("分区字段:", party_key)    print("加载策略:", load_strategy)    print("********************************* 文件关键信息(结束) ************************************\n")    with open(param_file_path, 'r', encoding='utf-8') as inner_file:        line_content = inner_file.readlines()        # group 分组计数(最多检测10组)        group_count = 0        for index in range(10):            if "Group {0}".format(index) in ''.join(line_content):                group_count += 1        print("分组数量:{0}".format(group_count))        # 注释信息        sql_annotate = "-- ************************************** Base Info ************************************** \n"        sql_annotate += "-- Target Table English Name:{0} \n".format(target_table)        sql_annotate += "-- Target Table Chinese Name:{0} \n".format(target_table_desc)        sql_annotate += "-- Create Date:{0} \n".format(datetime.now())        # 待写入的语句信息        sql_all_write = ""        # 分组数大于1时        if group_count > 0:            for index in range(group_count):                index += 1                group_name = "Group {0}".format(index)                end_name = "修改记录"                if index < group_count:                    end_name = "Group {0}".format(index + 1)                print("开始处理: {0}".format(group_name))                start_index = 0                end_index = 0                current_index = 0                for line in line_content:                    line_split = line.split(',')         
                    if line_split[0] == group_name:
                        start_index = current_index
                    if line_split[0] == end_name:
                        end_index = current_index
                    current_index += 1
                effect_lines = line_content[start_index:end_index]
                # Source table name
                src_table = effect_lines[1].split(',')[1]
                print("源表名:{0}".format(src_table))
                # Clean the partition columns: normalize full-width commas (,) to ASCII commas
                party_key = party_key.replace(',', ',')
                # SQL header for this group
                if party_key.strip() == '':
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                              "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                              "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                              "insert overwrite table data_lake.{0} \n" \
                              "select \n".format(target_table)
                else:
                    sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \
                              "set hive.exec.dynamic.partition.mode=nonstrict;\n" \
                              "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \
                              "insert overwrite table data_lake.{0} \n" \
                              "partition({1}) \n"\
                              "select \n".format(target_table, party_key)
                # Prepend the group banner comment
                sql_content_pre = "-- ************************************************************************** \n" \
                                  + sql_content_pre
                sql_content_pre = "--   Group {0}: {1} \n".format(index, target_table) + sql_content_pre
                sql_content_pre = "\n-- ************************************************************************** \n" \
                                  + sql_content_pre
                # Mapping rows of this group (skip the group's four header lines)
                effect_lines = effect_lines[4:]
                join_index =
0                for line in effect_lines:                    if line.split(',')[0] == "关联条件":                        break                    join_index += 1                effect_lines = effect_lines[:join_index]                # 有效数据的有效内容的拼接                for line in effect_lines:                    inner_line_split = line.split(',')                    # 源表名                    if not src_db:                        src_db = inner_line_split[0]                    # 字段映射                    if inner_line_split[0] == "" or inner_line_split[1] == "":                        if '$' in inner_line_split[7]:                            tmp_value = "\"{0}\"".format(inner_line_split[7]).replace('\n', '')                        else:                            tmp_value = inner_line_split[7].replace('\n', '')                        tmp_desc = inner_line_split[5]                        if '-' in tmp_value:                            tmp_value_split = tmp_value.split('-')                            tmp_value = tmp_value_split[0].lower()                            tmp_value = tmp_value.replace('\'', '')                            tmp_value = tmp_value.replace('\"', '')                            tmp_value = tmp_value.replace('’', '')                            tmp_value = tmp_value.replace('‘', '')                            tmp_value = "\'{0}\'".format(tmp_value)                            tmp_desc = tmp_value_split[1].lower()                        if tmp_value != '':                            if ('-' not in tmp_value) and (' ' not in tmp_value) and tmp_value[-1] == '\'' and \                                    tmp_value[0] != '\'':                                tmp_value = '\'' + tmp_value                        # 数据内容清洗                        tmp_value = tmp_value.replace('\n', '').strip()                        if tmp_value == "current_date\n" or tmp_value == "current_date":                            tmp_value = "current_date()"                        if tmp_value == "": 
                            tmp_value = "\'\'"

                        # '@' stands in for ',' inside decimal(p@s) in the CSV; use a fixed precision
                        if ("decimal" in inner_line_split[6]) and ('@' in inner_line_split[6]):
                            inner_line_split[6] = "decimal(38,10)"

                        if 'etl_dt' in inner_line_split[4].lower():
                            tmp_value = 'current_timestamp()'

                        if 'date_format' in tmp_value.lower():
                            tmp_value = tmp_value.replace('"','')
                            tmp_value = tmp_value.replace('@',',')
                            tmp_value = "cast({0} as {1})".format(tmp_value, inner_line_split[6])
                        '''if tmp_value == "${TX_DATE}" or tmp_value == "\"${TX_DATE}\"":
                            tmp_value = tmp_value.replace('\"', '')
                            inner_line_split[6] = inner_line_split[6].replace('@',',')
                            inner_line_split[6] = inner_line_split[6].replace('"','')
                            tmp_value = "cast('{0}' as {1})".format(tmp_value, inner_line_split[6])'''
                        # Append the column expression
                        sql_content_pre += "    {0}    as {1},\n".format(tmp_value.replace('\n', ''),
                                                                         inner_line_split[4])
                    else:
                        if 'case' in inner_line_split[7]:
                            if inner_line_split[7][0] == "\"":
                                # Strip the outermost pair of double quotes around the case expression
                                tmp_key_list = list(inner_line_split[7])
                                tmp_key_list[inner_line_split[7].rfind('"')] = ''
                                tmp_key_list[inner_line_split[7].find('"')] = ''
                                tmp_key = ''.join(tmp_key_list)
                                tmp_key = tmp_key.replace('\n', ' ')
                            else:
                                tmp_key = inner_line_split[7].replace('\n', ' ')
                            tmp_key =
tmp_key.replace(u"’", "\'")                            if ' end' in tmp_key:                                tmp_key = "({0})".format(tmp_key)                            else:                                tmp_key = "({0} end )".format(tmp_key)                            tmp_key = tmp_key.replace('when ', ' when ')                            tmp_key = tmp_key.replace('then ', ' then ')                            tmp_key = tmp_key.replace('else ', ' else ')                            tmp_key = tmp_key.replace('  ', ' ')                        elif 'date_format' in inner_line_split[7].lower():                            tmp_key = inner_line_split[7].replace('"','')                            tmp_key = tmp_key.replace('@',',')                            tmp_key = tmp_key.replace('\n','')                        elif 'concat' in inner_line_split[7].lower():                            tmp_key = inner_line_split[7].replace('"','')                            tmp_key = tmp_key.replace('@',',')                            tmp_key = tmp_key.replace('\n','')                        else:                            tmp_key = inner_line_split[1].lower().replace('\n', ' ')                                                 if inner_line_split[3].lower().strip() != inner_line_split[6].lower().strip():                            '''(inner_line_split[3] == 'tinyint' or inner_line_split[3] == 'int' or 'number' in inner_line_split[3] \                                or (('varchar' in inner_line_split[3]) and ('decimal' in inner_line_split[6])) \                                or (('bigint' in inner_line_split[3]) and ('varchar' in inner_line_split[6]))) \                                or (('string' in inner_line_split[3]) and ('timestamp' in inner_line_split[6])) \                                or (('string' in inner_line_split[3]) and ('date' in inner_line_split[6])) \                                and (inner_line_split[3] != inner_line_split[6]):'''                                     
                   inner_line_split[6] = inner_line_split[6].replace('@',',')                            inner_line_split[6] = inner_line_split[6].replace('"','')                            if ("decimal" in inner_line_split[6]) and ('@' in inner_line_split[6]):                                inner_line_split[6] = "decimal(38,10)"                            tmp_key = "cast({0} as {1})".format(tmp_key, inner_line_split[6])                        sql_content_pre += "    {0}    as {1},\n".format(tmp_key,inner_line_split[4])                        '''sql_content_pre += "    {0}    as {1},\n".format(                            inner_line_split[1].lower().replace('\n', ''),                            inner_line_split[4])'''                # 源表名                sql_annotate += "-- Source table {2}:{0}.{1} \n".format(src_db, src_table, index)                # 数据清洗 去掉最后一个逗号                sql_content_pre_list = list(sql_content_pre)                sql_content_pre_list[sql_content_pre.rfind(',')] = ''                sql_content_pre = ''.join(sql_content_pre_list)                # 语句拼接                sql_all_write += sql_content_pre                sql_all_write += "\nfrom {0}.{1}".format(src_db, src_table)                # union all 表直接取分号                # if index == group_count:                sql_all_write += "\n;\n"        else:            # 有效信息截取            start_index = 0            end_index = 0            current_index = 0            for line in line_content:                line_split = line.split(',')                if line_split[0] == "源库名":                    start_index = current_index + 1                if line_split[0] == "关联条件":                    end_index = current_index                current_index = current_index + 1            effect_lines = line_content[start_index:end_index]            # 组内 sql info            if party_key.strip() == '':                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                              "set 
hive.exec.dynamic.partition.mode=nonstrict;\n" \                              "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                              "insert overwrite table data_lake.{0} \n" \                              "select \n".format(target_table)            else:                sql_content_pre = "set hive.exec.dynamic.partition=true;\n" \                              "set hive.exec.dynamic.partition.mode=nonstrict;\n" \                              "set hive.exec.max.dynamic.partitions.pernode = 1000;\n" \                              "insert overwrite table data_lake.{0} \n" \                              "partition({1}) \n"\                              "select \n".format(target_table, party_key)            # 注释拼接            sql_content_pre = "-- ************************************************************************** \n" \                              + sql_content_pre            sql_content_pre = "--   Group 1: {0} \n".format(target_table) + sql_content_pre            sql_content_pre = "\n-- ************************************************************************** \n" \                              + sql_content_pre            # 有效数据的有效内容的拼接            for line in effect_lines:                inner_line_split = line.split(',')                # 源表名                if not src_db:                    src_db = inner_line_split[0]                # 字段映射                if inner_line_split[0] == "" or inner_line_split[1] == "":                    if '$' in inner_line_split[7]:                        tmp_value = "\"{0}\"".format(inner_line_split[7]).replace('\n', '')                    else:                        tmp_value = inner_line_split[7].replace('\n', '')                    if '-' in tmp_value:                        tmp_value_split = tmp_value.split('-')                        tmp_value = tmp_value_split[0].lower()                        tmp_value = tmp_value.replace('\'', '')                        tmp_value = tmp_value.replace('\"', 
'')                        tmp_value = tmp_value.replace('’', '')                        tmp_value = tmp_value.replace('‘', '')                        tmp_value = "\'{0}\'".format(tmp_value)                    if tmp_value != '':                        if ('-' not in tmp_value) and (' ' not in tmp_value) and tmp_value[-1] == '\'' and tmp_value[0] \                                != '\'':                            tmp_value = '\'' + tmp_value                    # 数据内容清洗                    tmp_value = tmp_value.replace('\n', '').strip()                    if tmp_value == "current_date\n" or tmp_value == "current_date":                        tmp_value = "current_date()"                    if tmp_value == "":                        tmp_value = "\'\'"                                        if ("decimal" in inner_line_split[6]) and ('@' in inner_line_split[6]):                        inner_line_split[6] = "decimal(38,10)"                                        if 'etl_dt' in inner_line_split[4].lower():                        tmp_value = 'current_timestamp()'                                        if 'date_format' in tmp_value.lower():                            tmp_value = tmp_value.replace('"','')                            tmp_value = tmp_value.replace('@',',')                            tmp_value = "cast({0} as {1})".format(tmp_value, inner_line_split[6])                    '''if tmp_value == "${TX_DATE}" or tmp_value == "\"${TX_DATE}\"":                        tmp_value = tmp_value.replace('\"', '')                        inner_line_split[6] = inner_line_split[6].replace('@',',')                        inner_line_split[6] = inner_line_split[6].replace('"','')                        tmp_value = "cast('{0}' as {1})".format(tmp_value, inner_line_split[6])'''                    # 拼接                    sql_content_pre += "    {0}    as {1},\n".format(tmp_value.replace('\n', ''), inner_line_split[4])                else:                    if 'case' in 
inner_line_split[7]:                        if inner_line_split[7][0] == "\"":                            tmp_key_list = list(inner_line_split[7])                            tmp_key_list[inner_line_split[7].rfind('"')] = ''                            tmp_key_list[inner_line_split[7].find('"')] = ''                            tmp_key = ''.join(tmp_key_list)                            tmp_key = tmp_key.replace('\n', ' ')                        else:                            tmp_key = inner_line_split[7].replace('\n', ' ')                        tmp_key = tmp_key.replace(u"’", "\'")                        if ' end' in tmp_key:                            tmp_key = "({0})".format(tmp_key)                        else:                            tmp_key = "({0} end )".format(tmp_key)                        tmp_key = tmp_key.replace('when ', ' when ')                        tmp_key = tmp_key.replace('then ', ' then ')                        tmp_key = tmp_key.replace('else ', ' else ')                        tmp_key = tmp_key.replace('  ', ' ')                    elif 'date_format' in inner_line_split[7].lower():                        tmp_key = inner_line_split[7].replace('"','')                        tmp_key = tmp_key.replace('@',',')                        tmp_key = tmp_key.replace('\n','')                    elif 'concat' in inner_line_split[7].lower():                        tmp_key = inner_line_split[7].replace('"','')                        tmp_key = tmp_key.replace('@',',')                        tmp_key = tmp_key.replace('\n','')                    else:                        tmp_key = inner_line_split[1].lower().replace('\n', ' ')                                         if inner_line_split[3].lower().strip() != inner_line_split[6].lower().strip():                        '''(inner_line_split[3] == 'tinyint' or inner_line_split[3] == 'int' or 'number' in inner_line_split[3] \                            or (('varchar' in inner_line_split[3]) and ('decimal' in 
inner_line_split[6])) \                            or (('bigint' in inner_line_split[3]) and ('varchar' in inner_line_split[6]))) \                            or (('string' in inner_line_split[3]) and ('timestamp' in inner_line_split[6])) \                            or (('string' in inner_line_split[3]) and ('date' in inner_line_split[6])) \                            and (inner_line_split[3] != inner_line_split[6]):'''                                                    inner_line_split[6] = inner_line_split[6].replace('@',',')                        inner_line_split[6] = inner_line_split[6].replace('"','')                        if ("decimal" in inner_line_split[6]) and ('@' in inner_line_split[6]):                            inner_line_split[6] = "decimal(38,10)"                        tmp_key = "cast({0} as {1})".format(tmp_key, inner_line_split[6])                    sql_content_pre += "    {0}    as {1},\n".format(tmp_key,inner_line_split[4])                                        '''sql_content_pre += "    {0}    as {1},\n".format(                        inner_line_split[1].lower().replace('\n', ''),                        inner_line_split[4])'''            # 源表名            sql_annotate += "-- Source table {2}:{0}.{1} \n".format(src_db, src_table, "1")            # 数据清洗 去掉最后一个逗号            sql_content_pre_list = list(sql_content_pre)            sql_content_pre_list[sql_content_pre.rfind(',')] = ''            sql_content_pre = ''.join(sql_content_pre_list)            # 语句拼接            sql_all_write += sql_content_pre            sql_all_write += "\nfrom {0}.{1}".format(src_db, src_table)            sql_all_write += "\n;\n"        # 注释完善        sql_annotate += "-- *********************************** Partiton Column ********************************** \n"        sql_annotate += "-- ETL Frequency: Daily \n"        sql_annotate += "-- ETL Policy: {0} \n".format(load_strategy)        sql_annotate += "-- *********************************** Revision History 
********************************* \n"        sql_annotate += "-- Date Revised    Revised by     Revision Note \n"        for line in analyze_file_annotate(param_key_content):            line_split = line.split(',')            name = ""            date = ""            desc = ""            print(line_split)            if len(line_split) >= 4:                name = line_split[0]                try:                    date = datetime.strptime("1900-01-01", '%Y-%m-%d') + timedelta(float(line_split[1]) - 2)                except Exception as error:                    print(error)                    date = line_split[1]                desc = line_split[3]            sql_annotate += "-- {0}  {1}  {2} \n".format(                name,                date,                desc            )        sql_annotate += "\n\n"        sql_all_write = sql_annotate + sql_all_write        # 回写        with open("{0}{1}.sql".format(param_target_file_path, target_table), 'a+', encoding='utf-8') as save_file:            save_file.write(sql_all_write)if __name__ == "__main__":    print("开始转换处理工作流")    # 输入参数量    param_scr_file_path = "./raw_data/"    param_mid_file_path = "./csv_data/"    param_object_file_path = "./sql_data/"    # 结果文件夹准备    if not os.path.exists(param_mid_file_path):        os.mkdir(param_mid_file_path)    if not os.path.exists(param_object_file_path):        os.mkdir(param_object_file_path)    # 转换成 cvs 文件    convert_xlsx_to_cvs(param_scr_file_path, param_mid_file_path)    # 文件内容清洗    clean_cvs_file_content(param_mid_file_path)    # 分析拼接文件关键内容    key_info = analyze_file_key_content(param_mid_file_path)    # 分析文件内容    analyze_file_content(key_info, param_object_file_path)    print("结束转换处理工作流")
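
The revision-history step in the script turns a number read from the spreadsheet into a date by adding `serial - 2` days to 1900-01-01. This appears to rely on Excel's 1900 date system, where serial 1 is 1900-01-01 and a nonexistent 1900-02-29 is counted, so the result is only right for dates from March 1900 onward. The sketch below isolates that conversion. The function name `excel_serial_to_datetime` is hypothetical and not part of the original script, and it assumes the workbook uses the 1900 date system rather than the 1904 one.

```python
from datetime import datetime, timedelta


def excel_serial_to_datetime(raw_value):
    """Convert an Excel 1900-system serial day number to a datetime.

    Hypothetical helper for illustration. Mirrors the script's fallback:
    if the cell does not hold a usable number, the raw value is returned.
    """
    try:
        serial = float(raw_value)
        # Serial 1 is 1900-01-01, and Excel also counts a fictitious
        # 1900-02-29, hence the offset of 2 days.
        return datetime(1900, 1, 1) + timedelta(days=serial - 2)
    except (ValueError, OverflowError):
        return raw_value


if __name__ == "__main__":
    print(excel_serial_to_datetime("43608.0"))
    print(excel_serial_to_datetime("not a date"))
```

If the xlsx file is read with xlrd anyway, converting the cell while the workbook is still open (xlrd ships date helpers that take the workbook's `datemode`) would avoid hard-coding the offset; the sketch keeps the script's own arithmetic for comparison.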

Reposted from: http://npoen.baihongyu.com/
