Pulling data from MongoDB with Spark, mainly for codeless-tracking (无埋点) event data.

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Spark")
  .config("spark.sql.warehouse.dir", "file:///root/project/dcproject/spark-warehouse")
  .config("hive.metastore.uris", "thrift://sy-003.hadoop:9083")
  .config("fs.defaultFS", "hdfs://cluster")
  .config("dfs.nameservices", "cluster")
  .config("dfs.ha.namenodes.cluster", "nn1,nn2")
  .config("dfs.namenode.rpc-address.cluster.nn1", "sy-002.hadoop:8020")
  .config("dfs.namenode.rpc-address.cluster.nn2", "sy-003.hadoop:8020")
  .config("dfs.client.failover.proxy.provider.cluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
  .config("spark.mongodb.input.uri", "mongodb://192.168.56.17:27017/growingio.webpv")
//  .config("spark.mongodb.output.uri", "mongodb://192.168.56.17:27017/growingio.webpv")
  .enableHiveSupport()
  .getOrCreate()
This is where the SparkSession is built: besides enabling Hive support, the MongoDB connector configuration can be added directly on the builder.
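With this session in hand, the connector can load the collection named in spark.mongodb.input.uri straight into a DataFrame. A minimal sketch, assuming the mongo-spark-connector 2.x API that the article uses below:

import com.mongodb.spark.MongoSpark

// Loads growingio.webpv (from spark.mongodb.input.uri); the schema is inferred by sampling documents.
val webpv = MongoSpark.load(spark)
webpv.printSchema()
webpv.createOrReplaceTempView("webpv")
spark.sql("SELECT COUNT(*) AS pv FROM webpv").show()

The full job below goes further: it reads the webaction collection with an explicit ReadConfig and writes each row into HBase.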
package OutAnalysis

/** Codeless-tracking (无埋点) data: user behavior statistics and analysis.
  * author: 谭志坚
  * 2017-02-24
  */
import java.io.IOException

import DAO.{ScalaConn, ScalaHbase}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config._
import com.mongodb.spark._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{MasterNotRunningException, TableName, ZooKeeperConnectionException}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.Document
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

case class webaction(_id: String, p: String, b: String, s: String, t: String, d: String, u: String,
                     v: String, x: String, h: String, ptm: String, tm: String)

@SerialVersionUID(-1)
object DW_WMD_WEBACTION {
  val LOG = LoggerFactory.getLogger(getClass)

  @throws(classOf[Exception])
  def DW_WMD_WEBACTION_run(conf: Configuration, hconn: Connection) {
    // Target HBase table with a single column family, USERS.
    val familyColumn: Array[String] = Array[String]("USERS")
    ScalaHbase.createTable(conf, hconn, "DW_WMD_WEBACTION", familyColumn)
    val DW_WMD_WEBACTION: Table = hconn.getTable(TableName.valueOf("DW_WMD_WEBACTION"))

    // Incremental load: only pull documents newer than the latest TM already stored in HBase.
    var maxdate: String = ScalaConn.getMaxDate(conf, hconn, DW_WMD_WEBACTION, "USERS", "TM")
    if (maxdate.contains("-")) {
      maxdate = "0"
    }

    val spark = ScalaConn.spark
    // Read the webaction collection from MongoDB, preferring secondary members of the replica set.
    val readConfig = ReadConfig(
      Map("collection" -> "webaction", "readPreference.name" -> "secondaryPreferred"),
      Some(ReadConfig(spark.sparkContext)))
//    val webaction = spark.sparkContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.56.17:27017/growingio.webaction"))).toDF()
    val webaction = MongoSpark.load(spark.sparkContext, readConfig).toDF()
    webaction.createOrReplaceTempView("webaction")

    val DW_WMD_WEBACTION_sql = "SELECT _id,p,s,t,d,u,v,x,h,ptm,tm FROM webaction WHERE tm >= " + maxdate
    var filteredDF: DataFrame = spark.sql(DW_WMD_WEBACTION_sql)
//    filteredDF.show()

    try {
      filteredDF.collect().foreach { userGetLoginRow =>
        val id: String = String.valueOf(userGetLoginRow.get(0)).replace("[", "").replace("]", "")
        val p: String = String.valueOf(userGetLoginRow.getString(1))
        val s: String = String.valueOf(userGetLoginRow.getString(2))
        val t: String = String.valueOf(userGetLoginRow.getString(3))
        val d: String = String.valueOf(userGetLoginRow.getString(4))
        val u: String = String.valueOf(userGetLoginRow.getString(5))
        val v: String = String.valueOf(userGetLoginRow.getString(6))
        val x: String = String.valueOf(userGetLoginRow.getString(7))
        val h: String = String.valueOf(userGetLoginRow.getString(8))
        val ptm: String = String.valueOf(userGetLoginRow.getLong(9))
        val tm: String = String.valueOf(userGetLoginRow.getLong(10))
        // Row key is timestamp + document id, so rows sort by event time.
        val rowKey: String = tm + id
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "ID", id)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "P", p)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "S", s)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "T", t)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "D", d)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "U", u)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "V", v)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "X", x)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "H", h)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "PTM", ptm)
        ScalaHbase.addRow(DW_WMD_WEBACTION, rowKey, "USERS", "TM", tm)
      }
    } catch {
      case e: Exception =>
        if (e.getClass == classOf[MasterNotRunningException]) {
          LOG.error("MasterNotRunningException:" + e.toString)
        }
        if (e.getClass == classOf[ZooKeeperConnectionException]) {
          LOG.error("ZooKeeperConnectionException:" + e.toString)
        }
        if (e.getClass == classOf[IOException]) {
          LOG.error("IOException:" + e.toString)
        }
        LOG.error("Unexpected exception:", e)
    } finally {
      filteredDF = null
      if (null != DW_WMD_WEBACTION) {
        DW_WMD_WEBACTION.close()
      }
    }
  }
}
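The ScalaConn and ScalaHbase DAO helpers are not shown in the article. A hypothetical driver that wires the job up might look like the sketch below; the driver object name, the HBase client setup, and the ZooKeeper quorum hosts are all assumptions, not part of the original code.

package OutAnalysis

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object DW_WMD_WEBACTION_Driver {
  def main(args: Array[String]): Unit = {
    // Hypothetical HBase client configuration; the quorum hosts are assumed, not from the article.
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "sy-002.hadoop,sy-003.hadoop")
    val hconn: Connection = ConnectionFactory.createConnection(conf)
    try {
      DW_WMD_WEBACTION.DW_WMD_WEBACTION_run(conf, hconn)
    } finally {
      hconn.close()
    }
  }
}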
