• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

离线数仓(四)

开发技术 开发技术 3小时前 3次浏览

第5章 数仓搭建-DIM

5.1 商品维度表(全量)

  1.建表语句

离线数仓(四)

drop table if exists `dim_sku_info`;
-- SKU dimension: one denormalized row per SKU, partitioned by load date.
create external table dim_sku_info (
    -- columns taken from ods_sku_info
    `id`             string         comment '商品id',
    `price`          decimal(16,2)  comment '商品价格',
    `sku_name`       string         comment '商品名称',
    `sku_desc`       string         comment '商品描述',
    `weight`         decimal(16,2)  comment '重量',
    `is_sale`        boolean        comment '是否在售',
    `spu_id`         string         comment 'spu编号',
    `category3_id`   string         comment '三级分类id',
    `tm_id`          string         comment '品牌id',
    `create_time`    string         comment '创建时间',
    -- column taken from ods_spu_info
    `spu_name`       string         comment 'spu名称',
    -- columns taken from ods_base_category3
    `category3_name` string         comment '三级分类名称',
    `category2_id`   string         comment '二级分类id',
    -- columns taken from ods_base_category2
    `category2_name` string         comment '二级分类名称',
    `category1_id`   string         comment '一级分类id',
    -- column taken from ods_base_category1
    `category1_name` string         comment '一级分类名称',
    -- column taken from ods_base_trademark
    `tm_name`        string         comment '品牌名称',
    -- platform attributes aggregated from ods_sku_attr_value
    `sku_attr_values`
        ARRAY<STRUCT<attr_id:STRING,value_id:STRING,attr_name:STRING,value_name:STRING>>
        comment '平台属性',
    -- sale attributes aggregated from ods_sku_sale_attr_value
    `sku_sale_attr_values`
        ARRAY<STRUCT<sale_attr_id:STRING,sale_attr_value_id:STRING,sale_attr_name:STRING,sale_attr_value_name:STRING>>
        comment '销售属性'
) comment '商品维度表'
partitioned by (`dt` string)
-- store the data files as Parquet
stored as parquet
-- HDFS directory that holds the table data
location '/warehouse/gmall/dim/dim_sku_info/'
-- LZO compression inside Parquet (the data files are still Parquet files, not .lzo files)
tblproperties ("parquet.compression"="lzo");

  2.数据装载

    1)Hive读取索引文件问题

      (1)两种方式,分别查询数据有多少行

-- (1) plain select: answered without a MapReduce job, using the table's own InputFormat
select * from ods_log;
-- CLI output: Time taken: 1.285 seconds, Fetched: 8838 row(s)
-- (2) count(*): runs a MapReduce job that goes through hive.input.format
select count(*) from ods_log;

离线数仓(四)

      (2)两次查询结果不一致

        原因是select * from ods_log不执行MR操作,直接采用的是ods_log建表语句中指定的DeprecatedLzoTextInputFormat,能够识别lzo.index为索引文件。

        select count(*) from ods_log执行MR操作,会先经过hive.input.format,其默认值为CombineHiveInputFormat,其会先将索引文件当成小文件合并,将其当做普通文件处理。更严重的是,这会导致LZO文件无法切片。

-- Print the current value of hive.input.format for this session
set hive.input.format;

离线数仓(四)

        解决办法:修改CombineHiveInputFormat为HiveInputFormat

-- Switch MR jobs from CombineHiveInputFormat to HiveInputFormat so the LZO index files
-- are not combined in as ordinary data files
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

      (3)再次查询

-- Re-run the count with the new input format and compare it with the select * row count
select count(*) from ods_log;

离线数仓(四)

    2)首日装载

-- First-day load of dim_sku_info (partition 2021-06-08): every SKU is enriched with its SPU name,
-- the three category levels, the trademark name and its platform / sale attribute lists.
with
sku_info as (
    select id sku_id,
           price,
           sku_name,
           sku_desc,
           weight,
           is_sale,
           spu_id,
           category3_id,
           tm_id,
           create_time
    from ods_sku_info
    where dt = '2021-06-08'
),
spu_info as (
    select id spu_id,
           spu_name
    from ods_spu_info
    where dt = '2021-06-08'
),
base_category3 as (
    select id category3_id,
           name category3_name,
           category2_id
    from ods_base_category3
    where dt = '2021-06-08'
),
base_category2 as (
    select id category2_id,
           name category2_name,
           category1_id
    from ods_base_category2
    where dt = '2021-06-08'
),
base_category1 as (
    select id category1_id,
           name category1_name
    from ods_base_category1
    where dt = '2021-06-08'
),
base_trademark as (
    select id tm_id,
           tm_name
    from ods_base_trademark
    where dt = '2021-06-08'
),
-- platform attributes of each SKU, collected into an array of structs
sku_attr_info as (
    select sku_id,
           collect_list(named_struct('attr_id', attr_id, 'value_id', value_id,
                                     'attr_name', attr_name, 'value_name', value_name)) sku_attr_values
    from ods_sku_attr_value
    where dt = '2021-06-08'
    group by sku_id
),
-- sale attributes of each SKU, collected into an array of structs
sku_sale_attr_info as (
    select sku_id,
           collect_list(named_struct('sale_attr_id', sale_attr_id, 'sale_attr_value_id', sale_attr_value_id,
                                     'sale_attr_name', sale_attr_name, 'sale_attr_value_name', sale_attr_value_name)) sku_sale_attr_values
    from ods_sku_sale_attr_value
    where dt = '2021-06-08'
    group by sku_id
)
insert overwrite table dim_sku_info partition (dt = '2021-06-08')
select sku_info.sku_id,
       sku_info.price,
       sku_info.sku_name,
       sku_info.sku_desc,
       sku_info.weight,
       sku_info.is_sale,
       sku_info.spu_id,
       sku_info.category3_id,
       sku_info.tm_id,
       sku_info.create_time,
       spu_info.spu_name,
       base_category3.category3_name,
       base_category3.category2_id,
       base_category2.category2_name,
       base_category2.category1_id,
       base_category1.category1_name,
       base_trademark.tm_name,
       sku_attr_info.sku_attr_values,
       sku_sale_attr_info.sku_sale_attr_values
from sku_info
left join spu_info
    on spu_info.spu_id = sku_info.spu_id
left join base_category3
    on base_category3.category3_id = sku_info.category3_id
-- the level-2 category must be matched on its own id; matching category1_id against a
-- category2_id pairs unrelated rows and can duplicate SKUs
left join base_category2
    on base_category2.category2_id = base_category3.category2_id
left join base_category1
    on base_category1.category1_id = base_category2.category1_id
left join base_trademark
    on base_trademark.tm_id = sku_info.tm_id
left join sku_attr_info
    on sku_attr_info.sku_id = sku_info.sku_id
left join sku_sale_attr_info
    on sku_sale_attr_info.sku_id = sku_info.sku_id;

    3)每日装载(和首日处理一样,只需要修改分区字段日期)

5.2 优惠券维度表(全量)

  1.建表语句

离线数仓(四)

drop table if exists dim_coupon_info;
-- Coupon dimension, partitioned by load date.
create external table dim_coupon_info (
    -- all columns come from ods_coupon_info
    `id`               string        comment '购物券编号',
    `coupon_name`      string        comment '购物券名称',
    `coupon_type`      string        comment '购物券类型 1 现金券 2 折扣券 3 满减券 4 满件打折券',
    `condition_amount` decimal(16,2) comment '满额数',
    `condition_num`    bigint        comment '满件数',
    `activity_id`      string        comment '活动编号',
    `benefit_amount`   decimal(16,2) comment '减金额',
    `benefit_discount` decimal(16,2) comment '折扣',
    `create_time`      string        comment '创建时间',
    `range_type`       string        comment '范围类型 1、商品 2、品类 3、品牌',
    `limit_num`        bigint        comment '最多领取次数',
    `taken_count`      bigint        comment '已领取次数',
    `start_time`       string        comment '可以领取的开始日期',
    `end_time`         string        comment '可以领取的结束日期',
    `operate_time`     string        comment '修改时间',
    `expire_time`      string        comment '过期时间'
)
comment '优惠券维度表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_coupon_info/'
tblproperties ("parquet.compression"="lzo");

  2.数据装载

    1)首日装载

-- Allow fully dynamic partitioning: the dt value comes from the last select column.
-- (HiveQL comments start with "--"; a "#" line is not a comment in the Hive CLI.)
set hive.exec.dynamic.partition.mode=nonstrict;

-- First-day load of dim_coupon_info from the 2021-06-08 ODS partition.
insert overwrite table dim_coupon_info partition (dt)
select id,
       coupon_name,
       coupon_type,
       condition_amount,
       condition_num,
       activity_id,
       benefit_amount,
       benefit_discount,
       create_time,
       range_type,
       limit_num,
       taken_count,
       start_time,
       end_time,
       operate_time,
       expire_time,
       dt
from ods_coupon_info
where dt = '2021-06-08';

    2)每日装载(和首日导入相同,只需要修改分区字段时间)

    3)静态分区 和 动态分区 作用相同,都是为了向分区表导入数据

      静态分区: 分区字段的值是固定的,直接在 partition (dt='分区值') 中指定,只能将数据写入到同一个分区!

      动态分区: 查询的数据,需要插入到不同的分区,分区字段的值由select语句的最后一列的值决定,partition (dt),既可以将数据写入到同一个分区,也可以写入到不同的分区

5.3 活动维度表(全量)

  1.建表语句

离线数仓(四)

drop table if exists dim_activity_rule_info;
-- Activity dimension at activity-rule grain: rule columns plus the owning activity's attributes.
create external table dim_activity_rule_info (
    -- from ods_activity_rule
    `activity_rule_id` string        comment '活动规则ID',
    `activity_id`      string        comment '活动ID',
    `condition_amount` decimal(16,2) comment '满减金额',
    `condition_num`    bigint        comment '满减件数',
    `benefit_amount`   decimal(16,2) comment '优惠金额',
    `benefit_discount` decimal(16,2) comment '优惠折扣',
    `benefit_level`    string        comment '优惠级别',
    -- from ods_activity_info
    `activity_name`    string        comment '活动名称',
    `activity_type`    string        comment '活动类型',
    `start_time`       string        comment '开始时间',
    `end_time`         string        comment '结束时间',
    `create_time`      string        comment '创建时间'
) comment '活动维度表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_activity_rule_info/'
tblproperties ("parquet.compression"="lzo");

  2.数据装载

    1)首日装载

-- First-day load of dim_activity_rule_info: one row per activity rule (2021-06-08 snapshot),
-- enriched with the parent activity's name, type and time window; rules without a matching
-- activity are kept (left join) with null activity columns.
insert overwrite table dim_activity_rule_info partition (dt = '2021-06-08')
select
    act_rule.id activity_rule_id,
    act_rule.activity_id,
    act_rule.condition_amount,
    act_rule.condition_num,
    act_rule.benefit_amount,
    act_rule.benefit_discount,
    act_rule.benefit_level,
    act_info.activity_name,
    act_info.activity_type,
    act_info.start_time,
    act_info.end_time,
    act_info.create_time
from
(
    -- activity_type is taken from ods_activity_info, so it is not selected here
    select id,
           activity_id,
           condition_amount,
           condition_num,
           benefit_amount,
           benefit_discount,
           benefit_level
    from ods_activity_rule
    where dt = '2021-06-08'
) act_rule
left join
(
    select id,
           activity_name,
           activity_type,
           start_time,
           end_time,
           create_time
    from ods_activity_info
    where dt = '2021-06-08'
) act_info
    on act_rule.activity_id = act_info.id;

    2)每日装载(同首日导入,修改分区字段值)

5.4 地区维度表(特殊)

  1.建表语句

离线数仓(四)

drop table if exists dim_base_province;
-- Province/region dimension; not partitioned (each load overwrites the whole table).
create external table dim_base_province (
    -- from ods_base_province
    `id`            string comment 'id',
    `province_name` string comment '省市名称',
    `area_code`     string comment '地区编码',
    `iso_code`      string comment 'ISO-3166编码,供可视化使用',
    `iso_3166_2`    string comment 'ISO-3166-2编码,供可视化使用',
    `region_id`     string comment '地区id',
    -- from ods_base_region
    `region_name`   string comment '地区名称'
) comment '地区维度表'
stored as parquet
location '/warehouse/gmall/dim/dim_base_province/'
tblproperties("parquet.compression"="lzo");

  2.数据装载

    地区维度表数据相对稳定,变化概率较低,故无需每日装载

-- Rebuild the (unpartitioned) province dimension: each province joined to its region name.
-- Provinces whose region_id has no match in ods_base_region are dropped (inner join).
insert overwrite table dim_base_province
select
    pro.id,
    pro.name,
    pro.area_code,
    pro.iso_code,
    pro.iso_3166_2,
    pro.region_id,
    reg.region_name
from ods_base_province pro
inner join ods_base_region reg
    on pro.region_id = reg.id;

5.5 时间维度表(特殊)

  1.建表语句

drop table if exists dim_date_info;
-- Date dimension; not partitioned, loaded manually from dim_date_info_tmp.
create external table dim_date_info (
    `date_id`    string comment '日期ID',
    `week_id`    string comment '周ID',
    `week_day`   string comment '周几',
    `day`        string comment '每月的第几天',
    `month`      string comment '第几月',
    `quarter`    string comment '第几季度',
    `year`       string comment '年',
    `is_workday` string comment '是否是工作日',
    `holiday_id` string comment '节假日'
) comment '时间维度表'
stored as parquet
location '/warehouse/gmall/dim/dim_date_info/'
tblproperties("parquet.compression"="lzo");

  2.数据装载

    通常情况下,时间维度表的数据并不是来自于业务系统,而是手动写入,并且由于时间维度表数据的可预见性,无须每日导入,一般可一次性导入一年的数据

    1)创建临时表

-- Plain-text staging table for the manually prepared date file; fields are tab-separated.
-- External table: dropping it removes only metadata, the uploaded file stays on HDFS.
drop table if exists dim_date_info_tmp;
create external table dim_date_info_tmp (
    `date_id`    string comment '日期ID',
    `week_id`    string comment '周ID',
    `week_day`   string comment '周几',
    `day`        string comment '每月的第几天',
    `month`      string comment '第几月',
    `quarter`    string comment '第几季度',
    `year`       string comment '年',
    `is_workday` string comment '是否是工作日',
    `holiday_id` string comment '节假日'
) comment '时间维度表'
row format delimited
    -- tab delimiter; a bare 't' would split rows on the letter t
    fields terminated by '\t'
location '/warehouse/gmall/dim/dim_date_info_tmp/';

    2)将数据文件上传到HDFS上临时表指定路径/warehouse/gmall/dim/dim_date_info_tmp/

离线数仓(四)

    3)执行以下语句将其导入时间维度表

-- Copy the staged text data into the Parquet date dimension; columns are listed explicitly
-- so a column-order change in either table cannot silently shift values.
insert overwrite table dim_date_info
select `date_id`,
       `week_id`,
       `week_day`,
       `day`,
       `month`,
       `quarter`,
       `year`,
       `is_workday`,
       `holiday_id`
from dim_date_info_tmp;

    4)检查数据是否导入成功

-- Spot-check that dim_date_info now contains the imported rows
select * from dim_date_info;

离线数仓(四)

5.6 用户维度表(拉链表)

5.6.1 拉链表概述

  1)什么是拉链表

    拉链表,记录每条信息的生命周期,一旦一条记录的生命周期结束,就重新开始一条新的记录,并把当前日期放入生效开始日期

    若当前信息至今有效,在生效结束日期中填入一个极大值(如:9999-99-99)

离线数仓(四)

  2)为什么要做拉链表

    拉链表适合于数据会发生变化,但是变化频率并不是很高的维度,即缓慢变化维,比如:用户信息会发生变化,但是每天变化的比例不高,若数据量有一定规模,按照每日全量的方式保存,效率很低,例如:一亿用户 * 365天,每天一份用户数据信息,这样子每日全量效率很低

离线数仓(四)

  3)如何使用拉链表

    通过生效开始日期<=某个日期且生效结束日期>=某个日期,能够得到某个时间点的数据全量切片

    (1)拉链表数据

离线数仓(四)

    (2)获取2019-01-01的历史切片:select * from user_info where start_date <= '2019-01-01' and end_date >= '2019-01-01';

离线数仓(四)

    (3)获取2019-01-02的历史切片:select * from user_info where start_date <= '2019-01-02' and end_date >= '2019-01-02';

离线数仓(四)

  4)拉链表形成过程

离线数仓(四)

5.6.2 制作拉链表

  1.建表语句

离线数仓(四)

drop table if exists dim_user_info;
-- User dimension kept as a zipper (SCD2) table: start_date/end_date bound each row version;
-- current rows live in partition dt='9999-99-99', closed versions in the partition of their end date.
create external table dim_user_info (
    -- from ods_user_info
    `id`           string comment '用户id',
    `login_name`   string comment '用户名称',
    `nick_name`    string comment '用户昵称',
    `name`         string comment '用户姓名',
    `phone_num`    string comment '手机号码',
    `email`        string comment '邮箱',
    `user_level`   string comment '用户等级',
    `birthday`     string comment '生日',
    `gender`       string comment '性别',
    `create_time`  string comment '创建时间',
    `operate_time` string comment '操作时间',
    -- validity window maintained by the zipper load
    `start_date`   string comment '开始日期',
    `end_date`     string comment '结束日期'
) comment '用户维度表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dim/dim_user_info/'
tblproperties("parquet.compression"="lzo");

  2.数据装载

离线数仓(四)

    1)首日装载

      拉链表首日装载,需要进行初始化操作,具体工作为将截止到初始化当日的全部历史用户一次性导入到拉链表中。目前ods_user_info表的第一个分区,即2021-06-08分区中就是全部的历史用户,故将该分区数据进行一定处理后导入拉链表的9999-99-99分区即可

-- Initialize the zipper table: every user in the 2021-06-08 ODS partition becomes a current
-- row (partition 9999-99-99) valid from that partition date until the open-ended 9999-99-99.
insert overwrite table dim_user_info partition (dt='9999-99-99')
select
    id,
    login_name,
    nick_name,
    name,
    phone_num,
    email,
    user_level,
    birthday,
    gender,
    create_time,
    operate_time,
    dt           start_date,
    '9999-99-99' end_date
from ods_user_info
where dt = '2021-06-08';

    2)每日更新

-- Daily zipper (SCD2) update of dim_user_info with the 2021-06-09 ODS change set.
--   Branch 1 rebuilds the current partition dt='9999-99-99': a user present in that day's
--   ods_user_info partition (new or changed) takes the ODS values with start_date = ODS dt;
--   every other user keeps the existing current row.
--   Branch 2 closes the superseded current row of each changed user: end_date and dt are set
--   to the day before the ODS dt.
-- partition(dt) has no static value, so dynamic partitioning must run in nonstrict mode.
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table dim_user_info partition (dt)
select
    -- the choice is per row, not per field: if the user appears in the ODS change set
    -- (id matched), every column comes from ODS, even when that ODS value is null
    if(ods_data.id is not null, ods_data.id,           dim_data.id)           id,
    if(ods_data.id is not null, ods_data.login_name,   dim_data.login_name)   login_name,
    if(ods_data.id is not null, ods_data.nick_name,    dim_data.nick_name)    nick_name,
    if(ods_data.id is not null, ods_data.name,         dim_data.name)         name,
    if(ods_data.id is not null, ods_data.phone_num,    dim_data.phone_num)    phone_num,
    if(ods_data.id is not null, ods_data.email,        dim_data.email)        email,
    if(ods_data.id is not null, ods_data.user_level,   dim_data.user_level)   user_level,
    if(ods_data.id is not null, ods_data.birthday,     dim_data.birthday)     birthday,
    if(ods_data.id is not null, ods_data.gender,       dim_data.gender)       gender,
    if(ods_data.id is not null, ods_data.create_time,  dim_data.create_time)  create_time,
    if(ods_data.id is not null, ods_data.operate_time, dim_data.operate_time) operate_time,
    if(ods_data.id is not null, ods_data.dt,           dim_data.start_date)   start_date,
    if(ods_data.id is not null, '9999-99-99',          dim_data.end_date)     end_date,
    '9999-99-99' dt
from
(
    -- current versions of all users
    select *
    from dim_user_info
    where dt = '9999-99-99'
) dim_data
full outer join
(
    -- users added or changed in MySQL user_info on 2021-06-09
    select *
    from ods_user_info
    where dt = '2021-06-09'
) ods_data
    on dim_data.id = ods_data.id
union all
select
    dim_data.id,
    dim_data.login_name,
    dim_data.nick_name,
    dim_data.name,
    dim_data.phone_num,
    dim_data.email,
    dim_data.user_level,
    dim_data.birthday,
    dim_data.gender,
    dim_data.create_time,
    dim_data.operate_time,
    dim_data.start_date,
    -- close the old version on the day before the change
    cast(date_sub(ods_data.dt, 1) as string) end_date,
    cast(date_sub(ods_data.dt, 1) as string) dt
from
(
    select *
    from dim_user_info
    where dt = '9999-99-99'
) dim_data
inner join
(
    select *
    from ods_user_info
    where dt = '2021-06-09'
) ods_data
    on dim_data.id = ods_data.id;

5.7 DIM层首日数据装载脚本

  1)编写脚本

    (1)在/home/atguigu/bin目录下创建脚本ods_to_dim_db_init.sh

# Create the DIM first-day load script (in /home/atguigu/bin)
vim ods_to_dim_db_init.sh

    (2)增加执行权限

# Make the first-day load script executable
chmod +x ods_to_dim_db_init.sh

  2)脚本使用

    (1)执行脚本(该脚本不包含时间维度表的装载,时间维度表需手动装载数据,参考5.5)

# First-day load of all DIM tables except dim_date_info for 2021-06-08
ods_to_dim_db_init.sh all 2021-06-08

    (2)查看数据是否导入成功

5.8 DIM层每日数据装载脚本

  1)编写脚本

    (1)在/home/atguigu/bin目录下创建脚本ods_to_dim_db.sh

# Create the DIM daily load script (in /home/atguigu/bin); its contents follow
vim ods_to_dim_db.sh
#!/bin/bash
# Daily ODS -> DIM load for the gmall warehouse.
# Usage: ods_to_dim_db.sh <dim_table|all> <yyyy-MM-dd>
#   $1  DIM table to load, or "all" to load every table in a single Hive session
#   $2  business date, i.e. the ODS partition to read (required)

APP=gmall

if [ -n "$2" ]; then
    do_date=$2
else
    echo "请传入日期参数"
    # non-zero status so schedulers and callers see the missing argument as a failure
    exit 1
fi

# dim_user_info: daily zipper (SCD2) update for $do_date.
#  - Partition dt='9999-99-99' is rebuilt: users present in ods_user_info for $do_date (new or
#    changed that day) take the ODS values with start_date=$do_date; all other users keep their
#    current row. (Overwriting it with only the day's ODS rows would drop every unchanged user.)
#  - The superseded current row of each changed user is closed with end_date = $do_date - 1 and
#    written to partition dt = $do_date - 1.
#  - name/phone_num/email of incoming ODS rows are md5-masked; this assumes the existing current
#    rows were loaded masked the same way by the first-day script — confirm.
dim_user_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dim_user_info partition(dt)
select
    if(ods_data.id is not null, ods_data.id, dim_data.id) id,
    if(ods_data.id is not null, ods_data.login_name, dim_data.login_name) login_name,
    if(ods_data.id is not null, ods_data.nick_name, dim_data.nick_name) nick_name,
    if(ods_data.id is not null, md5(ods_data.name), dim_data.name) name,
    if(ods_data.id is not null, md5(ods_data.phone_num), dim_data.phone_num) phone_num,
    if(ods_data.id is not null, md5(ods_data.email), dim_data.email) email,
    if(ods_data.id is not null, ods_data.user_level, dim_data.user_level) user_level,
    if(ods_data.id is not null, ods_data.birthday, dim_data.birthday) birthday,
    if(ods_data.id is not null, ods_data.gender, dim_data.gender) gender,
    if(ods_data.id is not null, ods_data.create_time, dim_data.create_time) create_time,
    if(ods_data.id is not null, ods_data.operate_time, dim_data.operate_time) operate_time,
    if(ods_data.id is not null, '$do_date', dim_data.start_date) start_date,
    if(ods_data.id is not null, '9999-99-99', dim_data.end_date) end_date,
    '9999-99-99' dt
from
(
    select * from ${APP}.dim_user_info where dt = '9999-99-99'
) dim_data
full outer join
(
    select * from ${APP}.ods_user_info where dt = '$do_date'
) ods_data
on dim_data.id = ods_data.id
union all
select
    dim_data.id,
    dim_data.login_name,
    dim_data.nick_name,
    dim_data.name,
    dim_data.phone_num,
    dim_data.email,
    dim_data.user_level,
    dim_data.birthday,
    dim_data.gender,
    dim_data.create_time,
    dim_data.operate_time,
    dim_data.start_date,
    cast(date_sub('$do_date', 1) as string) end_date,
    cast(date_sub('$do_date', 1) as string) dt
from
(
    select * from ${APP}.dim_user_info where dt = '9999-99-99'
) dim_data
join
(
    select * from ${APP}.ods_user_info where dt = '$do_date'
) ods_data
on dim_data.id = ods_data.id;"

# dim_sku_info: full snapshot for $do_date — each SKU joined with its SPU, three category levels,
# trademark and its platform / sale attribute lists. The level-2 category is joined on
# category2_id; joining it on category1_id pairs unrelated rows and duplicates SKUs.
dim_sku_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
with sku_info as (
    select id sku_id,
           price,
           sku_name,
           sku_desc,
           weight,
           is_sale,
           spu_id,
           category3_id,
           tm_id,
           create_time
    from ${APP}.ods_sku_info
    where dt = '$do_date'
),
     spu_info as (
         select id spu_id, spu_name
         from ${APP}.ods_spu_info
         where dt = '$do_date'
     ),
     base_category3 as (
         select id category3_id, name category3_name, category2_id
         from ${APP}.ods_base_category3
         where dt = '$do_date'
     ),
     base_category2 as (
         select id category2_id, name category2_name, category1_id
         from ${APP}.ods_base_category2
         where dt = '$do_date'
     ),
     base_category1 as (
         select id category1_id, name category1_name
         from ${APP}.ods_base_category1
         where dt = '$do_date'
     ),
     base_trademark as (
         select id tm_id, tm_name
         from ${APP}.ods_base_trademark
         where dt = '$do_date'
     ),
     sku_attr_info as (
        select sku_id,
            collect_list(named_struct('attr_id',attr_id,'value_id',value_id,'attr_name',attr_name,'value_name',value_name)) sku_attr_values
        from ${APP}.ods_sku_attr_value
        where dt = '$do_date'
        group by sku_id
     ),
     sku_sale_attr_info as (
         select sku_id,
             collect_list(named_struct('sale_attr_id',sale_attr_id,'sale_attr_value_id',sale_attr_value_id,
            'sale_attr_name',sale_attr_name,'sale_attr_value_name',sale_attr_value_name)) sku_sale_attr_values
         from ${APP}.ods_sku_sale_attr_value
         where dt = '$do_date'
         group by sku_id
     )
insert overwrite table ${APP}.dim_sku_info partition (dt = '$do_date')
select sku_info.sku_id,
       sku_info.price,
       sku_info.sku_name,
       sku_info.sku_desc,
       sku_info.weight,
       sku_info.is_sale,
       sku_info.spu_id,
       sku_info.category3_id,
       sku_info.tm_id,
       sku_info.create_time,
       spu_info.spu_name,
       base_category3.category3_name,
       base_category3.category2_id,
       base_category2.category2_name,
       base_category2.category1_id,
       base_category1.category1_name,
       base_trademark.tm_name,
       sku_attr_info.sku_attr_values,
       sku_sale_attr_info.sku_sale_attr_values
from sku_info left join spu_info on spu_info.spu_id = sku_info.spu_id
left join base_category3 on base_category3.category3_id = sku_info.category3_id
left join base_category2 on base_category2.category2_id = base_category3.category2_id
left join base_category1 on base_category1.category1_id = base_category2.category1_id
left join base_trademark on base_trademark.tm_id = sku_info.tm_id
left join sku_attr_info on sku_attr_info.sku_id = sku_info.sku_id
left join sku_sale_attr_info on sku_sale_attr_info.sku_id = sku_info.sku_id;
"

# dim_base_province: rebuild the unpartitioned province table from ods_base_province joined
# (inner) to ods_base_region; it does not filter on $do_date.
dim_base_province="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_base_province
select pro.id,
       pro.name,
       pro.area_code,
       pro.iso_code,
       pro.iso_3166_2,
       pro.region_id,
       reg.region_name
from ${APP}.ods_base_province pro
join ${APP}.ods_base_region reg 
on pro.region_id = reg.id;
"

# dim_coupon_info: copy the $do_date ODS coupon snapshot; the dt partition value comes from the
# last select column (dynamic partition, hence the nonstrict mode setting).
dim_coupon_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dim_coupon_info partition (dt)
select id,
       coupon_name,
       coupon_type,
       condition_amount,
       condition_num,
       activity_id,
       benefit_amount,
       benefit_discount,
       create_time,
       range_type,
       limit_num,
       taken_count,
       start_time,
       end_time,
       operate_time,
       expire_time,
       dt
from ${APP}.ods_coupon_info
where dt = '$do_date';
"

# dim_activity_rule_info: one row per activity rule for $do_date, left-joined to its activity
# for name, type and time window.
dim_activity_rule_info="set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dim_activity_rule_info partition (dt = '$do_date')
select
    t1.id activity_rule_id,
        t1.activity_id,
        t1.condition_amount,
        t1.condition_num,
        t1.benefit_amount,
        t1.benefit_discount,
        t1.benefit_level,
        t2.activity_name,
        t2.activity_type,
        t2.start_time,
        t2.end_time,
        t2.create_time
from
(
    select id,
           activity_id,
           condition_amount,
           condition_num,
           benefit_amount,
           benefit_discount,
           benefit_level
    from ${APP}.ods_activity_rule
    where dt = '$do_date'
) t1
left join
(
    select id, activity_name, activity_type, start_time, end_time, create_time,dt
    from ${APP}.ods_activity_info
    where dt = '$do_date'
) t2
on t1.activity_id = t2.id;"

# Dispatch on $1: load one DIM table, or "all" to run every load in a single Hive session.
case $1 in
"dim_user_info"){
    hive -e "$dim_user_info"
};;
"dim_sku_info"){
    hive -e "$dim_sku_info"
};;
"dim_base_province"){
    hive -e "$dim_base_province"
};;
"dim_coupon_info"){
    hive -e "$dim_coupon_info"
};;
"dim_activity_rule_info"){
    hive -e "$dim_activity_rule_info"
};;
"all"){
    hive -e "$dim_user_info$dim_sku_info$dim_coupon_info$dim_activity_rule_info$dim_base_province"
};;
*){
    # unknown target: report it and fail instead of silently doing nothing
    echo "未知的表名: $1"
    exit 1
};;
esac

    (2)增加执行权限

# Make the daily load script executable
chmod +x ods_to_dim_db.sh

  2)脚本使用

    (1)执行脚本

# Run the daily load of every DIM table handled by the script for 2021-06-09
ods_to_dim_db.sh all 2021-06-09

    (2)查看数据是否导入成功


程序员灯塔
转载请注明原文链接:离线数仓(四)
喜欢 (0)