Feature: Support for reading GBK-encoded files in Spark
Requirements Analysis
Spark's default text input format behaves the same as Hadoop's: both decode input as UTF-8 by default.
If the source file uses a different encoding, such as GBK, the data read back after Spark processes it will be garbled.
For example, a table of restaurant information might look like this:
4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂
But after being read into a text-format Hive table through Spark SQL, it ends up like this:
4504812|¾ÛÆ·Ãæ¹Ý
5623102|½ðÃ×Á£×¥·¹
5623112|ÏæÏÂ²è·¹
4501392|¼ª¶«¼Òì˹øµê
1130032|ºìÌ«ÑôÇåÕæÊ³ÌÃ
Since GBK-encoded files are widespread in our environment, we decided to add support for reading GBK files to Spark.
Design
The overall plan is as follows:
- Implement a generic CustomEncodeLineRecordReader that reads records from an input split using a specified encoding.
- Implement a GBKFileInputFormat for GBK-encoded files, which uses CustomEncodeLineRecordReader with the encoding set to GBK to read out key-value records.
InputFormat
In Hadoop, every input format implements the InputFormat interface.
public interface InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   *
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple.
   *
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
   */
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  /**
   * Get the {@link RecordReader} for the given {@link InputSplit}.
   *
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a
   * record-oriented view to the individual task.</p>
   *
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
   */
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}
RecordReader
getSplits produces the input data splits, and getRecordReader returns a RecordReader that reads (key, value) records one by one from an input split.
public interface RecordReader<K, V> {

  /**
   * Reads the next key/value pair from the input for processing.
   *
   * @param key the key to read data into
   * @param value the value to read data into
   * @return true iff a key/value was read, false if at EOF
   */
  boolean next(K key, V value) throws IOException;

  /**
   * Create an object of the appropriate type to be used as a key.
   *
   * @return a new key object.
   */
  K createKey();

  /**
   * Create an object of the appropriate type to be used as a value.
   *
   * @return a new value object.
   */
  V createValue();

  /**
   * Returns the current position in the input.
   *
   * @return the current position in the input.
   * @throws IOException
   */
  long getPos() throws IOException;

  /**
   * Close this {@link InputSplit} to future operations.
   *
   * @throws IOException
   */
  public void close() throws IOException;

  /**
   * How much of the input has the {@link RecordReader} consumed i.e.
   * has been processed by?
   *
   * @return progress from <code>0.0</code> to <code>1.0</code>.
   * @throws IOException
   */
  float getProgress() throws IOException;
}
next reads the next key/value pair and is the core method that CustomEncodeLineRecordReader needs to implement.
The LineRecordReader used by the standard TextInputFormat processes a file line by line, using each line's byte offset from the start of the file as the key and a Text as the value.
Internally, Text simply wraps the raw bytes of the input, but it always interprets those bytes as UTF-8.
Accordingly, we also use the offset from the start of the file as the key and a Text as the value; the only difference is that before returning the Text value, we convert the wrapped bytes from GBK to UTF-8.
Everything downstream then works exactly as it does for ordinary UTF-8 text.
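In other words, the fix is a byte-level re-encoding before the Text value is handed back to the framework. The following self-contained snippet illustrates the idea; it is not the actual reader code and only assumes a JDK that ships the GBK charset plus hadoop-common on the classpath:

import java.nio.charset.{Charset, StandardCharsets}
import org.apache.hadoop.io.Text

// The raw bytes of one GBK-encoded line, as they would arrive from an input split.
val gbkBytes: Array[Byte] = "4504812|聚品面馆".getBytes(Charset.forName("GBK"))

// Wrong: Text assumes UTF-8, so these bytes come out as replacement characters.
println(new Text(gbkBytes))

// Right: decode as GBK first, then hand Text well-formed UTF-8 bytes.
val line = new String(gbkBytes, Charset.forName("GBK"))
println(new Text(line.getBytes(StandardCharsets.UTF_8)))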
Implementation
Two files are added to Spark core: CustomEncodeLineRecordReader.scala and GBKFileInputFormat.scala.
They are described in turn below.
CustomEncodeLineRecordReader
CustomEncodeLineRecordReader reads a file from an HDFS path line by line using a specified encoding.
It implements the RecordReader next method and, each time a value is read, converts the underlying byte stream from the specified encoding to UTF-8.
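The reader's source is not reproduced in this document. The sketch below shows one way it could be written, assuming it delegates split handling and line splitting to Hadoop's mapred LineRecordReader and re-encodes each value inside next; the actual implementation may differ (for example, by copying LineRecordReader wholesale), but the constructor signature matches the call made from GBKFileInputFormat below.

package org.apache.spark.input

import java.nio.charset.StandardCharsets

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, LineRecordReader, RecordReader}

/**
 * Sketch of a line record reader that decodes bytes with a caller-supplied charset
 * and hands UTF-8 Text values to the framework.
 */
class CustomEncodeLineRecordReader(
    encoding: String,
    job: JobConf,
    split: FileSplit,
    recordDelimiterBytes: Array[Byte])
  extends RecordReader[LongWritable, Text] {

  // LineRecordReader already handles compression, split boundaries and custom delimiters.
  private val reader = new LineRecordReader(job, split, recordDelimiterBytes)

  override def next(key: LongWritable, value: Text): Boolean = {
    val hasNext = reader.next(key, value)
    if (hasNext) {
      // Re-interpret the raw bytes with the requested encoding, then store UTF-8 bytes.
      val line = new String(value.getBytes, 0, value.getLength, encoding)
      value.set(line.getBytes(StandardCharsets.UTF_8))
    }
    hasNext
  }

  override def createKey(): LongWritable = reader.createKey()
  override def createValue(): Text = reader.createValue()
  override def getPos(): Long = reader.getPos
  override def getProgress(): Float = reader.getProgress
  override def close(): Unit = reader.close()
}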
GBKFileInputFormat
GBKFileInputFormat serves as the InputFormat for GBK-encoded files.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.input

import org.apache.hadoop.fs._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.mapred._

/**
 * FileInputFormat for GBK encoded files. Files are broken into lines. Either linefeed
 * or carriage-return are used to signal end of line. Keys are the position in the file,
 * and values are the line of text and will be converted to UTF-8 Text.
 */
class GBKFileInputFormat extends FileInputFormat[LongWritable, Text] with JobConfigurable {

  private var compressionCodecs: CompressionCodecFactory = null

  def configure(conf: JobConf) = {
    compressionCodecs = new CompressionCodecFactory(conf)
  }

  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = {
    val codec: CompressionCodec = compressionCodecs.getCodec(file)
    if (null == codec) {
      return true
    }
    return codec.isInstanceOf[SplittableCompressionCodec]
  }

  override def getRecordReader(
      genericSplit: InputSplit,
      job: JobConf,
      reporter: Reporter): RecordReader[LongWritable, Text] = {
    reporter.setStatus(genericSplit.toString)
    val delimiter: String = job.get("textinputformat.record.delimiter")
    var recordDelimiterBytes: Array[Byte] = null
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes("GBK")
    }
    return new CustomEncodeLineRecordReader("GBK", job,
      genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
  }
}
Functional Testing
The following text file, TestFile, is used for testing:
4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂
Generating the GBK file
The following Python script converts it to a GBK-encoded file:
import os
import codecs

def ReadFile(filePath, encoding="utf-8"):
    with codecs.open(filePath, "r", encoding) as f:
        return f.read()

def WriteFile(filePath, u, encoding="gbk"):
    with codecs.open(filePath, "w", encoding) as f:
        f.write(u)

def UTF8_2_GBK(src, dst):
    content = ReadFile(src, encoding="utf-8")
    WriteFile(dst, content, encoding="gbk")

UTF8_2_GBK("TestFile", "GBKTestFile")
Upload the test file GBKTestFile to HDFS:
$ hadoop fs -put GBKTestFile /user/spark/testdir/gbk/
Testing reads through GBKFileInputFormat
Start spark-shell and run the following statements:
import org.apache.hadoop.io.{LongWritable, Text}

sc.hadoopFile("/user/spark/testdir/gbk/",
    classOf[org.apache.spark.input.GBKFileInputFormat],
    classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)
  .saveAsTextFile("/user/spark/testdir/output/")  // output path was truncated in the original; any HDFS directory works
Inspect the files generated in the output directory, part-00000 and part-00001:
4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂
Testing table creation and queries with GBKFileInputFormat
Start spark-sql, create the table, and load the data:
CREATE TABLE `restaurant`(
  `firm_id` int,
  `name` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
STORED AS
  INPUTFORMAT 'org.apache.spark.input.GBKFileInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

-- the load path was truncated in the original; this is the file uploaded above
load data inpath '/user/spark/testdir/gbk/GBKTestFile' into table restaurant;
Query the data:
select * from restaurant;
Output:
4504812	聚品面馆
5623102	金米粒抓饭
5623112	湘下茶饭
4501392	吉东家焖锅店
1130032	红太阳清真食堂
Time taken: 1.399 seconds, Fetched 5 row(s)
Queries work as expected; clean up the code and submit it.