欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Spark 战斗--基于 Spark 日志清理和数据统计以及 Zeppelin 的使用

最编程 2024-03-08 13:33:02
...

Saprk-日志实战

一、用户行为日志

1.概念

用户每次访问网站时所有的行为日志(访问、浏览、搜索、点击)

	用户行为轨迹,流量日志

2.原因

分析日志:
	网站页面访问量
	网站的粘性
	推荐

3.生产渠道

(1)Nginx

(2)Ajax

4.日志内容

在这里插入图片描述

日志数据内容:
	1.访问的系统属性:操作系统、浏览器等
	2.访问特征:点击URL,跳转页面(referer)、页面停留时间
	3.访问信息:seesion_id、访问id信息(地市\运营商)

注意:Nginx配置,可以获取指定信息

5.意义

(1)网站的眼睛
	投放广告收益
(2)网站的神经
	网站布局(影响用户体验)
(3)网站的大脑

二、离线数据处理

1.处理流程

1)数据采集
Flume:
	产生的Web日志,写入到HDFS
	
2)数据清洗
	Spark\Hive\MapReduce--》HDFS(Hive/Spark SQL表)
	
3)数据处理
	按照业务逻辑进行统计分析
	Spark\Hive\MapReduce--》HDFS(Hive/Spark SQL表)
	
4)处理结果入库
	RDBMS(MySQL)\NoSQL(HBase、Redis)
	
5)数据可视化展示
	通过图形化展示:饼图、柱状图、地图、折线图
	Echarts、HUE、Zeppelin

在这里插入图片描述

三、项目需求

code/video
需求一:
	统计imooc主站最受欢迎的课程/手记Top N访问次数
	
需求二:
	按地市统计imooc主站最受欢迎的Top N课程
	a.根据IP地址获取出城市信息
	b.窗口函数在Spark SQL中的使用
	
需求三:
	按流量统计imooc主站最受欢迎的Top N课程

四、日志内容

需要字段:
	访问时间、访问URL、访问过程耗费流量、访问IP地址

日志处理:
	一般的日志处理方式,我们是需要进行分区的,
	按照日志中的访问时间进行相应的分区,比如: d, h,m5(每5分钟一个分区)
	
输入:访问时间、访问URL、耗费的流量、访问IP地址信息
输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天

Maven打包

mvn install:install-file -Dfile=D:\ipdatabase-master\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar2 -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

五、数据清洗

1.原始日志解析

package com.saddam.spark.MuKe.ImoocProject

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
  * 第一步清洗:抽取出所需要指定列数据
  *
  * 添加断点,可以查看各个字段
  */
object SparkStatFormatJob {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val logRDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\access.20161111.log")

//    logRDD.take(10).foreach(println)

    val result = logRDD.map(line => {
      
      val split = line.split(" ")
      val ip = split(0)
      /**
        * 原始日志的第三个和第四个字段拼接起来就是完整的时间字段:
        * [10/Nov/2016:00:01:02 +0800]==>yyyy-MM-dd HH
        */
      //TODO 使用时间解析工具类
      val time = split(3) + " " + split(4)

      //"http://www.imooc.com/code/1852" 引号需要放空
      val url = split(11).replaceAll("\"", "")

      val traffic = split(9)

      //使用元组
      // (ip,DateUtils.parse(time),url,traffic)
      
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).take(20).foreach(println)


//    result.saveAsTextFile("D:\\Spark\\OutPut\\log_local_2")
/*
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
(117.35.88.11,[10/Nov/2016:00:01:02 +0800]) 
(182.106.215.93,[10/Nov/2016:00:01:02 +0800])
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
 */

    spark.stop()
}
}

2.日期工具类

package com.saddam.spark.MuKe.ImoocProject

import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat

/**
  * 日期时间解析工具类
  */
object DateUtils {

  // 输入文件日期时间格式
  //[10/Nov/2016:00:01:02 +0800]
  val YYYYMMDDHHMM_TIME_FOEMAT= FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z",Locale.ENGLISH)

   //目标日期格式
  //2016-11-10 00:01:02
  val TARGET_FORMAT=FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  
  /**
    *解析时间
    * @param time
    * @return
    */
  def parse(time:String)={
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  /**
    * 获取输入日志时间:long类型
    *
    * time:[10/Nov/2016:00:01:02 +0800]
    * @param time
    * @return
    */
  def getTime(time:String)= {
    try {
      YYYYMMDDHHMM_TIME_FOEMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => {
        0l
      }
    }
  }
    def main(args: Array[String]): Unit = {
      println(parse("[10/Nov/2016:00:01:02 +0800]"))

  }
}

在这里插入图片描述

六、项目需求

需求一

					统计imooc主站最受欢迎的课程/手记TopN访问次数
	
按照需求完成统计信息并将统计结果入库
	--使用DataFrame API完成统计分析
	--使用SQL API完成统计分析
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularVideoVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
    /*
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|url                               |cmsType|cmsId|traffic|ip             |city|time               |day     |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226  |鏈煡 |2017-05-11 14:09:14|20170511|
|http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133 |鏈煡 |2017-05-11 15:25:05|20170511|
|http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133 |鏈煡 |2017-05-11 07:50:01|20170511|
     */

      //代码重构
    val day="20170511"
      
      
    videoAccessTopNStat(spark,accessDF,day)

    //MySQL工具类测试
    println(MySQLUtils.getConnection())
    
    /**
      * 按照流量进行统计
      */
    def videoAccessTopNStat(spark:SparkSession,accessDF:DataFrame,day:String)={
      //隐式转换
      import spark.implicits._

      //TODO 统计方式一:DataFrame方式统计video
      val  videoAccessTopNDF= accessDF
        .filter($"day"===day && $"cmsType"==="video")
        .groupBy("day","cmsId")
        .agg(count("cmsId").as("times"))
      videoAccessTopNDF.printSchema()
      videoAccessTopNDF.show(false)

      //TODO 统计方式二:SQL方式统计article
      accessDF.createOrReplaceTempView("temp")
      val videoAccessTopNSQL = spark.sql("select " +
        "day,cmsId,count(1) as times " +
        "from temp " +
        "where day='20170511' and cmsType='article' " +
        "group by day,cmsId " +
        "order by times desc")
      videoAccessTopNSQL.show(false)


      /**
        * TODO 将最受欢迎的TopN课程统计结果写入MySQL
        *
        */
      try{
        videoAccessTopNSQL.foreachPartition(partitionOfRecords=>{
          val list =new ListBuffer[DayVideoAccessStat]

          partitionOfRecords.foreach(info=>{
            val day=info.getAs[Integer]("day").toString
            val cmsId=info.getAs[Long]("cmsId")
            val times=info.getAs[Long]("times")

            list.append(DayVideoAccessStat(day,cmsId,times))
          })

          StatDAO.insertDayVideoAccessTopN(list)

        })}catch {
        case e:Exception=>e.printStackTrace()
      }

    }


    spark.stop()
  }

  /**
    * 课程访问次数实体类
    */
  case class  DayVideoAccessStat(day:String,cmsId:Long,times:Long)


  /**
   * TODO MySQL操作工具类
   */
  object MySQLUtils{
    def getConnection()={
      DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
    }
    /**
      * 释放数据库连接等资源
      * @param connection
      * @param pstmt
      */
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
      try {
        if (pstmt != null) {
          pstmt.close()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }

  /**
    * TODO DAO数据库接口
    */
  object StatDAO{
    /**
      * 批量保存DayVideoAccessStat到数据库
      * insertDayVideoAccessTopN:每天访问视频的
      */
    def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {

      var connection:Connection = null
      var pstmt:PreparedStatement = null

      try {
        connection =MySQLUtils.getConnection()

        connection.setAutoCommit(false) //设置手动提交

        val sql = "insert into day_video_access_topn_stat2(day,cms_id,times) values (?,?,?)"
        pstmt = connection.prepareStatement(sql)

        for (ele <- list) {
          pstmt.setString(1, ele.day)
          pstmt.setLong(2, ele.cmsId)
          pstmt.setLong(3, ele.times)

          pstmt.addBatch()
        }

        pstmt.executeBatch() // 执行批量处理
        connection.commit() //手工提交
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        MySQLUtils.release(connection, pstmt)
      }
    }
    
  }

}

需求二

				按地市统计imooc主站最受欢迎的Top N课程
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularCiytVideoVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
      
     //代码重构
    val day="20170511"
      
    //TODO 按照地市进行统计TopN课程
    cityAccessTopNStat(spark,accessDF,day)
      
    /**
    * 按照地市进行统计TopN课程
    * @param spark
    * @param accessDf
    */
  def cityAccessTopNStat(spark: SparkSession,accessDF:DataFrame,day:String)={
    import spark.implicits._
        val cityAccessTopNDF=accessDF.filter($"day"===day&&$"cmsType"==="video").groupBy("day","city","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
        cityAccessTopNDF.printSchema()
        cityAccessTopNDF.show(false)

    //Windows函数在Spark SQL的使用

    val top3DF=cityAccessTopNDF.select(
    cityAccessTopNDF("day"),
    cityAccessTopNDF("city"),
    cityAccessTopNDF("cmsId"),
    cityAccessTopNDF("times"),

    row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
      .orderBy(cityAccessTopNDF("times").desc)
    ).as("times_rank")
    ).filter("times_rank <=3") //.show(false)  //Top3
    /**
      * 将地市进行统计TopN课程统计结果写入MySQL
      *
      */
    try{
      top3DF.foreachPartition(partitionOfRecords=>{
        val list =new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info=>{
          val day=info.getAs[Integer]("day").toString
          val cmsId=info.getAs[Long]("cmsId")
          val city=info.getAs[String]("city")
          val times=info.getAs[Long]("times")
          val timesRank=info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day,cmsId,city,times,timesRank))


        })

        StatDAO.insertDayCityVideoAccessTopN(list)

      })}catch {
      case e:Exception=>e.printStackTrace()
    }
  }
       spark.stop()
  }   
    
    /**
    * 实体类
    */
    case class DayCityVideoAccessStat(day:String, cmsId:Long, city:String,times:Long,timesRank:Int)
    
     /**
   * TODO MySQL操作工具类
   */
  object MySQLUtils{
    def getConnection()={
      DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
    }
    /**
      * 释放数据库连接等资源
      * @param connection
      * @param pstmt
      */
    def release(connection: Connection, pstmt: PreparedStatement): Unit = {
      try {
        if (pstmt != null) {
          pstmt.close()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }
  }
    
    
  /**
    * TODO DAO数据库接口
    */
  object StatDAO{
      /**
    * 批量保存DayCityVideoAccessStat到数据库
    */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) //设置手动提交

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // 执行批量处理
      connection.commit() //手工提交
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
  }
}

需求三

					按流量统计imooc主站最受欢迎的Top N课程
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object VideoTrafficVisits {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession
      .builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")

    accessDF.printSchema()
    accessDF.show(false)
    
    //代码重构
    val day="20170511"
      
    //TODO 按照流量进行统计
    videoTrafficsTopNStat(spark,accessDF,day)
      
    
						

上一篇: C# 设计模式(1)--单例模式

下一篇: Linux常见问题解答--如何修复 "tar:由于之前的错误,以失败状态退出