Reproducing the Problem
In Spark 1.4, run the following in spark-shell:
scala> sqlContext.read.load("/app/dc/yarn/QA/p.txt")
The following error is reported:
Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
    at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:640)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:629)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:422)
    at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
    at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
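For reference, decoding the two byte sequences in the message makes the mismatch obvious: the expected tail [80, 65, 82, 49] is Parquet's PAR1 magic, while the bytes actually found are ordinary text. A quick check in spark-shell (plain ASCII decoding, nothing Spark-specific):

println(Array[Byte](80, 65, 82, 49).map(_.toChar).mkString)  // prints "PAR1" (Parquet magic)
println(Array[Byte](44, 50, 48, 10).map(_.toChar).mkString)  // prints ",20" plus a newline, i.e. plain text

So the reader is treating the plain text file p.txt as a Parquet file.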
Tracing the Source Code
DataFrameReader
private[spark] def defaultDataSourceName: String =
  getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
  val loader = Utils.getContextOrSparkClassLoader
  if (builtinSources.contains(provider)) {
    return loader.loadClass(builtinSources(provider))
  }

  try {
    loader.loadClass(provider)
  } catch {
    case cnf: java.lang.ClassNotFoundException =>
      try {
        loader.loadClass(provider + ".DefaultSource")
      } catch {
        case cnf: java.lang.ClassNotFoundException =>
          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
            sys.error("The ORC data source must be used with Hive support enabled.")
          } else {
            sys.error(s"Failed to load class for data source: $provider")
          }
      }
  }
}
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
    sqlContext: SQLContext,
    userSpecifiedSchema: Option[StructType],
    partitionColumns: Array[String],
    provider: String,
    options: Map[String, String]): ResolvedDataSource = {
  val clazz: Class[_] = lookupDataSource(provider)
  def className: String = clazz.getCanonicalName
  val relation = userSpecifiedSchema match {
    case Some(schema: StructType) => clazz.newInstance() match {
      case dataSource: SchemaRelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
      ...
      // For partitioned relation r, r.schema's column ordering can be different from the column
      // ordering of data.logicalPlan (partition columns are all moved after data column). This
      // will be adjusted within InsertIntoHadoopFsRelation.
      sqlContext.executePlan(
        InsertIntoHadoopFsRelation(
          r,
          data.logicalPlan,
          mode)).toRdd
      r
    case _ =>
      sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
  }
  new ResolvedDataSource(clazz, relation)
}
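In other words, a bare load() hands the default provider to lookupDataSource, which resolves to the parquet source and then tries to read a Parquet footer from our text file. A minimal sketch of the two ways to steer this resolution (the json path below is hypothetical):

// Specify the data source explicitly for a single read...
val df1 = sqlContext.read.format("json").load("/path/to/data.json")

// ...or change the session-wide default that defaultDataSourceName falls back to.
sqlContext.setConf("spark.sql.sources.default", "org.apache.spark.sql.json")
val df2 = sqlContext.read.load("/path/to/data.json")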
Based on the call stack and the log output, the error occurs during the RCFile format check, which is performed by HiveFileFormatUtils.checkInputFormat. We start the investigation from this code:
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
  // Check if the file format of the file matches that of the table.
  boolean flag = HiveFileFormatUtils.checkInputFormat(
      fs, conf, tbd.getTable().getInputFileFormatClass(), files);
  if (!flag) {
    throw new HiveException(
        "Wrong file format. Please check the file's format.");
  }
}
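As an aside, the whole check is gated on HiveConf.ConfVars.HIVECHECKFILEFORMAT, i.e. the hive.fileformat.check property. A hedged workaround sketch (assuming a HiveContext is in use, and at the cost of losing this safety net entirely) is to switch the check off for the session:

// Sketch only: this disables Hive's input-format validation rather than
// fixing the underlying problem analyzed below.
sqlContext.sql("SET hive.fileformat.check=false")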
checkInputFormat is implemented as follows:
/**
 * checks if files are in same format as the given input format.
 */
@SuppressWarnings("unchecked")
public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
    Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files)
    throws HiveException {
  if (files.size() > 0) {
    Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
    if (checkerCls == null
        && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
      // we get a text input format here, we can not determine a file is text
      // according to its content, so we can do is to test if other file
      // format can accept it. If one other file format can accept this file,
      // we treat this file as text file, although it maybe not.
      return checkTextInputFormat(fs, conf, files);
    }

    if (checkerCls != null) {
      InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
          .get(checkerCls);
      try {
        if (checkerInstance == null) {
          checkerInstance = checkerCls.newInstance();
          inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
        }
        return checkerInstance.validateInput(fs, conf, files);
      } catch (Exception e) {
        throw new HiveException(e);
      }
    }
    return true;
  }
  return false;
}
After adding debug prints, the runtime values of files.size(), inputFormatCls, and checkerInstance are:
files.size() = 2
inputFormatCls: org.apache.hadoop.hive.ql.io.RCFileInputFormat
checkerInstance class: org.apache.hadoop.hive.ql.io.RCFileInputFormat
So the problem lies in the RCFileInputFormat.validateInput step.
RCFileInputFormat.validateInput
The implementation of RCFileInputFormat.validateInput is as follows:
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
    ArrayList<FileStatus> files) throws IOException {
  if (files.size() <= 0) {
    return false;
  }
  for (int fileId = 0; fileId < files.size(); fileId++) {
    RCFile.Reader reader = null;
    try {
      reader = new RCFile.Reader(fs, files.get(fileId).getPath(), conf);
      reader.close();
      reader = null;
    } catch (IOException e) {
      return false;
    } finally {
      if (null != reader) {
        reader.close();
      }
    }
  }
  return true;
}
Here an RCFile.Reader is constructed for every file in the input directory to check whether it is in RCFile format.
RCFile.Reader
Tracing the Reader code to find where the exception is thrown:
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
    long start, long length) throws IOException {
  tolerateCorruptions = conf.getBoolean(
      TOLERATE_CORRUPTIONS_CONF_STR, false);
  conf.setInt("io.file.buffer.size", bufferSize);
  this.file = file;
  in = openFile(fs, file, bufferSize, length);
  this.conf = conf;
  end = start + length;
  boolean succeed = false;
  try {
    if (start > 0) {
      seek(0);
      init();      // <-- the exception is thrown from init()
      seek(start);
    } else {
      init();
    }
    succeed = true;
  } finally {
    ...
  }
  ...
}
The exception is identified as being thrown from the init() method:
private void init() throws IOException {
  byte[] magic = new byte[MAGIC.length];
  System.out.println("tp1.1"); // WangDbg
  in.readFully(magic);
  System.out.println("tp1.2"); // WangDbg
  if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
    byte vers = in.readByte();
    if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
      throw new IOException(file + " is a version " + vers
          + " SequenceFile instead of an RCFile.");
    }
    version = ORIGINAL_VERSION;
  } else {
    ...
  }
  ...
}
At runtime tp1.1 is printed but tp1.2 is not, so the exception occurs inside in.readFully().
readFully is a method of peta's FSDataInputStream; it reads bytes from the current input stream into the supplied byte array.
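A minimal sketch of why this matters for an empty file (the path is hypothetical; only standard Hadoop FileSystem APIs are used): readFully throws an EOFException if the stream ends before the buffer is filled, which is exactly what happens for a zero-length file.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val in = fs.open(new Path("/tmp/empty_file"))  // hypothetical zero-length file
val magic = new Array[Byte](3)                 // the reader expects 3 magic bytes
try {
  in.readFully(magic)                          // throws java.io.EOFException
} finally {
  in.close()
}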
Analysis Conclusion
Printing, at runtime, the path of the file being read when the exception is thrown gives:
/app/dc/spark/wangtcc/warehouse/test.db/people_rc1/_SUCCESS
The root cause is that the RCFile Reader also treats _SUCCESS, the marker file HDFS jobs write to flag the status of the output data, as a data file to be validated. _SUCCESS is an empty file, yet the RCFile Reader opens it and tries to read 3 bytes to compare against the "magic number" of the RCFile format, so an exception is thrown.
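To confirm this, a quick sketch (using the table path from the log above, and assuming the shell can reach the same HDFS namespace) lists the directory and prints each file's length; the _SUCCESS marker shows up with length 0:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
fs.listStatus(new Path("/app/dc/spark/wangtcc/warehouse/test.db/people_rc1"))
  .foreach(s => println(s"${s.getPath.getName}\t${s.getLen}"))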
By contrast, ORCFILE is unaffected: during its check, the checkerCls obtained from checkerCls = getInputFormatChecker(...) is null, so validateInput is skipped entirely and no problem occurs.
In principle, empty files such as _SUCCESS should be skipped during this validation.
The code is therefore modified as follows:
for (int fileId = 0; fileId < files.size(); fileId++) {
  RCFile.Reader reader = null;
  FileStatus file = files.get(fileId);
  // skip empty files under the input directory
  if (file.getLen() == 0) {
    continue;    // <-- skip empty files such as _SUCCESS
  }
  try {
    reader = new RCFile.Reader(fs, file.getPath(), conf);
    reader.close();
    reader = null;
  } catch (IOException e) {
    return false;
  } finally {
    if (null != reader) {
      reader.close();
    }
  }
}
For each file in the input directory, check whether its length is 0 and simply skip empty files.
In Spark 1.4:
– When sqlContext reads files through DataFrameReader's load() interface, the default format is parquet.
– To read other structured formats (orc, jdbc, json), change the default via the spark.sql.sources.default configuration.
– To build a DataFrame from plain text, first create an RDD with textFile, then specify the schema through the applySchema interface to convert it into a DataFrame (a sketch follows below).
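For the last point, here is a hedged sketch of the textFile-plus-applySchema route in Spark 1.4 (the column names, types, and the assumption that each line looks like "Michael,29" are illustrative, not taken from the original data):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Parse each line of the text file into a Row (adjust to the real layout).
val lines = sc.textFile("/app/dc/yarn/QA/p.txt")
val rows  = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))

// Schema is assumed for illustration.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true)))

// applySchema is the interface mentioned above; createDataFrame is its newer equivalent.
val peopleDF = sqlContext.applySchema(rows, schema)
peopleDF.registerTempTable("people")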