数据仓库建设实例

2018-02-11 14:07:32来源:oschina作者:仔仔1993人点击

分享
注:前提不泄露公司信息
1.维表事实表设计

2.源表

2.1 数据导入

# Source database connection settings (placeholders).
connect="xxx"
username="xxx"
password="xxxx"

######################################################################################
# customer (updated_at refreshed hourly) --> default.customer
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table customer \
  --hive-import \
  --hive-table default.customer

# Saved job, meant to run daily: pulls rows whose updated_at is newer than the
# stored last-value and merges them on id.
# NOTE(review): the job passes --hive-table without --hive-import or --target-dir,
# and uses tab delimiters while the full import above uses the Hive defaults --
# confirm where the incremental output lands and that both loads agree on format.
sqoop job --create customer_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table customer \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.customer \
  --incremental lastmodified \
  --check-column updated_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


#########################################################################################
# customer_coupon (updated_at refreshed hourly) --> default.customer_coupon
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table customer_coupon \
  --hive-import \
  --hive-table default.customer_coupon

# Saved job, meant to run daily: captures rows whose updated_at is newer than the
# stored last-value and merges them on id.
sqoop job --create customer_coupon_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table customer_coupon \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.customer_coupon \
  --incremental lastmodified \
  --check-column updated_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


##############################################################################################
# order_payment (created_at, hourly) --> default.order_payment
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table order_payment \
  --hive-import \
  --hive-table default.order_payment

# Saved job, meant to run daily: captures rows whose created_at is newer than the
# stored last-value and merges them on id.
sqoop job --create order_payment_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table order_payment \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.order_payment \
  --incremental lastmodified \
  --check-column created_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


#################################################################################################
# taxi_order_payment (created_at, hourly) --> default.taxi_order_payment
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table taxi_order_payment \
  --hive-import \
  --hive-table default.taxi_order_payment

# Saved job, meant to run daily: captures rows whose created_at is newer than the
# stored last-value and merges them on id.
sqoop job --create taxi_order_payment_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table taxi_order_payment \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.taxi_order_payment \
  --incremental lastmodified \
  --check-column created_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


############################################################################################
# ticket_payment (updated_at, hourly) --> default.ticket_payment
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table ticket_payment \
  --hive-import \
  --hive-table default.ticket_payment

# Saved job, meant to run daily: captures rows whose updated_at is newer than the
# stored last-value and merges them on id.
sqoop job --create ticket_payment_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table ticket_payment \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.ticket_payment \
  --incremental lastmodified \
  --check-column updated_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


###########################################################################################
# buspool_payment (updated_at, hourly) --> default.buspool_payment
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table buspool_payment \
  --hive-import \
  --hive-table default.buspool_payment

# Saved job, meant to run daily: captures rows whose updated_at is newer than the
# stored last-value and merges them on id.
sqoop job --create buspool_payment_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table buspool_payment \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.buspool_payment \
  --incremental lastmodified \
  --check-column updated_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


##############################################################################################
# coupon (created_at, daily) --> default.coupon
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table coupon \
  --hive-import \
  --hive-table default.coupon

# Saved job, meant to run daily: captures rows whose created_at is newer than the
# stored last-value and merges them on id.
sqoop job --create coupon_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table coupon \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.coupon \
  --incremental lastmodified \
  --check-column created_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


##################################################################################################
# giftpack (created_at, daily) --> default.giftpack
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table giftpack \
  --hive-import \
  --hive-table default.giftpack

# Saved job, meant to run daily: captures rows whose created_at is newer than the
# stored last-value and merges them on id.
sqoop job --create giftpack_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table giftpack \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.giftpack \
  --incremental lastmodified \
  --check-column created_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id

####################################################################################################
# action_center (created_at, daily) --> default.action_center
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table action_center \
  --hive-import \
  --hive-table default.action_center

# Saved job, meant to run daily: captures rows whose created_at is newer than the
# stored last-value and merges them on id.
sqoop job --create action_center_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table action_center \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.action_center \
  --incremental lastmodified \
  --check-column created_at \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


#################################################################################################
# bus_coupon (updatedAt, daily) --> default.bus_coupon
# One-off full import of the source table into Hive.
sqoop import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table bus_coupon \
  --hive-import \
  --hive-table default.bus_coupon

# Saved job, meant to run daily: captures rows whose updatedAt is newer than the
# stored last-value and merges them on id.
sqoop job --create bus_coupon_add_job_zw -- import \
  --connect "${connect}" \
  --username "${username}" \
  --password "${password}" \
  --table bus_coupon \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --hive-drop-import-delims \
  --hive-table default.bus_coupon \
  --incremental lastmodified \
  --check-column updatedAt \
  --last-value '2015-01-01 00:00:00' \
  -m 1 \
  --merge-key id


3.维度表和事实表

3.1 数据装载

-- =============================================================================
-- DW layer: create the dimension / fact tables
-- =============================================================================
-- dim_action: activity dimension, loaded from default.action_center.
create table default.dim_action (
    id          int,
    city        string,
    title       string,
    deleted_at  string,
    created_at  string comment '创建时间',
    updated_at  string comment '更新时间',
    remark      string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- INSERT ... SELECT fills columns by position, so the select list follows the
-- DDL order: city before title.
insert overwrite table default.dim_action
select
    id,
    city,
    title,
    deleted_at,
    created_at,
    updated_at,
    remark
from default.action_center;


-- dim_coupon: coupon dimension, loaded from default.coupon.
create table default.dim_coupon (
    id           int,
    name         string,
    city         string,
    max          bigint,
    type         int,
    value        double,
    duration     int,
    enable       int,
    created_at   string,
    updated_at   string,
    minCost      double,
    maxDiscount  double,
    expired_at   string,
    scope        int,
    isPool       int,
    minPay       int,
    purpose      int,
    remark       string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- Select list is in the same order as the DDL above (columns are filled by position).
insert overwrite table default.dim_coupon
select
    id,
    name,
    city,
    max,
    type,
    value,
    duration,
    enable,
    created_at,
    updated_at,
    minCost,
    maxDiscount,
    expired_at,
    scope,
    isPool,
    minPay,
    purpose,
    remark
from default.coupon;


-- dim_customer: customer dimension, loaded from default.customer.
-- phone is kept as a string: 11-digit numbers do not fit in a 32-bit int, and the
-- report tables that read it declare cphone as string.
-- NOTE(review): password and alipay_user_id are still int -- confirm the source
-- column types; values that do not fit an int will not load correctly.
create table default.dim_customer (
    id              int,
    name            string,
    phone           string,
    email           string,
    password        int,
    created_at      string,
    updated_at      string,
    balance         int,
    rating          double,
    rating_total    double,
    rating_count    int,
    city            string,
    alipay_user_id  int
)
row format delimited fields terminated by '\t'
stored as textfile;

-- Select list is in the same order as the DDL above (columns are filled by position).
insert overwrite table default.dim_customer
select
    id,
    name,
    phone,
    email,
    password,
    created_at,
    updated_at,
    balance,
    rating,
    rating_total,
    rating_count,
    city,
    alipay_user_id
from default.customer;


-- dim_giftpack: gift-pack dimension, loaded from default.giftpack.
create table default.dim_giftpack (
    id             int,
    city           string,
    name           string,
    max            bigint,
    enable         int,
    type           string,
    expired_at     string,
    created_at     string,
    effected_at    string,
    updated_at     string,
    way            int,
    recharge_base  bigint,
    recharge_type  int,
    remark         string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- Select list is in the same order as the DDL above (columns are filled by position).
insert overwrite table default.dim_giftpack
select
    id,
    city,
    name,
    max,
    enable,
    type,
    expired_at,
    created_at,
    effected_at,
    updated_at,
    way,
    recharge_base,
    recharge_type,
    remark
from default.giftpack;


-- fct_order_payment_deduction: coupon deductions gathered from the four payment
-- tables. Created in the default database explicitly because the report queries
-- read it as default.fct_order_payment_deduction.
create table default.fct_order_payment_deduction (
    coupon_id   bigint comment 'id',
    price       double,
    created_at  string,
    status      int
)
row format delimited fields terminated by '\t'
stored as textfile;

-- status tags the originating table:
--   1 = order_payment, 2 = taxi_order_payment, 3 = ticket_payment, 4 = buspool_payment.
-- Only rows with type = 'coupon' are kept; their info column is loaded as coupon_id.
insert overwrite table default.fct_order_payment_deduction
select
    r.coupon_id,
    r.price,
    r.created_at,
    r.status
from (
    select p.info as coupon_id, p.price, p.created_at, 1 as status
    from default.order_payment p
    where p.type = 'coupon'
    union all
    select q.info as coupon_id, q.price, q.created_at, 2 as status
    from default.taxi_order_payment q
    where q.type = 'coupon'
    union all
    select m.info as coupon_id, m.price, m.created_at, 3 as status
    from default.ticket_payment m
    where m.type = 'coupon'
    union all
    select n.info as coupon_id, n.price, n.created_at, 4 as status
    from default.buspool_payment n
    where n.type = 'coupon'
) r;


-- fct_customer_coupon: one row per coupon issued to a customer, copied from
-- default.customer_coupon. Created in the default database explicitly because the
-- report queries read it as default.fct_customer_coupon.
create table default.fct_customer_coupon (
    id                 bigint comment '唯一主键',
    customer_id        bigint,
    coupon_id          bigint,
    giftpack_id        bigint,
    city               string,
    source             string,
    type               int,
    status             int,
    expire_at          string,
    created_at         string,
    updated_at         string,
    value              double,
    source_type        string,
    source_id          bigint,
    ispool             int,
    action_id          bigint,
    balance_detail_id  bigint,
    remark             string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- Select list is in the same order as the DDL above (columns are filled by position).
insert overwrite table default.fct_customer_coupon
select
    x.id,
    x.customer_id,
    x.coupon_id,
    x.giftpack_id,
    x.city,
    x.source,
    x.type,
    x.status,
    x.expire_at,
    x.created_at,
    x.updated_at,
    x.value,
    x.source_type,
    x.source_id,
    x.ispool,
    x.action_id,
    x.balance_detail_id,
    x.remark
from default.customer_coupon x;


4.数据清洗到报表层

-- =============================================================================
-- Report layer: default-layer data is transformed into rpt tables with HQL
-- =============================================================================
-- rpt_customer_coupon_details: one row per issued coupon joined to its customer,
-- coupon, gift pack, activity and any payment deduction.
drop table if exists default.rpt_customer_coupon_details;

CREATE TABLE default.rpt_customer_coupon_details (
    `id`            int,
    `date_id`       string,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double
)
comment 'rpt_customer_coupon_details'
row format delimited fields terminated by '\t'
stored as textfile;

-- r.status identifies the payment table the deduction came from (1..4); each price
-- column carries the deduction for one of them and is 0 otherwise.
-- NOTE(review): a coupon row matching several deduction rows is emitted once per
-- match, so id is not guaranteed unique in this table -- confirm.
insert into default.rpt_customer_coupon_details
select
    a.id,
    to_date(a.created_at)                                       as date_id,
    b.`name`                                                    as cname,
    b.phone                                                     as cphone,
    a.city                                                      as ccity,
    a.coupon_id                                                 as couponid,
    c.`name`                                                    as couponname,
    ac.city                                                     as accity,
    a.type                                                      as ctype,
    a.`value`                                                   as couponvalue,
    c.duration,
    g.`name`                                                    as pagename,
    a.expire_at                                                 as expired_at,
    a.status,
    c.minCost,
    c.maxDiscount,
    c.minPay,
    a.created_at,
    a.updated_at,
    a.source,
    ac.title                                                    as actionname,
    case when r.status = 1 then nvl(r.price, 0) else 0 end      as carprice,
    case when r.status = 2 then nvl(r.price, 0) else 0 end      as taxiprice,
    case when r.status = 3 then nvl(r.price, 0) else 0 end      as ticketprice,
    case when r.status = 4 then nvl(r.price, 0) else 0 end      as buspoolprice
from default.fct_customer_coupon a
left join default.dim_customer b
    on a.customer_id = b.id
left join default.dim_coupon c
    on a.coupon_id = c.id
left join default.dim_giftpack g
    on a.giftpack_id = g.id
left join default.dim_action ac
    on a.action_id = ac.id
left join default.fct_order_payment_deduction r
    on a.id = r.coupon_id;


-- rpt_customer_coupon_action_statistic: coupon counts and deducted amounts per
-- day, activity, activity title and city.
drop table if exists default.rpt_customer_coupon_action_statistic;

create table default.rpt_customer_coupon_action_statistic (
    `date_id`       string,
    `action_id`     int,
    `title`         string,
    `city`          string,
    `customer_cnt`  int,
    `total_cnt`     int,
    `usecnt`        int,
    `unusecnt`      int,
    `outcnt`        int,
    `ocnt`          int,
    `qcnt`          int,
    `pcnt`          int,
    `rcnt`          int,
    `oprice`        double,
    `qprice`        double,
    `pprice`        double,
    `rprice`        double
)
comment 'rpt_customer_coupon_action_statistic'
row format delimited fields terminated by '\t'
stored as textfile;

-- usecnt / unusecnt / outcnt count coupons whose status is 1 / 0 / -1.
-- NOTE(review): deduction status 2 feeds pprice and status 3 feeds qprice, while
-- the table lists qprice before pprice -- confirm this mapping is intended.
insert into default.rpt_customer_coupon_action_statistic
select
    to_date(x.created_at)                                       as date_id,
    x.action_id,
    x.title,
    x.city,
    count(distinct x.customer_id)                               as customer_cnt,
    count(x.id)                                                 as total_cnt,
    count(case when x.cstatus = 1  then x.id else null end)     as usecnt,
    count(case when x.cstatus = 0  then x.id else null end)     as unusecnt,
    count(case when x.cstatus = -1 then x.id else null end)     as outcnt,
    count(case when x.oprice > 0 then x.id else null end)       as ocnt,
    count(case when x.qprice > 0 then x.id else null end)       as qcnt,
    count(case when x.pprice > 0 then x.id else null end)       as pcnt,
    count(case when x.rprice > 0 then x.id else null end)       as rcnt,
    sum(x.oprice)                                               as oprice,
    sum(x.qprice)                                               as qprice,
    sum(x.pprice)                                               as pprice,
    sum(x.rprice)                                               as rprice
from (
    select
        a.id,
        a.created_at,
        a.action_id,
        da.city,
        da.title,
        a.customer_id,
        a.`status`                                              as cstatus,
        case when fr.status = 1 then nvl(fr.price, 0) else 0 end as oprice,
        case when fr.status = 2 then nvl(fr.price, 0) else 0 end as pprice,
        case when fr.status = 3 then nvl(fr.price, 0) else 0 end as qprice,
        case when fr.status = 4 then nvl(fr.price, 0) else 0 end as rprice
    from default.fct_customer_coupon a
    left join default.dim_action da
        on a.action_id = da.id
    left join default.fct_order_payment_deduction fr
        on a.id = fr.coupon_id
) x
group by
    to_date(x.created_at),
    x.action_id,
    x.city,
    x.title;


-- rpt_customer_coupon_source_statistic: coupon counts and deducted amounts per
-- day and (relabelled) coupon source.
drop table if exists default.rpt_customer_coupon_source_statistic;

create table default.rpt_customer_coupon_source_statistic (
    `date_id`       string,
    `source`        string,
    `customer_cnt`  int,
    `total_cnt`     int,
    `usecnt`        int,
    `unusecnt`      int,
    `outcnt`        int,
    `ocnt`          int,
    `qcnt`          int,
    `pcnt`          int,
    `rcnt`          int,
    `oprice`        double,
    `qprice`        double,
    `pprice`        double,
    `rprice`        double
)
comment 'rpt_customer_coupon_source_statistic'
row format delimited fields terminated by '\t'
stored as textfile;

-- The inner query maps raw source codes to display labels (masked as 'xx...' in
-- this write-up) before grouping.
-- NOTE(review): deduction status 2 feeds pprice and status 3 feeds qprice, while
-- the table lists qprice before pprice -- confirm this mapping is intended.
insert into default.rpt_customer_coupon_source_statistic
select
    to_date(x.created_at)                                       as date_id,
    x.source                                                    as source,
    count(distinct x.customer_id)                               as customer_cnt,
    count(x.id)                                                 as total_cnt,
    count(case when x.cstatus = 1  then x.id else null end)     as usecnt,
    count(case when x.cstatus = 0  then x.id else null end)     as unusecnt,
    count(case when x.cstatus = -1 then x.id else null end)     as outcnt,
    count(case when x.oprice > 0 then x.id else null end)       as ocnt,
    count(case when x.qprice > 0 then x.id else null end)       as qcnt,
    count(case when x.pprice > 0 then x.id else null end)       as pcnt,
    count(case when x.rprice > 0 then x.id else null end)       as rcnt,
    sum(x.oprice)                                               as oprice,
    sum(x.qprice)                                               as qprice,
    sum(x.pprice)                                               as pprice,
    sum(x.rprice)                                               as rprice
from (
    select
        a.id,
        a.created_at,
        case
            when a.source = 'complete' then 'xxxx'
            when a.source = 'feedback' then 'xxx'
            when a.source = 'first'    then 'xxx'
            when a.source = 'invite'   then 'xxx'
            when a.source = 'keyword'  then 'xx'
            when a.source = 'new'      then 'xx'
            when a.source = 'recharge' then 'xx'
            when a.source = 'share'    then 'xx'
            else 'xxx'
        end                                                     as source,
        a.customer_id,
        a.`status`                                              as cstatus,
        case when r.status = 1 then nvl(r.price, 0) else 0 end  as oprice,
        case when r.status = 2 then nvl(r.price, 0) else 0 end  as pprice,
        case when r.status = 3 then nvl(r.price, 0) else 0 end  as qprice,
        case when r.status = 4 then nvl(r.price, 0) else 0 end  as rprice
    from default.fct_customer_coupon a
    left join default.fct_order_payment_deduction r
        on a.id = r.coupon_id
) x
group by
    to_date(x.created_at),
    x.source;


5.报表层数据输出到mysql

# Target MySQL connection and HDFS settings (placeholders).
mysqlconnect="xxx"
username="xxx"
password="xxxx"
hdfsconnect="xxxx"

# Export rpt_customer_coupon_details: empty the MySQL table, then reload it.
sqoop eval \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --query "truncate table rpt_customer_coupon_details"

# The table was just truncated, so updateonly would find no rows to update and
# load nothing; allowinsert makes the export insert the rows.
# The export dir is double-quoted so the shell expands the variable.
# NOTE(review): hdfscon1 is not defined in the visible script (only hdfsconnect
# is) -- set it to the HDFS location of the Hive table's data files.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_details \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --input-null-non-string '\\N' \
  --input-null-string '\\N' \
  --update-key id \
  --update-mode allowinsert \
  --export-dir "${hdfscon1}" \
  --columns="id,date_id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice" \
  -m 4 \
  --direct


# Export rpt_customer_coupon_action_statistic: empty the MySQL table, then reload it.
sqoop eval \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --query "truncate table rpt_customer_coupon_action_statistic"

# NOTE(review): hdfscon2 is not defined in the visible script -- set it to the
# HDFS location of the Hive table's data files.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_action_statistic \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --input-fields-terminated-by "\t" \
  --input-lines-terminated-by "\n" \
  --input-null-non-string '\\N' \
  --input-null-string '\\N' \
  --export-dir "${hdfscon2}" \
  --columns="date_id,action_id,title,city,customer_cnt,total_cnt,usecnt,unusecnt,outcnt,ocnt,qcnt,pcnt,rcnt,oprice,qprice,pprice,rprice" \
  -m 1


# Export rpt_customer_coupon_source_statistic: empty the MySQL table, then reload it.
sqoop eval \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --query "truncate table rpt_customer_coupon_source_statistic"

# The export dir is double-quoted so the shell expands the variable.
# NOTE(review): hdfscon3 is not defined in the visible script -- set it to the
# HDFS location of the Hive table's data files.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_source_statistic \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --input-fields-terminated-by "\t" \
  --input-lines-terminated-by "\n" \
  --input-null-non-string '\\N' \
  --input-null-string '\\N' \
  --export-dir "${hdfscon3}" \
  --columns="date_id,source,customer_cnt,total_cnt,usecnt,unusecnt,outcnt,ocnt,qcnt,pcnt,rcnt,oprice,qprice,pprice,rprice" \
  -m 1


-- =============================================================================
-- Hive table partitioned by day
-- =============================================================================
-- Exploratory checks on the date range present in the detail table.
select max(to_date(updated_at)), min(to_date(updated_at))
from rpt_customer_coupon_details
limit 10;

select distinct(to_date(updated_at))
from rpt_customer_coupon_details;

select distinct(substr(to_date(updated_at), 6, 2))
from rpt_customer_coupon_details;

drop table if exists default.rpt_customer_coupon_details_partition;

-- Same columns as rpt_customer_coupon_details, except that date_id moves into the
-- partition spec together with year and month.
CREATE TABLE default.rpt_customer_coupon_details_partition (
    `id`            int,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double
)
comment 'rpt_customer_coupon_details'
partitioned by (year string, month string, `date_id` string)
row format delimited fields terminated by '\t'
stored as textfile;


-- Enable dynamic partitioning and raise the partition / file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
set dfs.datanode.max.xcievers=8192;

-- Load the day-partitioned table. The partition spec names the table's partition
-- columns (year, month, date_id); their values come from the last three select
-- columns, in that order.
insert into default.rpt_customer_coupon_details_partition partition (year, month, date_id)
select
    a.id,
    b.`name`                                                    as cname,
    b.phone                                                     as cphone,
    a.city                                                      as ccity,
    a.coupon_id                                                 as couponid,
    c.`name`                                                    as couponname,
    ac.city                                                     as accity,
    a.type                                                      as ctype,
    a.`value`                                                   as couponvalue,
    c.duration,
    g.`name`                                                    as pagename,
    a.expire_at                                                 as expired_at,
    a.status,
    c.minCost,
    c.maxDiscount,
    c.minPay,
    a.created_at,
    a.updated_at,
    a.source,
    ac.title                                                    as actionname,
    case when r.status = 1 then nvl(r.price, 0) else 0 end      as carprice,
    case when r.status = 2 then nvl(r.price, 0) else 0 end      as taxiprice,
    case when r.status = 3 then nvl(r.price, 0) else 0 end      as ticketprice,
    case when r.status = 4 then nvl(r.price, 0) else 0 end      as buspoolprice,
    substr(to_date(a.created_at), 1, 4)                         as year,
    substr(to_date(a.created_at), 6, 2)                         as month,
    to_date(a.created_at)                                       as date_id
from default.fct_customer_coupon a
left join default.dim_customer b
    on a.customer_id = b.id
left join default.dim_coupon c
    on a.coupon_id = c.id
left join default.dim_giftpack g
    on a.giftpack_id = g.id
left join default.dim_action ac
    on a.action_id = ac.id
left join default.fct_order_payment_deduction r
    on a.id = r.coupon_id;


# Export: --export-dir must point at the files under the Hive table; giving it
# just the path raises an error.
# The export dir is double-quoted so the shell expands ${hdfscon1}.
# NOTE(review): this step sits in the day-partition section but targets the
# *_partition_year table and directory -- confirm that is intended.
# NOTE(review): --columns lists year, which is the Hive partition column; confirm
# the data files under year=2016 actually contain it.
# NOTE(review): updateonly only updates rows already present in MySQL; an empty
# target table stays empty.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_details_partition_year \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --input-null-non-string '\\N' \
  --input-null-string '\\N' \
  --update-key id \
  --update-mode updateonly \
  --export-dir "${hdfscon1}_partition_year/year=2016" \
  --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,year" \
  -m 4 \
  --direct


-- =============================================================================
-- Hive table partitioned by year
-- =============================================================================
-- Exploratory checks on the date range present in the detail table.
select max(to_date(updated_at)), min(to_date(updated_at))
from rpt_customer_coupon_details
limit 10;

select distinct(to_date(updated_at))
from rpt_customer_coupon_details;

select distinct(substr(to_date(updated_at), 6, 2))
from rpt_customer_coupon_details;

-- Drop the table that is about to be recreated (the *_partition_year table, not
-- the day-partitioned one).
drop table if exists default.rpt_customer_coupon_details_partition_year;

CREATE TABLE default.rpt_customer_coupon_details_partition_year (
    `id`            int,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double,
    `date_id`       string
)
comment 'rpt_customer_coupon_details'
partitioned by (year string)
row format delimited fields terminated by '\t'
stored as textfile;


-- Enable dynamic partitioning and raise the partition / file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
set dfs.datanode.max.xcievers=8192;

-- Load the year-partitioned table; the last select column (year) supplies the
-- dynamic partition value.
insert into default.rpt_customer_coupon_details_partition_year partition (year)
select
    a.id,
    b.`name`                                                    as cname,
    b.phone                                                     as cphone,
    a.city                                                      as ccity,
    a.coupon_id                                                 as couponid,
    c.`name`                                                    as couponname,
    ac.city                                                     as accity,
    a.type                                                      as ctype,
    a.`value`                                                   as couponvalue,
    c.duration,
    g.`name`                                                    as pagename,
    a.expire_at                                                 as expired_at,
    a.status,
    c.minCost,
    c.maxDiscount,
    c.minPay,
    a.created_at,
    a.updated_at,
    a.source,
    ac.title                                                    as actionname,
    case when r.status = 1 then nvl(r.price, 0) else 0 end      as carprice,
    case when r.status = 2 then nvl(r.price, 0) else 0 end      as taxiprice,
    case when r.status = 3 then nvl(r.price, 0) else 0 end      as ticketprice,
    case when r.status = 4 then nvl(r.price, 0) else 0 end      as buspoolprice,
    to_date(a.created_at)                                       as date_id,
    substr(to_date(a.created_at), 1, 4)                         as year
from default.fct_customer_coupon a
left join default.dim_customer b
    on a.customer_id = b.id
left join default.dim_coupon c
    on a.coupon_id = c.id
left join default.dim_giftpack g
    on a.giftpack_id = g.id
left join default.dim_action ac
    on a.action_id = ac.id
left join default.fct_order_payment_deduction r
    on a.id = r.coupon_id;


# Export, variant 1: read one partition directory directly.
# The export dir is double-quoted so the shell expands ${hdfscon1}.
# NOTE(review): --columns lists year, which is the Hive partition column; confirm
# the data files under year=2016 actually contain it.
# NOTE(review): updateonly only updates rows already present in MySQL; an empty
# target table stays empty.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_details_partition_year \
  --input-fields-terminated-by '\t' \
  --update-key id \
  --update-mode updateonly \
  --export-dir "${hdfscon1}/year=2016" \
  --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,year" \
  -m 1

# Export, variant 2: read the Hive table through HCatalog instead of a raw directory.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_details_partition_year \
  --hcatalog-database default \
  --hcatalog-table rpt_customer_coupon_details_partition_year \
  --num-mappers 1


-- MySQL target table for the year-partitioned export.
-- `year` is part of the primary key and the partitioning column, so it is
-- declared NOT NULL.
-- NOTE(review): there is no partition for year >= 2019; rows beyond the last
-- range will be rejected until one is added.
CREATE TABLE `rpt_customer_coupon_details_partition_year` (
    `id`            int(20)       NOT NULL,
    `cname`         varchar(255)  DEFAULT NULL,
    `cphone`        varchar(20)   DEFAULT NULL,
    `ccity`         varchar(50)   DEFAULT NULL,
    `couponid`      bigint(20)    DEFAULT NULL,
    `couponname`    varchar(255)  DEFAULT NULL,
    `accity`        varchar(50)   DEFAULT NULL,
    `ctype`         varchar(50)   DEFAULT NULL,
    `couponvalue`   double        DEFAULT NULL,
    `duration`      varchar(50)   DEFAULT NULL,
    `pagename`      varchar(255)  DEFAULT NULL,
    `expired_at`    varchar(50)   DEFAULT NULL,
    `status`        int(11)       DEFAULT NULL,
    `minCost`       double        DEFAULT NULL,
    `maxDiscount`   double        DEFAULT NULL,
    `minPay`        double        DEFAULT NULL,
    `created_at`    varchar(50)   DEFAULT NULL,
    `updated_at`    varchar(50)   DEFAULT NULL,
    `source`        varchar(50)   DEFAULT NULL,
    `actionname`    varchar(255)  DEFAULT NULL,
    `carprice`      double        DEFAULT NULL,
    `taxiprice`     double        DEFAULT NULL,
    `ticketprice`   double        DEFAULT NULL,
    `buspoolprice`  double        DEFAULT NULL,
    `date_id`       varchar(20)   DEFAULT NULL,
    `year`          int(20)       NOT NULL,
    PRIMARY KEY (`id`, `year`)
)
partition by range (`year`) (
    partition p2016 values less than (2017),
    partition p2017 values less than (2018),
    partition p2018 values less than (2019)
);


-- =============================================================================
-- Hive table partitioned by quarter
-- =============================================================================
-- Exploratory checks on the date range present in the detail table.
select max(to_date(updated_at)), min(to_date(updated_at))
from rpt_customer_coupon_details
limit 10;

select distinct(to_date(updated_at))
from rpt_customer_coupon_details;

select distinct(substr(to_date(updated_at), 6, 2))
from rpt_customer_coupon_details;

drop table if exists default.rpt_customer_coupon_details_partition_quarter;

CREATE TABLE default.rpt_customer_coupon_details_partition_quarter (
    `id`            int,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double,
    `date_id`       string
)
comment 'rpt_customer_coupon_details'
partitioned by (quarter string)
row format delimited fields terminated by '\t'
stored as textfile;


-- Enable dynamic partitioning and raise the partition / file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
set dfs.datanode.max.xcievers=8192;

-- Load the quarter-partitioned table; the last select column (quarter) supplies
-- the dynamic partition value.
-- quarter = ceil(month / 3): months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
-- (the same buckets the earlier floor(mm / 3.1) + 1 expression produced).
insert into default.rpt_customer_coupon_details_partition_quarter partition (quarter)
select
    a.id,
    b.`name`                                                    as cname,
    b.phone                                                     as cphone,
    a.city                                                      as ccity,
    a.coupon_id                                                 as couponid,
    c.`name`                                                    as couponname,
    ac.city                                                     as accity,
    a.type                                                      as ctype,
    a.`value`                                                   as couponvalue,
    c.duration,
    g.`name`                                                    as pagename,
    a.expire_at                                                 as expired_at,
    a.status,
    c.minCost,
    c.maxDiscount,
    c.minPay,
    a.created_at,
    a.updated_at,
    a.source,
    ac.title                                                    as actionname,
    case when r.status = 1 then nvl(r.price, 0) else 0 end      as carprice,
    case when r.status = 2 then nvl(r.price, 0) else 0 end      as taxiprice,
    case when r.status = 3 then nvl(r.price, 0) else 0 end      as ticketprice,
    case when r.status = 4 then nvl(r.price, 0) else 0 end      as buspoolprice,
    to_date(a.created_at)                                       as date_id,
    ceil(month(to_date(a.created_at)) / 3.0)                    as quarter
from default.fct_customer_coupon a
left join default.dim_customer b
    on a.customer_id = b.id
left join default.dim_coupon c
    on a.coupon_id = c.id
left join default.dim_giftpack g
    on a.giftpack_id = g.id
left join default.dim_action ac
    on a.action_id = ac.id
left join default.fct_order_payment_deduction r
    on a.id = r.coupon_id;


# Export one quarter partition to MySQL.
# The export dir is double-quoted so the shell expands ${hdfscon1}.
# NOTE(review): --columns lists quarter, which is the Hive partition column;
# confirm the data files under quarter=1 actually contain it.
# NOTE(review): updateonly only updates rows already present in MySQL; an empty
# target table stays empty.
sqoop export \
  --connect "${mysqlconnect}" \
  --username "${username}" \
  --password "${password}" \
  --table rpt_customer_coupon_details_partition_quarter \
  --fields-terminated-by "\t" \
  --lines-terminated-by "\n" \
  --update-key id \
  --update-mode updateonly \
  --export-dir "${hdfscon1}_partition_quarter/quarter=1" \
  --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,quarter" \
  -m 4


-- MySQL target table for the quarter-partitioned export.
CREATE TABLE `rpt_customer_coupon_details_partition_quarter` (
    `ID`            bigint(20)     NOT NULL,
    `cname`         varchar(255)   DEFAULT NULL,
    `cphone`        varchar(20)    DEFAULT NULL,
    `ccity`         varchar(50)    DEFAULT NULL,
    `couponid`      bigint(20)     DEFAULT NULL,
    `couponname`    varchar(255)   DEFAULT NULL,
    `accity`        varchar(50)    DEFAULT NULL,
    `ctype`         varchar(50)    DEFAULT NULL,
    `couponvalue`   decimal(15,2)  DEFAULT NULL,
    `duration`      varchar(50)    DEFAULT NULL,
    `pagename`      varchar(255)   DEFAULT NULL,
    `expired_at`    varchar(50)    DEFAULT NULL,
    `status`        int(11)        DEFAULT NULL,
    `minCost`       decimal(15,2)  DEFAULT NULL,
    `maxDiscount`   decimal(15,2)  DEFAULT NULL,
    `minPay`        decimal(15,2)  DEFAULT NULL,
    `created_at`    varchar(50)    DEFAULT NULL,
    `updated_at`    varchar(50)    DEFAULT NULL,
    `source`        varchar(50)    DEFAULT NULL,
    `actionname`    varchar(255)   DEFAULT NULL,
    `carprice`      decimal(15,2)  DEFAULT NULL,
    `taxiprice`     decimal(15,2)  DEFAULT NULL,
    `ticketprice`   decimal(15,2)  DEFAULT NULL,
    `buspoolprice`  decimal(15,2)  DEFAULT NULL,
    `date_id`       varchar(20)    DEFAULT NULL,
    `quarter`       varchar(10)    DEFAULT NULL,
    PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台