
20160126 Fixing the error when DataFrame load hits empty files


Reproducing the problem


In Spark 1.4, run the following in spark-shell:

scala> sqlContext.read.load("/app/dc/yarn/QA/p.txt")

The following error is reported:

Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
        at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:640)
        at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:629)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:422)
        at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
        at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
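
Decoding the byte values in the message makes the cause obvious: the expected tail [80, 65, 82, 49] is Parquet's ASCII magic string "PAR1", while the bytes actually found are plain text from p.txt. A quick check in the spark-shell (nothing here depends on the cluster):

// Expected Parquet magic at the end of the file.
println(Array[Byte](80, 65, 82, 49).map(_.toChar).mkString)   // PAR1
// Bytes actually found at the tail of p.txt: a comma, "20" and a newline.
println(Array[Byte](44, 50, 48, 10).map(_.toChar).mkString)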

Tracing the source code


Based on the call stack and the log output, the error occurs during the file-format check. The analysis below first looks at how DataFrameReader resolves its data source, and then at the RCFile format-check path.

DataFrameReader

When load() is called without an explicit format, DataFrameReader falls back to the default data source; the relevant Spark 1.4 code (excerpted from SQLConf and ResolvedDataSource) is shown below.

private[spark] def defaultDataSourceName: String =
  getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
  val loader = Utils.getContextOrSparkClassLoader
  if (builtinSources.contains(provider)) {
    return loader.loadClass(builtinSources(provider))
  }
  try {
    loader.loadClass(provider)
  } catch {
    case cnf: java.lang.ClassNotFoundException =>
      try {
        loader.loadClass(provider + ".DefaultSource")
      } catch {
        case cnf: java.lang.ClassNotFoundException =>
          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
            sys.error("The ORC data source must be used with Hive support enabled.")
          } else {
            sys.error(s"Failed to load class for data source: $provider")
          }
      }
  }
}
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
    sqlContext: SQLContext,
    userSpecifiedSchema: Option[StructType],
    partitionColumns: Array[String],
    provider: String,
    options: Map[String, String]): ResolvedDataSource = {
  val clazz: Class[_] = lookupDataSource(provider)
  def className: String = clazz.getCanonicalName
  val relation = userSpecifiedSchema match {
    case Some(schema: StructType) => clazz.newInstance() match {
      case dataSource: SchemaRelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
  ...
      // For partitioned relation r, r.schema's column ordering can be different from the column
      // ordering of data.logicalPlan (partition columns are all moved after data column).  This
      // will be adjusted within InsertIntoHadoopFsRelation.
      sqlContext.executePlan(
        InsertIntoHadoopFsRelation(
          r,
          data.logicalPlan,
          mode)).toRdd
      r
    case _ =>
      sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
  }
  new ResolvedDataSource(clazz, relation)
}
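
Since the default data source is parquet, load()-ing a plain text file ends up in the Parquet footer check seen above. A minimal sketch of how to avoid relying on the default (the json path and format below are illustrative only):

// Specify the data source explicitly instead of relying on the default.
val jsonDf = sqlContext.read.format("json").load("/path/to/data.json")

// Or change the session default; the value shown is the built-in json source
// and is given only as an example.
sqlContext.setConf("spark.sql.sources.default", "org.apache.spark.sql.json")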

 

On the Hive side, the file-format check is performed by HiveFileFormatUtils.checkInputFormat, so the investigation starts from this code:

if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
  // Check if the file format of the file matches that of the table.
  boolean flag = HiveFileFormatUtils.checkInputFormat(
      fs, conf, tbd.getTable().getInputFileFormatClass(), files);
  if (!flag) {
    throw new HiveException(
        "Wrong file format. Please check the file's format.");
  }
}
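
As an aside, HiveConf.ConfVars.HIVECHECKFILEFORMAT is backed by the hive.fileformat.check property, so an unverified workaround sketch is to disable the check for the session instead of patching the checker (assuming sqlContext is a HiveContext and the setting propagates to the failing path):

// Unverified workaround sketch: turn off Hive's input-format check for the session.
sqlContext.sql("SET hive.fileformat.check=false")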

checkInputFormat is implemented as follows:

/**
 * checks if files are in same format as the given input format.
 */
@SuppressWarnings("unchecked")
public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
    Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files)
    throws HiveException {
  if (files.size() > 0) {
    Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
    if (checkerCls == null
        && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
      // we get a text input format here, we can not determine a file is text
      // according to its content, so we can do is to test if other file
      // format can accept it. If one other file format can accept this file,
      // we treat this file as text file, although it maybe not.
      return checkTextInputFormat(fs, conf, files);
    }
    if (checkerCls != null) {
      InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
          .get(checkerCls);
      try {
        if (checkerInstance == null) {
          checkerInstance = checkerCls.newInstance();
          inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
        }
        return checkerInstance.validateInput(fs, conf, files);
      } catch (Exception e) {
        throw new HiveException(e);
      }
    }
    return true;
  }
  return false;
}

After adding debug prints, the runtime values of files.size(), inputFormatCls and the checkerInstance class are as follows:

files.size() = 2
inputFormatCls: org.apache.hadoop.hive.ql.io.RCFileInputFormat
checkerInstance class: org.apache.hadoop.hive.ql.io.RCFileInputFormat

So the problem occurs in the RCFileInputFormat.validateInput step.

RCFileInputFormat.validateInput

The implementation of RCFileInputFormat.validateInput is as follows:

@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
    ArrayList<FileStatus> files) throws IOException {
  if (files.size() <= 0) {
    return false;
  }
  for (int fileId = 0; fileId < files.size(); fileId++) {
    RCFile.Reader reader = null;
    try {
      reader = new RCFile.Reader(fs, files.get(fileId)
          .getPath(), conf);
      reader.close();
      reader = null;
    } catch (IOException e) {
      return false;
    } finally {
      if (null != reader) {
        reader.close();
      }
    }
  }
  return true;
}

Here an RCFile.Reader is constructed for every file in the input directory to check whether it is in RCFile format.

RCFile.Reader

Tracing the Reader code to find where the exception is thrown:

/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
    long start, long length) throws IOException {
  tolerateCorruptions = conf.getBoolean(
    TOLERATE_CORRUPTIONS_CONF_STR, false);
  conf.setInt("io.file.buffer.size", bufferSize);
  this.file = file;
  in = openFile(fs, file, bufferSize, length);
  this.conf = conf;
  end = start + length;
  boolean succeed = false;
  try {
    if (start > 0) {
      seek(0);
      init();       <-- the exception is thrown from init()
      seek(start);
    } else {
      init();
    }
    succeed = true;
  } finally {
    ...
  }
  ...
}

The exception is traced to the init() method:

   private void init() throws IOException {
      byte[] magic = new byte[MAGIC.length];
      System.out.println("tp1.1"); //WangDbg
      in.readFully(magic);
      System.out.println("tp1.2"); //WangDbg
      if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
        byte vers = in.readByte();
        if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
          throw new IOException(file + " is a version " + vers +
                                " SequenceFile instead of an RCFile.");
        }
        version = ORIGINAL_VERSION;
      } else {
        ...
      }
      ...
    }

At runtime tp1.1 is printed but tp1.2 is not, which means the exception occurs inside in.readFully().

readFully is an interface on peta's FSDataInputStream; it reads bytes from the current input stream into the byte array passed in.
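
A minimal sketch of why this fails on an empty file, using stock Hadoop APIs (the path below is a placeholder): readFully, inherited from DataInputStream, throws java.io.EOFException when the stream ends before the requested number of bytes has been read, and a zero-length file ends immediately.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// Placeholder path pointing at a zero-length file such as an HDFS _SUCCESS marker.
val in = fs.open(new Path("/tmp/_SUCCESS"))
val magic = new Array[Byte](3)
try {
  in.readFully(magic)   // throws java.io.EOFException on a zero-length file
} finally {
  in.close()
}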

Analysis and conclusion


Printing, at runtime, the path of the file being read when the exception is thrown gives:

/app/dc/spark/wangtcc/warehouse/test.db/people_rc1/_SUCCESS

So the error occurs because the RCFile Reader also treats _SUCCESS, the marker file HDFS jobs write to flag completion, as a data file and tries to validate it.

_SUCCESS is an empty file, yet the RCFile Reader opens it and expects to read 3 bytes from it to compare against the RCFile format's magic number, so the read throws an exception.
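
To confirm which files under the table directory are empty, their lengths can be listed directly (a sketch; the directory path is taken from the failing run above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val dir = new Path("/app/dc/spark/wangtcc/warehouse/test.db/people_rc1")
fs.listStatus(dir).foreach { status =>
  // _SUCCESS shows up here with a length of 0 bytes.
  println(s"${status.getPath.getName}  ${status.getLen} bytes")
}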

By contrast, ORC files do not hit this problem: during the check phase the checkerCls obtained from checkerCls = getInputFormatChecker(...) is null, so validateInput is skipped entirely and no error occurs.

By rights, the validation should skip empty files such as _SUCCESS.

The code is therefore modified as follows:

for (int fileId = 0; fileId < files.size(); fileId++) {
  RCFile.Reader reader = null;
  FileStatus file = files.get(fileId);
  // skip empty files under the input directory
  if (file.getLen() == 0) {
    continue;   <-- skip empty files
  }
  try {
    reader = new RCFile.Reader(fs, file.getPath(), conf);
    reader.close();
    reader = null;
  } catch (IOException e) {
    return false;
  } finally {
    if (null != reader) {
      reader.close();
    }
  }
}

For each file in the input directory, check whether its length is 0 and simply skip empty files.

In Spark 1.4:

– When sqlContext reads files through DataFrameReader's load() interface, the default format is parquet.

– To read other structured file formats (orc, jdbc, json), configure spark.sql.sources.default accordingly.

– To build a DataFrame from plain text, first create an RDD with textFile, then convert it to a DataFrame by supplying schema information through the applySchema interface, as sketched below.
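
A minimal sketch of that last point, assuming p.txt holds comma-separated name,age lines (the column names and file layout are assumptions, not from the original post):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build Rows from the raw text; splitting on "," is an assumption about p.txt.
val rowRDD = sc.textFile("/app/dc/yarn/QA/p.txt")
  .map(_.split(","))
  .map(parts => Row(parts(0), parts(1)))

// Describe the columns explicitly, then turn the RDD into a DataFrame.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)))
val df = sqlContext.applySchema(rowRDD, schema)
df.show()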

