Program description: a Spark Streaming job reads messages from Kafka and lands the required data in real time.
For example, landing only the deduplication fields is enough to compute real-time PV/UV as well as the cumulative real-time PV/UV.
Two approaches are shown. The first produces a single output file per batch; the second produces as many files per batch as there are partitions. The only difference between them is the deduplication step: an RDD-level distinct() in the first versus a SQL group by in the second.
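As a rough, standalone sketch (not part of the original job): once the deduplicated fields have been landed into the partitioned Hive table used below (mds_weibo_letter_event), the real-time and cumulative PV/UV can be read back with queries along these lines. The SparkSession setup and the dt value are placeholders.

import org.apache.spark.sql.SparkSession

// Sketch only: reading PV/UV back from the landed table; schema as used in the job below.
val spark = SparkSession.builder.appName("pvuv_check").enableHiveSupport().getOrCreate()

// PV/UV for a single day (dt is a placeholder date)
spark.sql(
  """select dt,
    |       count(order_id)          as pv,
    |       count(distinct order_id) as uv
    |  from mds_weibo_letter_event
    | where dt = '20180103'
    | group by dt""".stripMargin).show()

// Cumulative PV/UV across all landed partitions
spark.sql(
  "select count(order_id) as pv, count(distinct order_id) as uv from mds_weibo_letter_event").show()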
Approach 1: one batch every 10 minutes, with each batch producing only a single file.
package com.suda

import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by wangting1 on 2018/1/3.
 */
object WeiboLetterEventMain {

  val log: Logger = Logger.getLogger(WeiboLetterEventMain.getClass)

  def main(args: Array[String]): Unit = {
    val topicSet = Set("xx")
    val group = "xx_group1"
    val brokers = "xx"
    val kafkaParams = Map("metadata.broker.list" -> brokers,
      "group.id" -> group)
    val checkpointDirectory = "/user/suda/temp/checkpoint/realtime_weibo_letter_event"
    val sparkConf = new SparkConf().setAppName("realtime_weibo_letter_event")
    val ssc = new StreamingContext(sparkConf, Seconds(600)) // one batch every 10 minutes
    ssc.checkpoint(checkpointDirectory)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    messages.foreachRDD { (rdd, time: Time) =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      import spark.sql

      // Floor the batch timestamp to the 10-minute boundary; subtract one second
      // so data falling exactly on the boundary is not pushed into the next batch
      val millis = DateUtils.getFloorStamp(time.milliseconds - 1000, 10)
      val dayStr = DateUtils.getDay(millis)
      val hourStr = DateUtils.formatHH(millis)
      val minuteStr = DateUtils.formatMm(millis)

      // Parse the message value, keep only records of the target account that carry an
      // order_id, extract (msgid, order_id), and deduplicate within the batch
      val rdd01 = rdd.map(_._2)
        .map(x => x.split("\t", -1))
        .filter(x => x.length >= 10 && x(2) == "2028810631" && x(9).contains("order_id"))
        .map { x =>
          try {
            val temp = x(9).split(',')
            val msgid = temp(1).split("=>")(1)
            val order_id = temp(2).split("=>")(1)
            (msgid, order_id)
          } catch {
            case e: Exception =>
              log.info("WeiboLetterEventMain error")
              e.printStackTrace()
              null
          }
        }
        .filter(_ != null)
        .distinct()

      if (!rdd01.isEmpty()) {
        rdd01.toDF("msg_id", "order_id").repartition(1).createOrReplaceTempView("tem_table_weibo_letter_event")
        sql(
          s"insert into table mds_weibo_letter_event partition(dt=$dayStr) "
            + "select msg_id, order_id from tem_table_weibo_letter_event"
        )
      }

      if (time.milliseconds % (1000 * 60 * 10) == 0) {
        // Write a completion flag at the end of every 10-minute batch
        MakeFlag.makeFlag("tem_table_weibo_letter_event", dayStr, hourStr, minuteStr)
      }
    } // end foreachRDD

    ssc.start()
    ssc.awaitTermination()
  } // end main

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      val user = "suda"
      val warehouseLocation = "/user/" + user + "/spark-warehouse"
      val scratchdir = "/user/" + user + "/hive-" + user
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .config("hive.exec.scratchdir", scratchdir)
          .enableHiveSupport()
          .getOrCreate()
      }
      instance
    }
  }
} // end object
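DateUtils and MakeFlag are in-house helpers that the post does not include. Purely as an assumption inferred from how they are called above, they might look roughly like this (the flag path in MakeFlag is made up):

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical DateUtils: floor a timestamp to an N-minute boundary and format its parts.
object DateUtils {
  def getFloorStamp(millis: Long, minutes: Int): Long = {
    val step = minutes * 60 * 1000L
    millis / step * step
  }
  def getDay(millis: Long): String   = new SimpleDateFormat("yyyyMMdd").format(new Date(millis))
  def formatHH(millis: Long): String = new SimpleDateFormat("HH").format(new Date(millis))
  def formatMm(millis: Long): String = new SimpleDateFormat("mm").format(new Date(millis))
}

// Hypothetical MakeFlag: drop an empty HDFS flag file that downstream jobs can poll for.
object MakeFlag {
  def makeFlag(table: String, day: String, hour: String, minute: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    val flag = new Path(s"/user/suda/flags/$table/$day/$hour$minute.done")
    fs.create(flag, true).close()
  }
}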
Approach 2: even with repartition(1), each batch still produces as many output files as there are partitions; the root cause is the group by clause in the INSERT statement (a workaround sketch follows the code).
package com.suda

import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by wangting1 on 2018/1/3.
 */
object WeiboLetterEventMain {

  val log: Logger = Logger.getLogger(WeiboLetterEventMain.getClass)

  def main(args: Array[String]): Unit = {
    val topicSet = Set("xx")
    val group = "xx_group1"
    val brokers = "xx"
    val kafkaParams = Map("metadata.broker.list" -> brokers,
      "group.id" -> group)
    val checkpointDirectory = "/user/suda/temp/checkpoint/realtime_weibo_letter_event"
    val sparkConf = new SparkConf().setAppName("realtime_weibo_letter_event")
    val ssc = new StreamingContext(sparkConf, Seconds(600)) // one batch every 10 minutes
    ssc.checkpoint(checkpointDirectory)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    messages.foreachRDD { (rdd, time: Time) =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      import spark.sql

      // Floor the batch timestamp to the 10-minute boundary; subtract one second
      // so data falling exactly on the boundary is not pushed into the next batch
      val millis = DateUtils.getFloorStamp(time.milliseconds - 1000, 10)
      val dayStr = DateUtils.getDay(millis)
      val hourStr = DateUtils.formatHH(millis)
      val minuteStr = DateUtils.formatMm(millis)

      // Parse the message value, keep only records of the target account that carry an
      // order_id, and extract (msgid, order_id); no RDD-level distinct here,
      // deduplication is left to the group by in the SQL below
      val rdd01 = rdd.map(_._2)
        .map(x => x.split("\t", -1))
        .filter(x => x.length >= 10 && x(2) == "2028810631" && x(9).contains("order_id"))
        .map { x =>
          try {
            val temp = x(9).split(',')
            val msgid = temp(1).split("=>")(1)
            val order_id = temp(2).split("=>")(1)
            (msgid, order_id)
          } catch {
            case e: Exception =>
              log.info("WeiboLetterEventMain error")
              e.printStackTrace()
              null
          }
        }
        .filter(_ != null)

      if (!rdd01.isEmpty()) {
        rdd01.toDF("msg_id", "order_id").repartition(1).createOrReplaceTempView("tem_table_weibo_letter_event")
        sql(
          s"insert into table mds_weibo_letter_event partition(dt=$dayStr) "
            + "select msg_id, order_id from tem_table_weibo_letter_event group by msg_id, order_id"
        )
      }

      if (time.milliseconds % (1000 * 60 * 10) == 0) {
        // Write a completion flag at the end of every 10-minute batch
        MakeFlag.makeFlag("tem_table_weibo_letter_event", dayStr, hourStr, minuteStr)
      }
    } // end foreachRDD

    ssc.start()
    ssc.awaitTermination()
  } // end main

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      val user = "suda"
      val warehouseLocation = "/user/" + user + "/spark-warehouse"
      val scratchdir = "/user/" + user + "/hive-" + user
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .config("spark.sql.warehouse.dir", warehouseLocation)
          .config("hive.exec.scratchdir", scratchdir)
          .enableHiveSupport()
          .getOrCreate()
      }
      instance
    }
  }
} // end object
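Why repartition(1) does not help in this version: the group by in the INSERT introduces a shuffle, so the number of output files follows the post-shuffle partitioning (typically spark.sql.shuffle.partitions, 200 by default) rather than the partitioning of the temp view. If a single file per batch is still wanted while deduplicating in the DataFrame/SQL layer, one option is the sketch below (not from the original post); it replaces the temp-view block inside the same foreachRDD, with rdd01, spark, and dayStr as defined above.

// Sketch: dedup on the DataFrame, then collapse to one partition after the shuffle,
// so the file count no longer depends on the SQL group by.
val deduped = rdd01.toDF("msg_id", "order_id").dropDuplicates().coalesce(1)
deduped.createOrReplaceTempView("tem_table_weibo_letter_event")
spark.sql(
  s"insert into table mds_weibo_letter_event partition(dt=$dayStr) " +
    "select msg_id, order_id from tem_table_weibo_letter_event")

// Alternative: keep the SQL group by but pin the shuffle to a single partition for this job.
// spark.conf.set("spark.sql.shuffle.partitions", "1")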