11.3. Presto errors when querying Hive tables with empty files

When Presto queries table data through the Hive connector, empty files in the Hive table's HDFS location cause the query to fail. We therefore need to find these empty files in HDFS and delete them. I wrote a Python script that finds and deletes the empty files automatically.

11.3.1. Error message

xxxx.parquet file is small or is not a parquet file
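A valid Parquet file starts and ends with the 4-byte magic PAR1 and carries a footer, so a zero-byte or truncated file cannot be parsed, which is what the message above complains about. As a quick local check, here is a minimal sketch (an illustration, not part of the script below) that validates a file after it has been copied out of HDFS, e.g. with hdfs dfs -get:

import os

PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(local_path):
    """Rough validity check: file size plus the PAR1 magic at both ends."""
    # a well-formed Parquet file is at least magic + footer length + magic
    # = 4 + 4 + 4 = 12 bytes, so anything smaller is certainly broken
    if os.path.getsize(local_path) < 12:
        return False
    with open(local_path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC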

11.3.2. Solution

Write a Python script that finds and deletes the empty HDFS files.
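For a quick manual check before running the script, a shell one-liner over the recursive listing surfaces the zero-byte files (in hdfs dfs -ls output, column 5 is the size and the last column is the path; the warehouse path here is only an example):

hdfs dfs -ls -R /user/hive/warehouse/demo.db | awk '$1 !~ /^d/ && $5 == 0 {print $8}'

The script below automates the same scan, adds a configurable size range, and can delete the matches.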

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/7/26 11:36
# @Author  : ganliang
# @File    : hdfsemptytool.py
# @Desc    : HDFS utility: find files by size, delete files, etc.

import logging
import os
import re
import sys

LOGGING_CONFIG = {
    # "filename": "config.log",
    # "filemode": "w",
    "format": "%(asctime)s|%(process)d|%(thread)d|%(filename)s[%(funcName)s:%(lineno)d]|%(levelname)s|%(message)s",
    "level": logging.INFO
}
logging.basicConfig(**LOGGING_CONFIG)


def find_file_bysize(urls=None, empty_files=None, beginsize=0, endsize=0):
    """
    Collect files whose size falls within [beginsize, endsize].
    :param urls: list of HDFS paths to scan
    :param empty_files: accumulator for the matching file paths
    :param beginsize: lower size bound in bytes
    :param endsize: upper size bound in bytes
    :return: the accumulated list of matching file paths
    """
    if empty_files is None: empty_files = []
    if endsize == 0: endsize = beginsize
    if endsize < beginsize: raise ValueError("endsize is less than beginsize")

    for url in urls:
        hdfs_cmd = "hdfs dfs -ls {0}".format(url)
        logging.debug(hdfs_cmd)
        std_out = os.popen(hdfs_cmd)
        lines = std_out.readlines()

        for line in lines:
            line = line.strip()
            hdfs_attrs = re.split(r"\s+", line)
            logging.debug(line)
            # skip the "Found N items" header and any malformed lines
            if len(hdfs_attrs) < 6: continue
            path = hdfs_attrs[-1]
            permission = hdfs_attrs[0]
            # directory: recurse into it
            if permission.startswith("d"):
                find_file_bysize([path], empty_files, beginsize, endsize)
            # file: check the size column
            else:
                size = int(hdfs_attrs[4])
                if beginsize <= size <= endsize:
                    empty_files.append(path)
                    logging.info("find:{0}".format(line))
    return empty_files


def remove_files(empty_files=None):
    """
    Delete the collected empty files, bypassing the HDFS trash.
    :param empty_files: list of HDFS file paths to delete
    :return:
    """
    for empty_file in empty_files or []:
        delete_cmd = "hdfs dfs -rm -skipTrash {0}".format(empty_file)
        logging.info(delete_cmd)
        dstd_out = os.popen(delete_cmd)
        logging.info("".join(dstd_out.readlines()))


def merge_files(url):
    """
    Merge the files under a directory (not implemented yet).
    :param url: directory path
    :return:
    """
    # placeholder; one possible approach (an assumption, not verified here)
    # is "hdfs dfs -getmerge" followed by re-uploading the merged file
    pass


if __name__ == "__main__":

    args = sys.argv[1:]

    operation = str(args[0]).upper() if len(args) > 0 else ""


    def _find_hdfs_file_(args):
        urls, beginsize, endsize = args[1], 0, 0
        if len(args) >= 3: beginsize = int(args[2])
        if len(args) >= 4: endsize = int(args[3])
        hdfs_urls = urls.split(",")
        logging.info("\n".join(hdfs_urls))
        empty_files = find_file_bysize(hdfs_urls, [], beginsize, endsize)
        logging.info("total empty files:{0}".format(len(empty_files)))
        return empty_files


    # find the matching files
    if operation == "FIND":
        if len(args) < 2:
            logging.error("usage: python hdfsemptytool.py find urls beginsize[0] endsize[0]")
            sys.exit(-1)
        _find_hdfs_file_(args)
    # delete the matching files
    elif operation == "REMOVE":
        if len(args) < 2:
            logging.error("usage: python hdfsemptytool.py remove urls beginsize[0] endsize[0]")
            sys.exit(-1)
        empty_files = _find_hdfs_file_(args)
        remove_files(empty_files)
    else:
        logging.error("usage: python hdfsemptytool.py find|remove urls beginsize[0] endsize[0]")