Reproducing the Problem
In Spark 1.4, run the following in spark-shell:
scala> sqlContext.read.load("/app/dc/yarn/QA/p.txt")
The following error is reported:
Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure:
Lost task 7.0 in stage 0.0 (TID 7, localhost): java.io.IOException: Could not read footer:
java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt
is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
    at org.apache.parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:247)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:640)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$24.apply(newParquet.scala:629)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/dc/yarn/QA/p.txt
is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [44, 50, 48, 10]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:422)
    at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:237)
    at org.apache.parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:233)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
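The byte values in the message already tell the story: Parquet expects the ASCII magic "PAR1" at the tail of every file, while the bytes actually found are the last characters of a plain text line. A small, self-contained sketch (not Spark code) decoding both byte sequences:

```java
import java.nio.charset.StandardCharsets;

public class MagicBytes {
    public static void main(String[] args) {
        // Magic number Parquet expects at the tail of every file.
        byte[] expected = {80, 65, 82, 49};
        // Bytes actually found at the tail of p.txt.
        byte[] found = {44, 50, 48, 10};

        System.out.println(new String(expected, StandardCharsets.US_ASCII)); // PAR1
        // The found bytes decode to ",20" plus a newline -- clearly text data.
        StringBuilder sb = new StringBuilder();
        for (byte b : found) {
            sb.append(b == '\n' ? "\\n" : String.valueOf((char) b));
        }
        System.out.println(sb); // ,20\n
    }
}
```

So the file fails the check simply because it is a text file, not because it is corrupt.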
Tracing the Source Code
From the call stack and log output, the error occurs in the RCFile format check phase.
DataFrameReader
private[spark] def defaultDataSourceName: String =
  getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
  val loader = Utils.getContextOrSparkClassLoader
  if (builtinSources.contains(provider)) {
    return loader.loadClass(builtinSources(provider))
  }
  try {
    loader.loadClass(provider)
  } catch {
    case cnf: java.lang.ClassNotFoundException =>
      try {
        loader.loadClass(provider + ".DefaultSource")
      } catch {
        case cnf: java.lang.ClassNotFoundException =>
          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
            sys.error("The ORC data source must be used with Hive support enabled.")
          } else {
            sys.error(s"Failed to load class for data source: $provider")
          }
      }
  }
}
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
    sqlContext: SQLContext,
    userSpecifiedSchema: Option[StructType],
    partitionColumns: Array[String],
    provider: String,
    options: Map[String, String]): ResolvedDataSource = {
  val clazz: Class[_] = lookupDataSource(provider)
  def className: String = clazz.getCanonicalName
  val relation = userSpecifiedSchema match {
    case Some(schema: StructType) => clazz.newInstance() match {
      case dataSource: SchemaRelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
      ...
      // For partitioned relation r, r.schema's column ordering can be different from the column
      // ordering of data.logicalPlan (partition columns are all moved after data column). This
      // will be adjusted within InsertIntoHadoopFsRelation.
      sqlContext.executePlan(
        InsertIntoHadoopFsRelation(
          r,
          data.logicalPlan,
          mode)).toRdd
      r
    case _ =>
      sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
  }
  new ResolvedDataSource(clazz, relation)
}
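The fallback logic in lookupDataSource can be illustrated with a plain-Java sketch. The class names used below are only illustrative stand-ins (the real Spark code also consults a builtinSources map of short names first):

```java
public class LookupDataSourceSketch {
    // Mirrors the fallback in ResolvedDataSource.lookupDataSource: try the
    // provider name as a class first, then provider + ".DefaultSource".
    static Class<?> lookupDataSource(String provider) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            return loader.loadClass(provider);
        } catch (ClassNotFoundException e) {
            try {
                return loader.loadClass(provider + ".DefaultSource");
            } catch (ClassNotFoundException e2) {
                throw new RuntimeException(
                    "Failed to load class for data source: " + provider);
            }
        }
    }

    public static void main(String[] args) {
        // A resolvable class name is loaded directly.
        System.out.println(lookupDataSource("java.util.ArrayList").getName());
        // An unknown provider falls through both attempts and fails.
        try {
            lookupDataSource("com.example.NoSuchSource"); // hypothetical name
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why load() with no format argument resolves to the parquet source: the provider string defaults to "org.apache.spark.sql.parquet".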
The format check is performed by HiveFileFormatUtils.checkInputFormat, so start the investigation from this code:
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
  // Check if the file format of the file matches that of the table.
  boolean flag = HiveFileFormatUtils.checkInputFormat(
      fs, conf, tbd.getTable().getInputFileFormatClass(), files);
  if (!flag) {
    throw new HiveException(
        "Wrong file format. Please check the file's format.");
  }
}
checkInputFormat is implemented as follows:
/**
 * checks if files are in same format as the given input format.
 */
@SuppressWarnings("unchecked")
public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
    Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files)
    throws HiveException {
  if (files.size() > 0) {
    Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
    if (checkerCls == null
        && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
      // we get a text input format here, we can not determine a file is text
      // according to its content, so we can do is to test if other file
      // format can accept it. If one other file format can accept this file,
      // we treat this file as text file, although it maybe not.
      return checkTextInputFormat(fs, conf, files);
    }
    if (checkerCls != null) {
      InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
          .get(checkerCls);
      try {
        if (checkerInstance == null) {
          checkerInstance = checkerCls.newInstance();
          inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
        }
        return checkerInstance.validateInput(fs, conf, files);
      } catch (Exception e) {
        throw new HiveException(e);
      }
    }
    return true;
  }
  return false;
}
After adding debug output, the runtime values of files.size(), inputFormatCls, and checkerInstance are:
files.size() = 2
inputFormatCls: org.apache.hadoop.hive.ql.io.RCFileInputFormat
checkerInstance class: org.apache.hadoop.hive.ql.io.RCFileInputFormat
So the problem occurs in the RCFileInputFormat.validateInput step.
RCFileInputFormat.validateInput
The implementation of RCFileInputFormat.validateInput is as follows:
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
    ArrayList<FileStatus> files) throws IOException {
  if (files.size() <= 0) {
    return false;
  }
  for (int fileId = 0; fileId < files.size(); fileId++) {
    RCFile.Reader reader = null;
    try {
      reader = new RCFile.Reader(fs, files.get(fileId).getPath(), conf);
      reader.close();
      reader = null;
    } catch (IOException e) {
      return false;
    } finally {
      if (null != reader) {
        reader.close();
      }
    }
  }
  return true;
}
It constructs an RCFile.Reader for each file in the input directory to check whether the file is in RCFile format.
RCFile.Reader
Trace the Reader code to find where the exception is thrown:
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
    long start, long length) throws IOException {
  tolerateCorruptions = conf.getBoolean(
      TOLERATE_CORRUPTIONS_CONF_STR, false);
  conf.setInt("io.file.buffer.size", bufferSize);
  this.file = file;
  in = openFile(fs, file, bufferSize, length);
  this.conf = conf;
  end = start + length;
  boolean succeed = false;
  try {
    if (start > 0) {
      seek(0);
      init();        // <-- the exception is thrown from init()
      seek(start);
    } else {
      init();
    }
    succeed = true;
  } finally {
    ...
  }
  ...
}
The exception is traced into the init() method:
private void init() throws IOException {
  byte[] magic = new byte[MAGIC.length];
  System.out.println("tp1.1"); // WangDbg
  in.readFully(magic);
  System.out.println("tp1.2"); // WangDbg
  if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
    byte vers = in.readByte();
    if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
      throw new IOException(file + " is a version " + vers
          + " SequenceFile instead of an RCFile.");
    }
    version = ORIGINAL_VERSION;
  } else {
    ...
  }
  ...
}
At runtime tp1.1 is printed but tp1.2 is not, so the exception is thrown inside in.readFully().
This is an interface of peta's FSDataInputStream that reads bytes from the current input stream into the byte buffer passed in.
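readFully's contract explains the failure: it either fills the whole buffer or throws EOFException if the stream ends first, which is exactly what happens on a zero-byte file. A minimal sketch using plain java.io, with no HDFS required (FSDataInputStream extends java.io.DataInputStream, which provides this behavior):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class ReadFullyOnEmptyFile {
    public static void main(String[] args) throws IOException {
        // An empty in-memory stream stands in for the zero-byte _SUCCESS file.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
        byte[] magic = new byte[3]; // RCFile reads a 3-byte magic header
        try {
            in.readFully(magic);
        } catch (EOFException e) {
            System.out.println("EOFException before " + magic.length + " bytes could be read");
        }
    }
}
```

EOFException is a subclass of IOException, so validateInput's catch block treats the empty file as "not an RCFile" and the whole directory fails validation.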
Analysis and Conclusion
Printing the path of the file being read when the exception is thrown gives:
/app/dc/spark/wangtcc/warehouse/test.db/people_rc1/_SUCCESS
The error occurs because the RCFile Reader treats _SUCCESS, the HDFS file that marks job completion, as a data file and tries to validate it. _SUCCESS is an empty file, yet the RCFile Reader opens it expecting to read 3 bytes and compare them with the RCFile format's magic number, so the exception is thrown.
By contrast, ORCFILE is unaffected: in its check phase, the checkerCls = getInputFormatChecker step returns null, so validateInput is skipped entirely and no problem arises.
In principle, the validation should skip empty files such as _SUCCESS. Modify the code as follows:
for (int fileId = 0; fileId < files.size(); fileId++) {
  RCFile.Reader reader = null;
  FileStatus file = files.get(fileId);
  // skip empty files under the input directory
  if (file.getLen() == 0) {
    continue;        // <-- skip empty files
  }
  try {
    reader = new RCFile.Reader(fs, file.getPath(), conf);
    reader.close();
    reader = null;
  } catch (IOException e) {
    return false;
  } finally {
    if (null != reader) {
      reader.close();
    }
  }
}
For each file in the input directory, check whether its length is 0 and simply skip empty files.
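The effect of the patch can be checked locally with java.nio.file standing in for HDFS. Here looksLikeRCFile is a hypothetical stand-in for constructing an RCFile.Reader; it only checks that 3 magic bytes can be read:

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class SkipEmptyFilesDemo {
    // Hypothetical stand-in for new RCFile.Reader(...): succeeds iff at
    // least 3 magic bytes can be read from the file.
    static boolean looksLikeRCFile(Path p) throws IOException {
        try (DataInputStream in = new DataInputStream(Files.newInputStream(p))) {
            byte[] magic = new byte[3];
            in.readFully(magic);
            return true; // the real code would compare magic against RCFile's header
        } catch (EOFException e) {
            return false;
        }
    }

    static boolean validateInput(List<Path> files) throws IOException {
        if (files.isEmpty()) {
            return false;
        }
        for (Path f : files) {
            // skip empty files under the input directory, mirroring the patch
            if (Files.size(f) == 0) {
                continue;
            }
            if (!looksLikeRCFile(f)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rcdemo");
        Path data = Files.write(dir.resolve("part-00000"), "RCF-data".getBytes());
        Path success = Files.createFile(dir.resolve("_SUCCESS")); // zero-byte marker
        System.out.println("valid = " + validateInput(Arrays.asList(data, success)));
    }
}
```

With the empty _SUCCESS file skipped, the directory validates on the strength of its real data files alone.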
In Spark 1.4:
– When sqlContext reads files through DataFrameReader's load() interface, the default format is parquet.
– To read other structured file formats (orc, jdbc, json), set the spark.sql.sources.default configuration.
– To build a DataFrame from plain text, create an RDD with textFile, then convert it to a DataFrame by specifying schema information through the applySchema interface.