旅游集市数仓建设
小白如何从0到1成为大数据工程师
目录
旅游集市数仓建设
1.上传数据
2.可能用到的UDF函数
3.创建所需数据库及表
1)ODS层
①ods_oidd
②ods_wcdr
③ods_ddr
④ods_dpi
2)DWD层
①dwd_res_regn_mergelocation_msk_d
②dwm_staypoint_msk_d
③dws_province_tourist_msk_d
④dws_city_tourist_msk_d
⑤dws_county_tourist_msk_d
3)DIM层
①dim_usertag_msk_m
4)ADS层
1)需求矩阵
2)根据区县游客表计算如下指标
1.上传数据
# Create the local landing directory for the uploaded data files and enter it.
# -p keeps the step re-runnable when the directory already exists.
mkdir -p /usr/local/soft/ctyun/
cd /usr/local/soft/ctyun/
pwd
2.可能用到的UDF函数
# Create the local directory that holds the UDF jar and enter it.
# -p keeps the step re-runnable when the directory already exists.
mkdir -p /usr/local/soft/jars/
cd /usr/local/soft/jars/
pwd

添加资源并注册函数
-- Put the UDF jar on the session classpath, then register the session-scoped functions.
ADD JARS /usr/local/soft/jars/jtxy_hdfs-1.0-SNAPSHOT.jar;

-- get_points: called later as get_points(grid_id)[0] / [1] for the grid centre longitude / latitude
CREATE TEMPORARY FUNCTION get_points AS 'ctyun.udf.getPointsUDF';
-- dateBetweenUDF: called later with (grid_first_time, grid_last_time) to produce the stay duration
CREATE TEMPORARY FUNCTION dateBetweenUDF AS 'ctyun.udf.dateBetweenUDF';
-- calLength: called later with (grid_id, resi_grid_id) to produce the travel distance
CREATE TEMPORARY FUNCTION calLength AS 'ctyun.udf.calLength';
-- get_city_or_prov_id: maps a county_id to its city or province id
CREATE TEMPORARY FUNCTION get_city_or_prov_id AS 'ctyun.udf.getCityIdOrProvID';
3.创建所需数据库及表
-- Create the ODS database only when it is missing so the step can be re-run, then switch to it.
create database if not exists ods;
use ods;
1)ODS层
①ods_oidd
OIDD是采集A接口的信令数据,包括手机在发生业务时的位置信息。OIDD信令类型数据分为三大类:呼叫记录、短信记录和用户位置更新记录。
-- ODS table for OIDD location records.
-- External, tab-delimited text table partitioned by day_id, stored under /data/tour/ods/ods_oidd.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_oidd (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'oidd位置数据表'
PARTITIONED BY (
    day_id STRING COMMENT '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_oidd';
-- Add the day partition (day_id is a string column, so the value is quoted)
alter table ods.ods_oidd add if not exists partition (day_id = '20180503');
-- Load the local files into that partition
load data local inpath '/usr/local/soft/ctyun/ods_oidd/day_id=20180503/*' into table ods.ods_oidd partition (day_id = '20180503');
-- Preview the data
select * from ods.ods_oidd limit 10;
-- Optional HDFS helpers for inspecting the table location
dfs -mkdir -p /data/tour/ods/ods_oidd;
dfs -ls /data/tour/ods/ods_oidd;
dfs -ls /data/tour/ods/;
-- WARNING: the command below deletes the table directory together with the data just loaded.
-- It is a reset helper only and is therefore left commented out (rm -r replaces the deprecated rmr).
-- dfs -rm -r /data/tour/ods/ods_oidd

②ods_wcdr
WCDR采集网络中ABIS接口的数据,基于业务发生过程中三个扇区的测量信息,通过三角定位法确定用户的位置信息。
-- ODS table for WCDR location records.
-- External, tab-delimited text table partitioned by day_id, stored under /data/tour/ods/ods_wcdr.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_wcdr (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'wcdr位置数据表'
PARTITIONED BY (
    day_id STRING COMMENT '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_wcdr';
-- Add the day partition (day_id is a string column, so the value is quoted)
alter table ods.ods_wcdr add if not exists partition (day_id = '20180503');
-- Load the local files into that partition
load data local inpath '/usr/local/soft/ctyun/ods_wcdr/day_id=20180503/*' into table ods.ods_wcdr partition (day_id = '20180503');
-- Preview the data
select * from ods.ods_wcdr limit 10;

③ods_ddr
当前DDR中只有移动数据详单可以提取基站标识,其他语音、短信、增值等业务没有位置信息,不作为数据融合的基础数据。
-- ODS table for DDR location records.
-- External, tab-delimited text table partitioned by day_id, stored under /data/tour/ods/ods_ddr.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_ddr (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'ddr位置数据表'
PARTITIONED BY (
    day_id STRING COMMENT '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_ddr';
-- Add the day partition (day_id is a string column, so the value is quoted)
alter table ods.ods_ddr add if not exists partition (day_id = '20180503');
-- Load the local files into that partition
load data local inpath '/usr/local/soft/ctyun/ods_ddr/day_id=20180503/*' into table ods.ods_ddr partition (day_id = '20180503');
-- Preview the data
select * from ods.ods_ddr limit 10;
④ods_dpi
移动DPI数据采集的是移动用户上网时,移动核心网和PDSN之间接口的数据。
-- ODS table for DPI location records.
-- External, tab-delimited text table partitioned by day_id, stored under /data/tour/ods/ods_dpi.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_dpi (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'dpi位置数据表'
PARTITIONED BY (
    day_id STRING COMMENT '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_dpi';
-- Add the day partition (day_id is a string column, so the value is quoted)
alter table ods.ods_dpi add if not exists partition (day_id = '20180503');
-- Load the local files into that partition
load data local inpath '/usr/local/soft/ctyun/ods_dpi/day_id=20180503/*' into table ods.ods_dpi partition (day_id = '20180503');
-- Preview the data
select * from ods.ods_dpi limit 10;
2)DWD层
-- Create the DWD database only when it is missing so the step can be re-run, then switch to it.
create database if not exists dwd;
use dwd;
①dwd_res_regn_mergelocation_msk_d
在ODS层中,由于数据来源不同,原始位置数据被分成了好几张表加载到了我们的ODS层。为了方便大家使用,我们在DWD层做了一张位置数据融合表:将oidd、wcdr、ddr、dpi位置数据汇聚到一张表里面,统一字段名,提升数据质量,这样就有了一张可供大家方便使用的明细表了。
-- DWD merged-location table: same columns as the four ODS location tables, one partition per day.
-- External table stored as ORCFile under /data/tour/dwd/dwd_res_regn_mergelocation_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dwd.dwd_res_regn_mergelocation_msk_d (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT '位置数据融合表'
PARTITIONED BY (
    day_id STRING COMMENT '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS ORCFILE
LOCATION '/data/tour/dwd/dwd_res_regn_mergelocation_msk_d';
-- Add the day partition (day_id is a string column, so the value is quoted)
alter table dwd.dwd_res_regn_mergelocation_msk_d add if not exists partition (day_id = '20180503');
-- Manual data download / load alternative:
-- running the merge directly in Hive is slow, so pre-built files can be loaded or put by hand instead.
-- NOTE: the table DDL stores data as ORCFile, so the plain LOAD below cannot be used against it as-is.
-- It is kept commented out for reference (presumably it fits a text-format variant of the table - confirm).
-- load data local inpath '/usr/local/soft/ctyun/dwd_merge/part-00000*' into table dwd.dwd_res_regn_mergelocation_msk_d partition (day_id = '20180503')
 -- Merge the four ODS location tables (oidd, wcdr, dpi, ddr) for one day into the DWD merged table.
-- UNION ALL is used on purpose: every source row is kept and no de-duplication is performed.
insert into table dwd.dwd_res_regn_mergelocation_msk_d partition (day_id = "20180503")
select
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
from ods.ods_oidd
where day_id = "20180503"
union all
select
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
from ods.ods_wcdr
where day_id = "20180503"
union all
select
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
from ods.ods_dpi
where day_id = "20180503"
union all
select
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
from ods.ods_ddr
where day_id = "20180503";

②dwm_staypoint_msk_d
计算一个人在一个网格内的停留时间,按手机号,网格id,区县id分组
1、对所有时间进行排序
2、取第一个点的开始时间和最后一个点的结束时间
-- Create the DWM database only when it is missing so the step can be re-run, then switch to it.
create database if not exists dwm;
use dwm;

-- Stay-point table: one row per user (mdn), grid and county with the first / last record time
-- in that grid and the stay duration. External text table partitioned by day_id.
CREATE EXTERNAL TABLE IF NOT EXISTS dwm.dwm_staypoint_msk_d (
    mdn             string comment '用户手机号码',
    longi           string comment '网格中心点经度',
    lati            string comment '网格中心点纬度',
    grid_id         string comment '停留点所在电信内部网格号',
    county_id       string comment '停留点区县',
    duration        string comment '机主在停留点停留的时间长度(分钟),lTime-eTime',
    grid_first_time string comment '网格第一个记录位置点时间(秒级)',
    grid_last_time  string comment '网格最后一个记录位置点时间(秒级)'
)
comment '停留点表'
PARTITIONED BY (
    day_id string comment '天分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
location '/data/tour/dwm/dwm_staypoint_msk_d';
-- The grid centre longitude and latitude (longi, lati) are obtained from grid_id.
下面这条SQL执行时会出现问题:执行进度一直停留在 0% Map、0% Reduce。
/**
insert into table dwm.dwm_staypoint_msk_d partition(day_id=20180503)
select  t1.mdn
        ,get_points(grid_id)[0] as longi
        ,get_points(grid_id)[1] as lati
        ,t1.grid_id
        ,t1.county_id
        ,dateBetweenUDF(t1.grid_first_time,t1.grid_last_time) as duration
        ,t1.grid_first_time
        ,t1.grid_last_time
from (
    select  mdn
            ,grid_id
            ,county_id
            ,min(split(start_time,',')[0]) as grid_first_time
            ,max(split(start_time,',')[1]) as grid_last_time
    from dwd.dwd_res_regn_mergelocation_msk_d
    where day_id="20180503"
    group by mdn, grid_id, county_id
)t1;
*/
优化后的SQL:
-- Populate the stay-point table for one day.
-- For every (mdn, grid_id, county_id) the earliest first-time and the latest last-time are kept,
-- matching the documented rule: start time of the first point, end time of the last point.
-- The previous version took MAX for the first time and MIN for the last time, which inverted that rule.
WITH split_table AS (
    -- start_time carries two comma-separated timestamps, split here into separate columns.
    -- NOTE(review): element [1] is used as the first time and [0] as the last time, which assumes
    -- the value is stored as last,first. The commented-out query above indexes the other way
    -- round - confirm against the data.
    SELECT
        mdn,
        grid_id,
        county_id,
        split(start_time, ',')[1] AS grid_first_time,
        split(start_time, ',')[0] AS grid_last_time
    FROM dwd.dwd_res_regn_mergelocation_msk_d
    WHERE day_id = "20180503"
),
max_min_table AS (
    SELECT
        mdn,
        grid_id,
        county_id,
        MIN(grid_first_time) OVER (PARTITION BY mdn, grid_id, county_id) AS grid_first_time,
        MAX(grid_last_time) OVER (PARTITION BY mdn, grid_id, county_id) AS grid_last_time
    FROM split_table
)
INSERT INTO TABLE dwm.dwm_staypoint_msk_d PARTITION (day_id = '20180503')
SELECT
    t1.mdn,
    get_points(t1.grid_id)[0] AS longi,
    get_points(t1.grid_id)[1] AS lati,
    t1.grid_id,
    t1.county_id,
    dateBetweenUDF(t1.grid_first_time, t1.grid_last_time) AS duration,
    t1.grid_first_time,
    t1.grid_last_time
FROM (
    -- The window functions repeat the same values on every row of a group,
    -- so GROUP BY collapses them to one row per group.
    SELECT
        mdn,
        grid_id,
        county_id,
        grid_first_time,
        grid_last_time
    FROM max_min_table
    GROUP BY
        mdn,
        grid_id,
        county_id,
        grid_first_time,
        grid_last_time
) t1;

③dws_province_tourist_msk_d
游客定义:出行距离大于300km(常住地取自用户画像表),且在省内停留时间大于3个小时。
-- Create the DWS database only when it is missing so the step can be re-run, then switch to it.
create database if not exists dws;
use dws;

-- Province-level daily tourist table. External Parquet table partitioned by day_id.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_province_tourist_msk_d (
    mdn              string comment '手机号大写MD5加密',
    source_county_id string comment '游客来源区县',
    d_province_id    string comment '旅游目的地省代码',
    d_stay_time      double comment '游客在该省停留的时间长度(小时)',
    d_max_distance   double comment '游客本次出游距离'
)
comment '旅游应用专题数据省级别-天'
PARTITIONED BY (
    day_id string comment '日分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
location '/data/tour/dws/dws_province_tourist_msk_d';
-- How the table is filled:
-- join the stay-point table dwm_staypoint_msk_d with the user-profile dimension table
-- dim_usertag_msk_m on mdn. Call get_city_or_prov_id(county_id, "province") to turn county_id
-- into province_id, then group by mdn, province_id and resi_county_id.
-- calLength(grid_id, resi_grid_id) takes the grid id and the home grid id and gives the travel
-- distance. The stay time of each user is accumulated per province.
-- Keep the users whose accumulated stay time exceeds 3 hours (180 minutes) and whose travel
-- distance exceeds 300 km.
④dws_city_tourist_msk_d
游客定义:出行距离大于100km,且在市内停留时间大于3个小时。
-- City-level daily tourist table. External Parquet table partitioned by day_id.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_city_tourist_msk_d (
    mdn              string comment '手机号大写MD5加密',
    source_county_id string comment '游客来源区县',
    d_city_id        string comment '旅游目的地市代码',
    d_stay_time      double comment '游客在该省市停留的时间长度(小时)',
    d_max_distance   double comment '游客本次出游距离'
)
comment '旅游应用专题数据城市级别-天'
PARTITIONED BY (
    day_id string comment '日分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
location '/data/tour/dws/dws_city_tourist_msk_d';
-- How the table is filled:
-- join the stay-point table dwm_staypoint_msk_d with the user-profile dimension table
-- dim_usertag_msk_m on mdn. Call get_city_or_prov_id(county_id, "city") to turn county_id
-- into city_id, then group by mdn, city_id and resi_county_id.
-- calLength(grid_id, resi_grid_id) takes the grid id and the home grid id and gives the travel
-- distance. The stay time of each user is accumulated per city.
-- Keep the users whose accumulated stay time exceeds 3 hours (180 minutes) and whose travel
-- distance exceeds 100 km.
⑤dws_county_tourist_msk_d
游客定义:出行距离大于10km,且在县内停留时间大于3个小时。
-- County-level daily tourist table. External Parquet table partitioned by day_id.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_county_tourist_msk_d (
    mdn              string comment '手机号大写MD5加密',
    source_county_id string comment '游客来源区县',
    d_county_id      string comment '旅游目的地县代码',
    d_stay_time      double comment '游客在该县停留的时间长度(小时)',
    d_max_distance   double comment '游客本次出游距离'
)
comment '旅游应用专题数据县级别-天'
PARTITIONED BY (
    day_id string comment '日分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
location '/data/tour/dws/dws_county_tourist_msk_d';
-- How the table is filled:
-- join the stay-point table dwm_staypoint_msk_d with the user-profile dimension table
-- dim_usertag_msk_m on mdn, then group by mdn, county_id and resi_county_id.
-- calLength(grid_id, resi_grid_id) takes the grid id and the home grid id and gives the travel
-- distance. The stay time of each user is accumulated per county.
-- Keep the users whose accumulated stay time exceeds 3 hours (180 minutes) and whose travel
-- distance exceeds 10 km.
运行下面的SQL时如果报错,需要先(重新)创建一下UDF函数:

-- Register the UDF jar and the temporary functions for this session.
add jars /usr/local/soft/jars/jtxy_hdfs-1.0-SNAPSHOT.jar;
create temporary function get_points as 'ctyun.udf.getPointsUDF';
create temporary function dateBetweenUDF as 'ctyun.udf.dateBetweenUDF';
create temporary function calLength as 'ctyun.udf.calLength';
create temporary function get_city_or_prov_id as 'ctyun.udf.getCityIdOrProvID';

-- County-level tourists for day 20180503.
-- Stay points are joined to the user profile (month 201805) on mdn and grouped by user,
-- destination county and home county.
-- duration in dwm_staypoint_msk_d is documented in minutes, while d_stay_time in the target
-- table is documented in hours: the 3 hour rule is applied on the minute total (180) and the
-- stored value is converted to hours. The previous version stored the minute total unchanged.
-- NOTE(review): the 10000 threshold presumably means calLength returns metres (10 km) - confirm.
insert into table dws.dws_county_tourist_msk_d partition (day_id = "20180503")
select
    visit.mdn,
    visit.source_county_id,
    visit.d_county_id,
    visit.stay_minutes / 60 as d_stay_time,
    visit.d_max_distance
from (
    select
        stay.mdn,
        usr.resi_county_id as source_county_id,
        stay.county_id as d_county_id,
        sum(cast(stay.duration as double)) as stay_minutes,
        max(calLength(stay.grid_id, usr.resi_grid_id)) as d_max_distance
    from (
        select
            mdn,
            grid_id,
            county_id,
            duration
        from dwm.dwm_staypoint_msk_d
        where day_id = '20180503'
    ) stay
    inner join (
        select
            mdn,
            resi_county_id,
            resi_grid_id
        from dim.dim_usertag_msk_m
        where month_id = '201805'
    ) usr
        on stay.mdn = usr.mdn
    group by
        stay.mdn,
        stay.county_id,
        usr.resi_county_id
) visit
where visit.stay_minutes > 180
  and visit.d_max_distance > 10000
 ;
3)DIM层
-- Create the DIM database only when it is missing so the step can be re-run, then switch to it.
create database if not exists dim;
use dim;
①dim_usertag_msk_m
-- User-profile dimension table, one partition per month (month_id).
-- External Parquet table stored under /data/tour/dim/dim_usertag_msk_m.
CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_usertag_msk_m (
    mdn            STRING COMMENT '手机号大写MD5加密',
    name           STRING COMMENT '姓名',
    gender         STRING COMMENT '性别,1男2女',
    age            STRING COMMENT '年龄',
    id_number      STRING COMMENT '证件号码',
    number_attr    STRING COMMENT '号码归属地',
    trmnl_brand    STRING COMMENT '终端品牌',
    trmnl_price    STRING COMMENT '终端价格',
    packg          STRING COMMENT '套餐',
    conpot         STRING COMMENT '消费潜力',
    resi_grid_id   STRING COMMENT '常住地网格',
    resi_county_id STRING COMMENT '常住地区县'
)
COMMENT '用户画像表'
PARTITIONED BY (
    month_id STRING COMMENT '月分区'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
LOCATION '/data/tour/dim/dim_usertag_msk_m';
-- Add the month partition (month_id is a string column, so the value is quoted)
alter table dim.dim_usertag_msk_m add if not exists partition (month_id = '201805');
-- Load the local files into that partition
load data local inpath '/usr/local/soft/ctyun/dim_usertag_msk_m/month_id=201805/*' into table dim.dim_usertag_msk_m partition (month_id = '201805');
-- Preview the data
select * from dim.dim_usertag_msk_m limit 10;
4)ADS层
根据需求建设
1)需求矩阵


2)根据区县游客表计算如下指标
客流量按天 [区县id,客流量]
-- Daily visitor count per destination county: [county id, visitor count].
-- The previous version filtered on t1.day_id inside the subquery that defines t1,
-- where that alias is not visible. The partition column is now referenced directly.
select
    d_county_id,
    count(*) as d_county_cnt
from dws.dws_county_tourist_msk_d
where day_id = "20180503"
group by d_county_id;
-- Gender by day: [county id, gender, visitor count]
-- Daily visitor count per destination county and gender.
-- The user profile is partitioned by month (month_id 201805). The previous filter used the
-- day value 20180503, which matches no month partition and left every gender NULL.
select
    tourist.d_county_id,
    profile.gender,
    count(*) as d_county_gender_cnt
from (
    select
        mdn,
        d_county_id
    from dws.dws_county_tourist_msk_d
    where day_id = "20180503"
) tourist
left join (
    select
        mdn,
        gender
    from dim.dim_usertag_msk_m
    where month_id = '201805'
) profile
    on tourist.mdn = profile.mdn
group by
    tourist.d_county_id,
    profile.gender;
-- Age by day: [county id, age, visitor count]
 常住地按天 [区县id,常住地市,客流量]
 归属地按天 [区县id,归属地市,客流量]
-- Daily visitor count per destination county and number home location (number_attr).
-- The user profile is partitioned by month (month_id 201805). The previous filter used the
-- day value 20180503, which matches no month partition and left every number_attr NULL.
select
    tourist.d_county_id,
    profile.number_attr,
    count(*) as d_county_number_attr_cnt
from (
    select
        mdn,
        d_county_id
    from dws.dws_county_tourist_msk_d
    where day_id = "20180503"
) tourist
left join (
    select
        mdn,
        number_attr
    from dim.dim_usertag_msk_m
    where month_id = '201805'
) profile
    on tourist.mdn = profile.mdn
group by
    tourist.d_county_id,
    profile.number_attr;
-- Terminal model by day: [county id, terminal model, visitor count]
 消费等级按天 [区县id,消费等级,客流量]
 停留时长按天 [区县id,停留时长,客流量]



















