
Feature: GBK file reading support in Spark


Requirements Analysis


Spark's default text input format handles files the same way Hadoop does: both assume UTF-8 encoding.

If the source file itself uses a different encoding, such as GBK, it will come out garbled after Spark reads it.

For example, a dataset of restaurant information might look like this:

4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂

But read into a text-format Hive table through Spark SQL, it ends up looking like this:

4504812|¾ÛÆ·Ãæ¹Ý
5623102|½ðÃ×Á£×¥·¹
5623112|Ïæϲ跹
4501392|¼ª¶«¼Òì˹øµê
1130032|ºìÌ«ÑôÇåÕæʳÌÃ
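
This garbling is exactly what GBK bytes look like when decoded with a single-byte charset such as ISO-8859-1; a quick illustration in a Scala REPL (not part of the patch):

// "聚品面馆" encoded as GBK, then mis-decoded as ISO-8859-1:
val raw = "聚品面馆".getBytes("GBK")
new String(raw, "ISO-8859-1")  // => "¾ÛÆ·Ãæ¹Ý"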

Given the large number of GBK-encoded files in-house, we decided to add support for reading GBK files to Spark.

Design


The overall design is as follows:

  • Implement a generic CustomEncodeLineRecordReader that reads records from an input split in a specified encoding.
  • Implement a GBKFileInputFormat for GBK-encoded input that uses CustomEncodeLineRecordReader with the encoding set to GBK to read out key-value records.

InputFormat

Every input format in Hadoop implements the InputFormat interface.

public interface InputFormat<K, V> {
  /**
   * Logically split the set of input files for the job.
   *
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple.
   *
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
   */
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  /**
   * Get the {@link RecordReader} for the given {@link InputSplit}.
   *
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a
   * record-oriented view to the individual task.</p>
   *
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
   */
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}

RecordReader

getSplits produces the input splits, and getRecordReader returns a RecordReader that reads records out of a split one (key, value) pair at a time.

public interface RecordReader<K, V> {
  /**
   * Reads the next key/value pair from the input for processing.
   *
   * @param key the key to read data into
   * @param value the value to read data into
   * @return true iff a key/value was read, false if at EOF
   */
  boolean next(K key, V value) throws IOException;
  /**
   * Create an object of the appropriate type to be used as a key.
   *
   * @return a new key object.
   */
  K createKey();
  /**
   * Create an object of the appropriate type to be used as a value.
   *
   * @return a new value object.
   */
  V createValue();
  /**
   * Returns the current position in the input.
   *
   * @return the current position in the input.
   * @throws IOException
   */
  long getPos() throws IOException;
  /**
   * Close this {@link InputSplit} to future operations.
   *
   * @throws IOException
   */
  public void close() throws IOException;
  /**
   * How much of the input has the {@link RecordReader} consumed i.e.
   * has been processed by?
   *
   * @return progress from <code>0.0</code> to <code>1.0</code>.
   * @throws IOException
   */
  float getProgress() throws IOException;
}

next reads out the next key/value pair; it is the core method CustomEncodeLineRecordReader has to implement.
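
For reference, a minimal sketch of how a caller typically drives this interface (inputFormat, split, and job are assumed to be obtained elsewhere):

import org.apache.hadoop.mapred.Reporter

// Drive a RecordReader: create reusable key/value holders, then loop on next().
val reader = inputFormat.getRecordReader(split, job, Reporter.NULL)
val key = reader.createKey()
val value = reader.createValue()
while (reader.next(key, value)) {
  // (key, value) now holds the next record, e.g. byte offset and line of text
}
reader.close()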

The LineRecordReader used by the standard TextInputFormat processes a file line by line by default, using each line's byte offset from the start of the file as the key and the line itself, as a Text, as the value.

Internally, Text is just a wrapper around the raw input bytes, but it decodes them as UTF-8 by default.

Accordingly, we also use the offset from the start of the file as the key and a Text as the value; the only difference is that before returning the Text, we convert the wrapped bytes from GBK to UTF-8.
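
The conversion itself is small; a sketch, where value is the Text about to be returned:

import java.nio.charset.StandardCharsets

// Interpret the backing bytes as GBK, then store the UTF-8 re-encoding back.
val line = new String(value.getBytes, 0, value.getLength, "GBK")
value.set(line.getBytes(StandardCharsets.UTF_8))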

From that point on, processing is exactly the same as for ordinary text.

Implementation


Two files are added to Spark core, CustomEncodeLineRecordReader.scala and GBKFileInputFormat.scala, implemented as follows.

CustomEncodeLineRecordReader

CustomEncodeLineRecordReader reads a file line by line from an HDFS path using a specified encoding.


CustomEncodeLineRecordReader overrides LineRecordReader's next method so that every time a value is read, its byte stream is converted from the specified encoding to UTF-8.
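
A minimal sketch of the idea, assuming the Hadoop mapred LineRecordReader constructor that accepts a custom record delimiter:

package org.apache.spark.input

import java.nio.charset.StandardCharsets

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, LineRecordReader}

/**
 * A LineRecordReader that re-encodes every line it reads from a given
 * source encoding (e.g. "GBK") into UTF-8 before handing it back.
 */
class CustomEncodeLineRecordReader(
    encoding: String,
    job: JobConf,
    split: FileSplit,
    recordDelimiterBytes: Array[Byte])
  extends LineRecordReader(job, split, recordDelimiterBytes) {

  override def next(key: LongWritable, value: Text): Boolean = {
    if (!super.next(key, value)) {
      false
    } else {
      // Decode the raw bytes with the source encoding, re-encode as UTF-8.
      val line = new String(value.getBytes, 0, value.getLength, encoding)
      value.set(line.getBytes(StandardCharsets.UTF_8))
      true
    }
  }
}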

GBKFileInputFormat

GBKFileInputFormat serves as the InputFormat for GBK-encoded files.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.input
import org.apache.hadoop.fs._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.mapred._
/**
 * FileInputFormat for GBK-encoded files. Files are broken into lines. Either linefeed
 * or carriage-return is used to signal end of line. Keys are the position in the file,
 * and values are the line of text, converted to UTF-8 Text.
 */
class GBKFileInputFormat extends FileInputFormat[LongWritable, Text]
  with JobConfigurable {
  private var compressionCodecs: CompressionCodecFactory = null
  def configure(conf: JobConf) = {
    compressionCodecs = new CompressionCodecFactory(conf)
  }
  override protected def isSplitable(fs: FileSystem, file: Path): Boolean = {
    val codec: CompressionCodec = compressionCodecs.getCodec(file)
    if (null == codec) {
      return true
    }
    return codec.isInstanceOf[SplittableCompressionCodec]
  }
  override def getRecordReader(
            genericSplit: InputSplit,
            job: JobConf,
            reporter: Reporter): RecordReader[LongWritable, Text] = {
    reporter.setStatus(genericSplit.toString)
    val delimiter: String = job.get("textinputformat.record.delimiter")
    var recordDelimiterBytes: Array[Byte] = null
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes("GBK")
    }
    return new CustomEncodeLineRecordReader("GBK", job, genericSplit.asInstanceOf[FileSplit],
        recordDelimiterBytes)
  }
}
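
Since getRecordReader honors textinputformat.record.delimiter (encoding it as GBK bytes), a custom record delimiter can be supplied through the job configuration as usual; an illustrative spark-shell snippet:

// Hypothetical example: split records on a custom delimiter instead of line ends.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\u0001")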

Functional Testing


The test uses the following text file, TestFile:

4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂

Generating the GBK file

Convert it to a GBK-encoded file with the following Python script:

import codecs

def ReadFile(filePath, encoding="utf-8"):
    with codecs.open(filePath, "r", encoding) as f:
        return f.read()

def WriteFile(filePath, u, encoding="gbk"):
    with codecs.open(filePath, "w", encoding) as f:
        f.write(u)

def UTF8_2_GBK(src, dst):
    # Read the UTF-8 source and write it back out as GBK.
    content = ReadFile(src, encoding="utf-8")
    WriteFile(dst, content, encoding="gbk")

UTF8_2_GBK("TestFile", "GBKTestFile")

Upload the resulting GBKTestFile to HDFS:

$ hadoop fs -put GBKTestFile /user/spark/testdir/gbk/

Testing reads through GBKFileInputFormat

Start spark-shell and run the following statements:

import org.apache.hadoop.io.{LongWritable, Text}
sc.hadoopFile("/user/spark/testdir/gbk/GBKTestFile",
    classOf[org.apache.spark.input.GBKFileInputFormat], 
    classOf[LongWritable], classOf[Text])
    .map(pair => pair._2.toString)
    .saveAsTextFile("/user/spark/testdir/output")

Check the contents of the files generated under the output directory, part-00000 and part-00001:

4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
4501392|吉东家焖锅店
1130032|红太阳清真食堂

Testing table creation and queries with GBKFileInputFormat

Start spark-sql, create the table, and load the data:

CREATE TABLE `restaurant`(
  `firm_id` int,
  `name` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
  'org.apache.spark.input.GBKFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
 
load data inpath '/user/spark/testdir/gbk/GBKTestFile' into table restaurant;

Query the data:

select * from restaurant;

Output:

4504812 聚品面馆
5623102 金米粒抓饭
5623112 湘下茶饭
4501392 吉东家焖锅店
1130032 红太阳清真食堂
Time taken: 1.399 seconds, Fetched 5 row(s)

Queries work as expected; the code is then cleaned up and submitted.

