• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

spark-streaming中用spark-sql(insert into table)方式落地文件,group by 落地影响文件个数

SPARK wangting 2年前 (2018-01-30) 665次浏览

程序描述: sparkstreaming读取kafka文件,把需要的数据实时落地

例如只落地去重字段可以计算 实时pvuv,实时累计的pvuv

 

两种方式, 第一种每个批次落地只产生一个文件

第二种每个批次落地产生partitions个数个文件, 区别只在标红的地方

 

方式一, 每10分钟一个批次,只会产生一个文件

package com.suda

import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
* Created by wangting1 on 2018/1/3.
*
*/
object WeiboLetterEventMain {
  // Job-level logger.
  val log: Logger = Logger.getLogger(WeiboLetterEventMain.getClass)

  /**
   * Variant 1: one output file per batch.
   *
   * Reads Kafka via a direct stream in 10-minute batches, extracts
   * (msg_id, order_id) pairs from field 9 of tab-separated records,
   * de-duplicates with distinct(), then inserts the batch into the Hive
   * table mds_weibo_letter_event. Because the DataFrame is
   * repartition(1)-ed and the INSERT has no shuffle-inducing clause
   * (no GROUP BY), each batch lands as a single file.
   */
  def main(args: Array[String]): Unit = {

    val topicSet = Set("xx")
    val group = "xx_group1"
    val brokers = "xx"
    val kafkaParams = Map(
      "metadata.broker.list" -> brokers,
      "group.id" -> group)
    val checkpointDirectory = "/user/suda/temp/checkpoint/realtime_weibo_letter_event"
    val sparkConf = new SparkConf().setAppName("realtime_weibo_letter_event")
    // One batch every 10 minutes.
    val ssc = new StreamingContext(sparkConf, Seconds(600))
    ssc.checkpoint(checkpointDirectory)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    messages.foreachRDD { (rdd, time: Time) =>

      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      import spark.sql
      // Subtract 1s so records stamped exactly on the batch boundary stay
      // in this batch instead of leaking into the next one.
      val millis = DateUtils.getFloorStamp(time.milliseconds - 1000, 10)
      val dayStr = DateUtils.getDay(millis)
      val hourStr = DateUtils.formatHH(millis)
      val minuteStr = DateUtils.formatMm(millis)

      // Parse tab-separated records; keep only rows from the target account
      // whose payload (field 9) carries an order_id.
      val rdd01 = rdd.map(_._2)
        .map(x => x.split("\t", -1))
        .filter(x => x.length >= 10 && x(2) == "2028810631" && x(9).contains("order_id"))
        .map { x =>
          try {
            val temp = x(9).split(',')
            val msgid = temp(1).split("=>")(1)
            val order_id = temp(2).split("=>")(1)
            (msgid, order_id)
          } catch {
            case e: Exception =>
              // Malformed payload: log, mark with null, drop below.
              log.info("WeiboLetterEventMain error")
              e.printStackTrace()
              null
          }
        }
        .filter(_ != null)
        .distinct()

      if (!rdd01.isEmpty()) {
        // repartition(1): the INSERT below writes exactly one file.
        rdd01.toDF("msg_id", "order_id").repartition(1).createOrReplaceTempView("tem_table_weibo_letter_event")

        sql(
          s"insert into table mds_weibo_letter_event partition(dt=$dayStr ) "
            + "select msg_id, order_id from tem_table_weibo_letter_event"
        )
      }

      if (time.milliseconds % (1000 * 60 * 10) == 0) {
        // Write a completion flag at each 10-minute boundary.
        MakeFlag.makeFlag("tem_table_weibo_letter_event", dayStr, hourStr, minuteStr)
      }

    } // end foreachRDD

    ssc.start()
    ssc.awaitTermination()

  } // end main

  /** Lazily instantiated singleton instance of SparkSession. */
  object SparkSessionSingleton {
    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      val user = "suda"
      val warehouseLocation = "/user/" + user + "/spark-warehouse"
      val scratchdir = "/user/" + user + "/hive-" + user
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .config("hive.exec.scratchdir", scratchdir)
          .enableHiveSupport()
          .getOrCreate()
      }
      instance
    }
  }

} // end object

 

 

 

方式二,即使 repartition(1) 也会产生分区个数个文件,主要原因在于 insert 语句中的 group by 触发了 shuffle

 

package com.suda

import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
* Created by wangting1 on 2018/1/3.
*
*/
object WeiboLetterEventMain {
  // Job-level logger.
  val log: Logger = Logger.getLogger(WeiboLetterEventMain.getClass)

  /**
   * Variant 2: partition-count output files per batch.
   *
   * Identical pipeline to variant 1 except that de-duplication is done by
   * GROUP BY in the INSERT statement instead of RDD distinct(). The GROUP BY
   * introduces a shuffle after repartition(1), so each batch is written as
   * spark.sql.shuffle.partitions files rather than one.
   */
  def main(args: Array[String]): Unit = {

    val topicSet = Set("xx")
    val group = "xx_group1"
    val brokers = "xx"
    val kafkaParams = Map(
      "metadata.broker.list" -> brokers,
      "group.id" -> group)
    val checkpointDirectory = "/user/suda/temp/checkpoint/realtime_weibo_letter_event"
    val sparkConf = new SparkConf().setAppName("realtime_weibo_letter_event")
    // One batch every 10 minutes.
    val ssc = new StreamingContext(sparkConf, Seconds(600))
    ssc.checkpoint(checkpointDirectory)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    messages.foreachRDD { (rdd, time: Time) =>

      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      import spark.sql
      // Subtract 1s so records stamped exactly on the batch boundary stay
      // in this batch instead of leaking into the next one.
      val millis = DateUtils.getFloorStamp(time.milliseconds - 1000, 10)
      val dayStr = DateUtils.getDay(millis)
      val hourStr = DateUtils.formatHH(millis)
      val minuteStr = DateUtils.formatMm(millis)

      // Parse tab-separated records; keep only rows from the target account
      // whose payload (field 9) carries an order_id. No distinct() here --
      // de-duplication happens in the SQL GROUP BY below.
      val rdd01 = rdd.map(_._2)
        .map(x => x.split("\t", -1))
        .filter(x => x.length >= 10 && x(2) == "2028810631" && x(9).contains("order_id"))
        .map { x =>
          try {
            val temp = x(9).split(',')
            val msgid = temp(1).split("=>")(1)
            val order_id = temp(2).split("=>")(1)
            (msgid, order_id)
          } catch {
            case e: Exception =>
              // Malformed payload: log, mark with null, drop below.
              log.info("WeiboLetterEventMain error")
              e.printStackTrace()
              null
          }
        }
        .filter(_ != null)

      if (!rdd01.isEmpty()) {
        rdd01.toDF("msg_id", "order_id").repartition(1).createOrReplaceTempView("tem_table_weibo_letter_event")

        // The GROUP BY triggers a shuffle, so despite repartition(1) the
        // insert produces spark.sql.shuffle.partitions output files.
        sql(
          s"insert into table mds_weibo_letter_event partition(dt=$dayStr ) "
            + "select msg_id, order_id from tem_table_weibo_letter_event group by msg_id,order_id"
        )
      }

      if (time.milliseconds % (1000 * 60 * 10) == 0) {
        // Write a completion flag at each 10-minute boundary.
        MakeFlag.makeFlag("tem_table_weibo_letter_event", dayStr, hourStr, minuteStr)
      }

    } // end foreachRDD

    ssc.start()
    ssc.awaitTermination()

  } // end main

  /** Lazily instantiated singleton instance of SparkSession. */
  object SparkSessionSingleton {
    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      val user = "suda"
      val warehouseLocation = "/user/" + user + "/spark-warehouse"
      val scratchdir = "/user/" + user + "/hive-" + user
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .config("hive.exec.scratchdir", scratchdir)
          .enableHiveSupport()
          .getOrCreate()
      }
      instance
    }
  }

} // end object

 


喜欢 (0)