Reading and writing HBase data with Spark


1. Writing data (without using HBaseContext)


import com.alibaba.fastjson.JSON
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import java.util.ArrayList
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.spark.HBaseContext
object HbasePremiumInfoPlan {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HbasePremiumInfoPlan")
.setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(30))
//HBase constants
val hbase_table = "premium_info_plan"
val hbase_premium_info = "premium_info"
val hbase_premium_plan = "premium_plan"
val hbase_common = "common"
val info_due_voucher_date="due_voucher_date"
val info_finance_department_code="finance_department_code"
val info_department_code="department_code"
val info_currency_code="currency_code"
val info_channel_source_code="channel_source_code"
val info_client_attribute="client_attribute"
val info_is_write="is_write"
val plan_code="plan_code"
val plan_marketproduct_code="marketproduct_code"
val plan_premium_amount="premium_amount"
val plan_is_write="is_write"
val common_is_count="is_count"
//parameters
val hbase_zookeeper_quorum="SZB-L0032013,SZB-L0032014,SZB-L0032015"
val hbase_zookeeper_property_clientPort="2181"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum",hbase_zookeeper_quorum)
hbaseConf.set("hbase.zookeeper.property.clientPort",hbase_zookeeper_property_clientPort)
hbaseConf.set("hbase.defaults.for.version.skip","true")
val hbaseContext = new HBaseContext(sc, hbaseConf)
val info = Set("DEPLOYOP.PREMIUM_INFO")
val plan = Set("DEPLOYOP.PREMIUM_PLAN")
val brokers = "10.20.25.199:9092"
val kafkaParams = Map[String,String](
"metadata.broker.list" -> brokers,
"group.id" -> "ogg_premium_info_plan",
"auto.offset.reset" -> "smallest",
"serializer.class" -> "kafka.serializer.StringEncoder" //kafka.serializer.DefaultEncoder
)
//receive the streams
//Option 1: put both topics in one topic set and split the received data into separate DStreams by filtering
//Option 2: define two topic sets and create a direct stream for each
//Optimization 1: the two createDirectStream calls run in parallel
val kafkaInfo = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, info)
val kafkaPlan = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, plan)
// premium_info:
//(RECEIPT_NO,DUE_VOUCHER_DATE,FINANCE_DEPARTMENT_CODE,DEPARTMENT_CODE,CURRENCY_CODE,CHANNEL_SOURCE_CODE,CLIENT_ATTRIBUTE)
// premium_plan:
//(RECEIPT_NO,PLAN_CODE,MARKETPRODUCT_CODE,PREMIUM_AMOUNT)
//output:
//key:RECEIPT_NO|DUE_VOUCHER_DATE|FINANCE_DEPARTMENT_CODE|DEPARTMENT_CODE|CURRENCY_CODE|CHANNEL_SOURCE_CODE|CLIENT_ATTRIBUTE|PLAN_CODE|MARKETPRODUCT_CODE
//value : PREMIUM_AMOUNT
//1. premium_info data
//receipt number
val RECEIPT_NO = "RECEIPT_NO"
//due voucher date
val DUE_VOUCHER_DATE = "DUE_VOUCHER_DATE"
//finance department code
val FINANCE_DEPARTMENT_CODE = "FINANCE_DEPARTMENT_CODE"
//business department code
val DEPARTMENT_CODE = "DEPARTMENT_CODE"
//currency code
val CURRENCY_CODE = "CURRENCY_CODE"
//channel source code
val CHANNEL_SOURCE_CODE = "CHANNEL_SOURCE_CODE"
//client attribute: 0 = group, 1 = individual
val CLIENT_ATTRIBUTE = "CLIENT_ATTRIBUTE"
//build the insert tuples
val infoInsertDS = kafkaInfo.map{ inStream =>
val info = JSON.parseObject(inStream._2)
var report_no = ""
var due_voucher_date=""
var finance_department_code=""
var department_code=""
var currency_code=""
var channel_source_code=""
var client_attribute=""
//only handle insert operations (op_type == "I")
if(info.getString("op_type").equalsIgnoreCase("I")){
val infoAfter = info.getString("after")
val infoRow= JSON.parseObject(infoAfter)
if( infoRow.containsKey(RECEIPT_NO) &&
infoRow.containsKey(DUE_VOUCHER_DATE) &&
infoRow.containsKey(FINANCE_DEPARTMENT_CODE) &&
infoRow.containsKey(DEPARTMENT_CODE) &&
infoRow.containsKey(CURRENCY_CODE) &&
infoRow.containsKey(CHANNEL_SOURCE_CODE) &&
infoRow.containsKey(CLIENT_ATTRIBUTE)
){
report_no = infoRow.getString(RECEIPT_NO)
due_voucher_date = infoRow.getString(DUE_VOUCHER_DATE)
finance_department_code =infoRow.getString(FINANCE_DEPARTMENT_CODE)
department_code =infoRow.getString(DEPARTMENT_CODE)
currency_code =infoRow.getString(CURRENCY_CODE)
channel_source_code =infoRow.getString(CHANNEL_SOURCE_CODE)
client_attribute =infoRow.getString(CLIENT_ATTRIBUTE)
}
}
(report_no,due_voucher_date,finance_department_code,department_code,currency_code,channel_source_code,client_attribute)
}
//todo:
//handle update and delete operations
//2. premium_plan data
val PLAN_CODE = "PLAN_CODE"
//market product code
val MARKETPRODUCT_CODE = "MARKETPRODUCT_CODE"
//premium amount
val PREMIUM_AMOUNT = "PREMIUM_AMOUNT"
val planInsertDS = kafkaPlan.map{ inStream =>
val info = JSON.parseObject(inStream._2)
var report_no = ""
var plan_code =""
var marketproduct_code =""
var premium_amount =""
if(info.getString("op_type").equalsIgnoreCase("I")){
val infoAfter = info.getString("after")
val infoRow= JSON.parseObject(infoAfter)
if( infoRow.containsKey(RECEIPT_NO) &&
infoRow.containsKey(PLAN_CODE) &&
infoRow.containsKey(PREMIUM_AMOUNT) &&
infoRow.containsKey(MARKETPRODUCT_CODE)
){
report_no = infoRow.getString(RECEIPT_NO)
plan_code = infoRow.getString(PLAN_CODE)
marketproduct_code =infoRow.getString(MARKETPRODUCT_CODE)
premium_amount =infoRow.getString(PREMIUM_AMOUNT)
}
}
(report_no,plan_code,marketproduct_code,premium_amount)
}
infoInsertDS.foreachRDD(rdd => {
rdd.foreachPartition(iter => {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum",hbase_zookeeper_quorum)
hbaseConf.set("hbase.zookeeper.property.clientPort",hbase_zookeeper_property_clientPort)
hbaseConf.set("hbase.defaults.for.version.skip","true")
val hbaseTable = new HTable(hbaseConf,TableName.valueOf(hbase_table))
//hbaseTable.setAutoFlush(false, false)//key point 1: disable auto-flush so puts are buffered
//hbaseTable.setWriteBufferSize(1*1024*1024)//key point 2: size of the client write buffer
val info = new ArrayList[Put]
while(iter.hasNext){
val row = iter.next()
//val rowKey = row._1.hashCode()
val rowKey = row._1
val due_voucher_date = row._2
val finance_department_code = row._3
val department_code = row._4
val currency_code = row._5
val channel_source_code = row._6
val client_attribute = row._7
val put = new Put(Bytes.toBytes(rowKey))
//put.add(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_due_voucher_date),Bytes.toBytes(due_voucher_date.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_finance_department_code),Bytes.toBytes(finance_department_code.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_department_code),Bytes.toBytes(department_code.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_currency_code),Bytes.toBytes(currency_code.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_channel_source_code),Bytes.toBytes(channel_source_code.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_client_attribute),Bytes.toBytes(client_attribute.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_info),Bytes.toBytes(info_is_write),Bytes.toBytes("ok"))
put.addColumn(Bytes.toBytes(hbase_common),Bytes.toBytes(common_is_count),Bytes.toBytes("no"))
info.add(put)
}
if(info.size() > 0){
hbaseTable.put(info)
hbaseTable.flushCommits()//key point 3: flush the buffered puts before closing
hbaseTable.close()
}
})
})
planInsertDS.foreachRDD(rdd => {
rdd.foreachPartition(iter => {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum",hbase_zookeeper_quorum)
hbaseConf.set("hbase.zookeeper.property.clientPort",hbase_zookeeper_property_clientPort)
hbaseConf.set("hbase.defaults.for.version.skip","true")
val hbaseTable = new HTable(hbaseConf,TableName.valueOf(hbase_table))
//hbaseTable.setAutoFlush(false, false)
//hbaseTable.setWriteBufferSize(1*1024*1024)
val info = new ArrayList[Put]
while(iter.hasNext){
val row = iter.next()
//val rowKey = row._1.hashCode()
val rowKey = row._1
val plan_code_value = row._2
val marketproduct_code = row._3
val premium_amount = row._4
val put = new Put(Bytes.toBytes(rowKey))
put.addColumn(Bytes.toBytes(hbase_premium_plan),Bytes.toBytes(plan_code),Bytes.toBytes(plan_code_value.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_plan),Bytes.toBytes(plan_marketproduct_code),Bytes.toBytes(marketproduct_code.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_plan),Bytes.toBytes(plan_premium_amount),Bytes.toBytes(premium_amount.toString()))
put.addColumn(Bytes.toBytes(hbase_premium_plan),Bytes.toBytes(plan_is_write),Bytes.toBytes("ok"))
put.addColumn(Bytes.toBytes(hbase_common),Bytes.toBytes(common_is_count),Bytes.toBytes("no"))
info.add(put)
}
if(info.size() > 0){
hbaseTable.put(info)
hbaseTable.flushCommits()
hbaseTable.close()
}
})
})
//infoInsertDS.foreachRDD(rdd =>{
//val recods = rdd.count()
//hbaseContext.bulkPut(rdd, TableName.valueOf(hbase_table), (recods)=>{
//
//})
//})

//todo: build the data for the computation and run it
//infoInsert.mapPartitions(mapPartFunc, preservePartitioning)
//query HBase for rows where info_is_write and plan_is_write are "ok" and is_count is "no", then compute

//register the shutdown hook before awaitTermination(), which blocks
sys.addShutdownHook(ssc.stop(true, true))
ssc.start()
ssc.awaitTermination()
}
}
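
The commented-out bulkPut block above can be completed with HBaseContext from the hbase-spark module. The following is a minimal sketch, not the original author's code: it assumes the module's bulkPut signature bulkPut[T](rdd: RDD[T], tableName: TableName, f: T => Put) and reuses the values defined in main above (infoInsertDS, hbaseContext, the table and column-family constants).

//Minimal sketch (assumption: hbase-spark provides bulkPut[T](rdd, TableName, T => Put)).
//HBaseContext manages the connections, so no per-partition HTable handling is needed.
infoInsertDS.foreachRDD(rdd => {
  //skip records that were not inserts (those were left with an empty row key)
  val inserts = rdd.filter(_._1.nonEmpty)
  hbaseContext.bulkPut[(String, String, String, String, String, String, String)](
    inserts,
    TableName.valueOf(hbase_table),
    row => {
      val put = new Put(Bytes.toBytes(row._1))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_due_voucher_date), Bytes.toBytes(row._2))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_finance_department_code), Bytes.toBytes(row._3))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_department_code), Bytes.toBytes(row._4))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_currency_code), Bytes.toBytes(row._5))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_channel_source_code), Bytes.toBytes(row._6))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_client_attribute), Bytes.toBytes(row._7))
      put.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write), Bytes.toBytes("ok"))
      put.addColumn(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count), Bytes.toBytes("no"))
      put
    })
})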

2. Reading HBase data


import java.util.ArrayList
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.Filter
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.commons.lang3.StringUtils
object ConutPremiumInfoPlan {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ConutPremiumInfoPlan")
.setMaster("local")
val sc = new SparkContext(conf)
//val ssc = new StreamingContext(sc,Seconds(30))
//HBase constants
val hbase_table = "premium_info_plan"
//column families
val hbase_premium_info = "premium_info"
val hbase_premium_plan = "premium_plan"
val hbase_common = "common"
//column qualifiers
val info_due_voucher_date="due_voucher_date"
val info_finance_department_code="finance_department_code"
val info_department_code="department_code"
val info_currency_code="currency_code"
val info_channel_source_code="channel_source_code"
val info_client_attribute="client_attribute"
val info_is_write="is_write"
val plan_code="plan_code"
val plan_marketproduct_code="marketproduct_code"
val plan_premium_amount="premium_amount"
val plan_is_write="is_write"
val common_is_count="is_count"
//parameters
val hbase_zookeeper_quorum="SZB-L0032013,SZB-L0032014,SZB-L0032015"
val hbase_zookeeper_property_clientPort="2181"
//read data from HBase
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum",hbase_zookeeper_quorum)
hbaseConf.set("hbase.zookeeper.property.clientPort",hbase_zookeeper_property_clientPort)
hbaseConf.set("hbase.defaults.for.version.skip","true")
//requires the hbase-spark jar shipped with CDH
//https://github.com/cloudera-labs/SparkOnHBase
val hbaseContext = new HBaseContext(sc, hbaseConf)

//filter rows where is_write is "ok" and is_count is "no"
val infoFilter = new SingleColumnValueFilter(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write), CompareOp.EQUAL, Bytes.toBytes("ok"));
val planFilter = new SingleColumnValueFilter(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_is_write), CompareOp.EQUAL, Bytes.toBytes("ok"));
val commFilter = new SingleColumnValueFilter(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count), CompareOp.EQUAL, Bytes.toBytes("no"));
val filters = new FilterList(FilterList.Operator.MUST_PASS_ALL)
filters.addFilter(infoFilter)
filters.addFilter(planFilter)
filters.addFilter(commFilter)
val scan = new Scan()
//scan.addColumn(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write))
//scan.addColumn(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_is_write))
//scan.addColumn(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count))
scan.setFilter(filters)
scan.setCaching(500)
scan.setCacheBlocks(false)
val hbaseRdd = hbaseContext.hbaseRDD(TableName.valueOf(hbase_table) , scan)
hbaseRdd.foreach(row => {
//get the row key
val key = Bytes.toString(row._2.getRow)
//read cell values by column family and qualifier
val info_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write)))
val info_finance_department_code_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_finance_department_code)))
val plan_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_is_write)))
val is_count_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count)))
println("Row key:"+key+" info_is_write:"+info_is_write_value+" plan_is_write:"+plan_is_write_value +" info_finance_department_code:"+info_finance_department_code_value +" count: "+is_count_value)
})
//RDD filter: keep only records where info_is_write and plan_is_write are "ok" and is_count is "no"
hbaseRdd.filter(row => {
val info_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write)))
val plan_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_is_write)))
val is_count_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count)))
"ok".equals(info_is_write_value) && "ok".equals(plan_is_write_value) && "no".equals(is_count_value)
})
.foreach(row => {
val key = Bytes.toString(row._2.getRow)
val info_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_is_write)))
val info_finance_department_code_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_finance_department_code)))
val plan_is_write_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_is_write)))
val is_count_value:String = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_common), Bytes.toBytes(common_is_count)))
println("Row key:"+key+" info_is_write:"+info_is_write_value+" plan_is_write:"+plan_is_write_value +" info_finance_department_code:"+info_finance_department_code_value +" count: "+is_count_value)
})
}
}
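
The computation itself is still a TODO in the original. As one possible continuation (an assumption, not the author's design), the filtered rows could be aggregated with reduceByKey, for example summing premium_amount per finance_department_code, reusing hbaseRdd and the constants from main above:

//Minimal sketch (hypothetical aggregation): total premium per finance department
//over the rows returned by the filtered scan.
val premiumByDept = hbaseRdd
  .map(row => {
    val dept = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_info), Bytes.toBytes(info_finance_department_code)))
    val amountStr = Bytes.toString(row._2.getValue(Bytes.toBytes(hbase_premium_plan), Bytes.toBytes(plan_premium_amount)))
    val amount = if (amountStr == null || amountStr.isEmpty) 0.0 else amountStr.toDouble
    (dept, amount)
  })
  .reduceByKey(_ + _)
premiumByDept.collect().foreach { case (dept, total) => println(dept + " -> " + total) }
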
Recording Kafka offsets. The driver log shows the offset range each KafkaRDD computes, e.g.:
KafkaRDD: Computing topic DEPLOYOP.PREMIUM_PLAN, partition 0 offsets 0 -> 13
The offset ranges of a batch can also be read from the direct stream via HasOffsetRanges:
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
kafkaStream.foreachRDD(rdd =>{
//get the offset ranges of this batch
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreach(println _)
})
//processing the data
messages.foreachRDD(mess => {
//get the offset ranges of this batch
val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
mess.foreachPartition(lines => {
lines.foreach(line => {
val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
logger.info("++++++++++++++++ record the offsets here ++++++++++++++++")
logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
logger.info("++++++++++++++++ consume the data here ++++++++++++++++")
logger.info("The kafka line is " + line)
})
})
})
Specifying a partitioner: http://blog.csdn.net/BlockheadLS/article/details/52366153
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")
//partition into 100 hash partitions
.partitionBy(new HashPartitioner(100)) //elements of userData and joined that share a key are guaranteed to end up on the same machine
Operations that benefit from partitioning: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup().
All key-based operations benefit, as the sketch below shows.
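
A minimal, self-contained sketch (illustrative data, not from the original) of why pre-partitioning helps: userData is hash-partitioned and persisted once, so a later join only shuffles the other RDD.

import org.apache.spark.HashPartitioner

//Pre-partition and cache the large pair RDD once.
val userData = sc.parallelize(Seq((1, "userA"), (2, "userB"), (3, "userC")))
  .partitionBy(new HashPartitioner(100))
  .persist()

//Each batch of events is joined against it; only `events` gets shuffled,
//because Spark knows userData's partitioner and keeps keys co-located.
val events = sc.parallelize(Seq((1, "click"), (3, "view")))
val joined = userData.join(events)
joined.collect().foreach(println)
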
transactional.id: Kafka checks for open transactions with the same transactional.id and completes their processing. Kafka also maintains an epoch associated with each transactional.id, stored as internal metadata, which is used to fence out older producer instances that reuse the same id.
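
A minimal sketch of how transactional.id is used with the standard Java producer API. Note this requires Kafka 0.11+ clients and brokers, not the kafka_2.10-0.8.2.2 installation shown below; the broker address and id here are placeholders.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.KafkaException

val props = new Properties()
props.put("bootstrap.servers", "10.20.25.199:9092")       //placeholder broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("enable.idempotence", "true")
props.put("transactional.id", "ogg_premium_info_plan_tx")  //hypothetical id

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()  //completes or aborts any open transaction for this id and bumps its epoch,
                             //fencing out older producers still using the same transactional.id
try {
  producer.beginTransaction()
  producer.send(new ProducerRecord("DEPLOYOP.PREMIUM_INFO", "key", "value"))
  producer.commitTransaction()
} catch {
  case e: KafkaException =>
    producer.abortTransaction()  //simplified: fatal errors (e.g. ProducerFencedException) should close instead
} finally {
  producer.close()
}
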
[bdas@SZC-L0075940 ~]$ /data/soft/kafka_2.10-0.8.2.2/bin/kafka-topics.sh --zookeeper SZC-L0075940:2181,SZC-L0075995:2181,SZC-L0075996:2181 --describe --topic web_app_log
Topic: web_app_log    PartitionCount: 3    ReplicationFactor: 2    Configs:
    Topic: web_app_log    Partition: 0    Leader: 2    Replicas: 2,0    Isr: 0,2
    Topic: web_app_log    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1
    Topic: web_app_log    Partition: 2    Leader: 1    Replicas: 1,2    Isr: 1,2
//Here Replicas: 2,0 and Isr: 0,2 list the brokers in a different order; that does not matter — what matters is that no replica is missing from the Isr.
Spark could not find the leader of a partition. Monitoring showed that a broker went down at the time of the exception. But the topic's replication factor was 2, so even with one broker down a replica should have taken over as leader. It turned out that some partition replicas had not kept up with their leader (the usual "falling behind" problem), so they were not eligible for election.
To check whether a topic has replicas that have fallen behind:
kafka-topics.sh --describe --zookeeper XXX --topic XXX
Compare Replicas and Isr: if the Isr lists fewer brokers than Replicas, that partition has replicas that have fallen behind.
Solution
Increase num.replica.fetchers, the number of threads a broker uses to fetch data from leaders (default 1); a larger value increases replication I/O. After increasing it in tests, replicas no longer fell behind.
Configuration change:
listeners=PLAINTEXT://30.16.45.156:9092
advertised.listeners=PLAINTEXT://30.16.45.156:9092
num.replica.fetchers=3



