Spark 战斗--基于 Spark 日志清理和数据统计以及 Zeppelin 的使用
最编程
2024-03-08 13:33:02
...
Saprk-日志实战
一、用户行为日志
1.概念
用户每次访问网站时所有的行为日志(访问、浏览、搜索、点击)
用户行为轨迹,流量日志
2.原因
分析日志:
网站页面访问量
网站的粘性
推荐
3.生产渠道
(1)Nginx
(2)Ajax
4.日志内容
日志数据内容:
1.访问的系统属性:操作系统、浏览器等
2.访问特征:点击URL,跳转页面(referer)、页面停留时间
3.访问信息:seesion_id、访问id信息(地市\运营商)
注意:Nginx配置,可以获取指定信息
5.意义
(1)网站的眼睛
投放广告收益
(2)网站的神经
网站布局(影响用户体验)
(3)网站的大脑
二、离线数据处理
1.处理流程
1)数据采集
Flume:
产生的Web日志,写入到HDFS
2)数据清洗
Spark\Hive\MapReduce--》HDFS(Hive/Spark SQL表)
3)数据处理
按照业务逻辑进行统计分析
Spark\Hive\MapReduce--》HDFS(Hive/Spark SQL表)
4)处理结果入库
RDBMS(MySQL)\NoSQL(HBase、Redis)
5)数据可视化展示
通过图形化展示:饼图、柱状图、地图、折线图
Echarts、HUE、Zeppelin
三、项目需求
code/video
需求一:
统计imooc主站最受欢迎的课程/手记Top N访问次数
需求二:
按地市统计imooc主站最受欢迎的Top N课程
a.根据IP地址获取出城市信息
b.窗口函数在Spark SQL中的使用
需求三:
按流量统计imooc主站最受欢迎的Top N课程
四、日志内容
需要字段:
访问时间、访问URL、访问过程耗费流量、访问IP地址
日志处理:
一般的日志处理方式,我们是需要进行分区的,
按照日志中的访问时间进行相应的分区,比如: d, h,m5(每5分钟一个分区)
输入:访问时间、访问URL、耗费的流量、访问IP地址信息
输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天
Maven打包
mvn install:install-file -Dfile=D:\ipdatabase-master\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar2 -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
五、数据清洗
1.原始日志解析
package com.saddam.spark.MuKe.ImoocProject
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
* 第一步清洗:抽取出所需要指定列数据
*
* 添加断点,可以查看各个字段
*/
object SparkStatFormatJob {
def main(args: Array[String]): Unit = {
val spark=SparkSession
.builder()
.appName("SparkStatFormatJob")
.master("local[2]")
.getOrCreate()
val logRDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\access.20161111.log")
// logRDD.take(10).foreach(println)
val result = logRDD.map(line => {
val split = line.split(" ")
val ip = split(0)
/**
* 原始日志的第三个和第四个字段拼接起来就是完整的时间字段:
* [10/Nov/2016:00:01:02 +0800]==>yyyy-MM-dd HH
*/
//TODO 使用时间解析工具类
val time = split(3) + " " + split(4)
//"http://www.imooc.com/code/1852" 引号需要放空
val url = split(11).replaceAll("\"", "")
val traffic = split(9)
//使用元组
// (ip,DateUtils.parse(time),url,traffic)
DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
}).take(20).foreach(println)
// result.saveAsTextFile("D:\\Spark\\OutPut\\log_local_2")
/*
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
(117.35.88.11,[10/Nov/2016:00:01:02 +0800])
(182.106.215.93,[10/Nov/2016:00:01:02 +0800])
(10.100.0.1,[10/Nov/2016:00:01:02 +0800])
*/
spark.stop()
}
}
2.日期工具类
package com.saddam.spark.MuKe.ImoocProject
import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat
/**
* 日期时间解析工具类
*/
object DateUtils {
// 输入文件日期时间格式
//[10/Nov/2016:00:01:02 +0800]
val YYYYMMDDHHMM_TIME_FOEMAT= FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z",Locale.ENGLISH)
//目标日期格式
//2016-11-10 00:01:02
val TARGET_FORMAT=FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
/**
*解析时间
* @param time
* @return
*/
def parse(time:String)={
TARGET_FORMAT.format(new Date(getTime(time)))
}
/**
* 获取输入日志时间:long类型
*
* time:[10/Nov/2016:00:01:02 +0800]
* @param time
* @return
*/
def getTime(time:String)= {
try {
YYYYMMDDHHMM_TIME_FOEMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
} catch {
case e: Exception => {
0l
}
}
}
def main(args: Array[String]): Unit = {
println(parse("[10/Nov/2016:00:01:02 +0800]"))
}
}
六、项目需求
需求一
统计imooc主站最受欢迎的课程/手记TopN访问次数
按照需求完成统计信息并将统计结果入库
--使用DataFrame API完成统计分析
--使用SQL API完成统计分析
package com.saddam.spark.MuKe
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object PopularVideoVisits {
def main(args: Array[String]): Unit = {
val spark=SparkSession
.builder()
.appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
.master("local[2]")
.getOrCreate()
val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
accessDF.printSchema()
accessDF.show(false)
/*
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|url |cmsType|cmsId|traffic|ip |city|time |day |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc.com/video/4500 |video |4500 |304 |218.75.35.226 |鏈煡 |2017-05-11 14:09:14|20170511|
|http://www.imooc.com/video/14623 |video |14623|69 |202.96.134.133 |鏈煡 |2017-05-11 15:25:05|20170511|
|http://www.imooc.com/article/17894|article|17894|115 |202.96.134.133 |鏈煡 |2017-05-11 07:50:01|20170511|
*/
//代码重构
val day="20170511"
videoAccessTopNStat(spark,accessDF,day)
//MySQL工具类测试
println(MySQLUtils.getConnection())
/**
* 按照流量进行统计
*/
def videoAccessTopNStat(spark:SparkSession,accessDF:DataFrame,day:String)={
//隐式转换
import spark.implicits._
//TODO 统计方式一:DataFrame方式统计video
val videoAccessTopNDF= accessDF
.filter($"day"===day && $"cmsType"==="video")
.groupBy("day","cmsId")
.agg(count("cmsId").as("times"))
videoAccessTopNDF.printSchema()
videoAccessTopNDF.show(false)
//TODO 统计方式二:SQL方式统计article
accessDF.createOrReplaceTempView("temp")
val videoAccessTopNSQL = spark.sql("select " +
"day,cmsId,count(1) as times " +
"from temp " +
"where day='20170511' and cmsType='article' " +
"group by day,cmsId " +
"order by times desc")
videoAccessTopNSQL.show(false)
/**
* TODO 将最受欢迎的TopN课程统计结果写入MySQL
*
*/
try{
videoAccessTopNSQL.foreachPartition(partitionOfRecords=>{
val list =new ListBuffer[DayVideoAccessStat]
partitionOfRecords.foreach(info=>{
val day=info.getAs[Integer]("day").toString
val cmsId=info.getAs[Long]("cmsId")
val times=info.getAs[Long]("times")
list.append(DayVideoAccessStat(day,cmsId,times))
})
StatDAO.insertDayVideoAccessTopN(list)
})}catch {
case e:Exception=>e.printStackTrace()
}
}
spark.stop()
}
/**
* 课程访问次数实体类
*/
case class DayVideoAccessStat(day:String,cmsId:Long,times:Long)
/**
* TODO MySQL操作工具类
*/
object MySQLUtils{
def getConnection()={
DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
}
/**
* 释放数据库连接等资源
* @param connection
* @param pstmt
*/
def release(connection: Connection, pstmt: PreparedStatement): Unit = {
try {
if (pstmt != null) {
pstmt.close()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
}
}
}
/**
* TODO DAO数据库接口
*/
object StatDAO{
/**
* 批量保存DayVideoAccessStat到数据库
* insertDayVideoAccessTopN:每天访问视频的
*/
def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
var connection:Connection = null
var pstmt:PreparedStatement = null
try {
connection =MySQLUtils.getConnection()
connection.setAutoCommit(false) //设置手动提交
val sql = "insert into day_video_access_topn_stat2(day,cms_id,times) values (?,?,?)"
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setLong(3, ele.times)
pstmt.addBatch()
}
pstmt.executeBatch() // 执行批量处理
connection.commit() //手工提交
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
}
}
需求二
按地市统计imooc主站最受欢迎的Top N课程
package com.saddam.spark.MuKe
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object PopularCiytVideoVisits {
def main(args: Array[String]): Unit = {
val spark=SparkSession
.builder()
.appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
.master("local[2]")
.getOrCreate()
val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
accessDF.printSchema()
accessDF.show(false)
//代码重构
val day="20170511"
//TODO 按照地市进行统计TopN课程
cityAccessTopNStat(spark,accessDF,day)
/**
* 按照地市进行统计TopN课程
* @param spark
* @param accessDf
*/
def cityAccessTopNStat(spark: SparkSession,accessDF:DataFrame,day:String)={
import spark.implicits._
val cityAccessTopNDF=accessDF.filter($"day"===day&&$"cmsType"==="video").groupBy("day","city","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
cityAccessTopNDF.printSchema()
cityAccessTopNDF.show(false)
//Windows函数在Spark SQL的使用
val top3DF=cityAccessTopNDF.select(
cityAccessTopNDF("day"),
cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"),
cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
.orderBy(cityAccessTopNDF("times").desc)
).as("times_rank")
).filter("times_rank <=3") //.show(false) //Top3
/**
* 将地市进行统计TopN课程统计结果写入MySQL
*
*/
try{
top3DF.foreachPartition(partitionOfRecords=>{
val list =new ListBuffer[DayCityVideoAccessStat]
partitionOfRecords.foreach(info=>{
val day=info.getAs[Integer]("day").toString
val cmsId=info.getAs[Long]("cmsId")
val city=info.getAs[String]("city")
val times=info.getAs[Long]("times")
val timesRank=info.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day,cmsId,city,times,timesRank))
})
StatDAO.insertDayCityVideoAccessTopN(list)
})}catch {
case e:Exception=>e.printStackTrace()
}
}
spark.stop()
}
/**
* 实体类
*/
case class DayCityVideoAccessStat(day:String, cmsId:Long, city:String,times:Long,timesRank:Int)
/**
* TODO MySQL操作工具类
*/
object MySQLUtils{
def getConnection()={
DriverManager.getConnection("jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")
}
/**
* 释放数据库连接等资源
* @param connection
* @param pstmt
*/
def release(connection: Connection, pstmt: PreparedStatement): Unit = {
try {
if (pstmt != null) {
pstmt.close()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
}
}
}
/**
* TODO DAO数据库接口
*/
object StatDAO{
/**
* 批量保存DayCityVideoAccessStat到数据库
*/
def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
var connection: Connection = null
var pstmt: PreparedStatement = null
try {
connection = MySQLUtils.getConnection()
connection.setAutoCommit(false) //设置手动提交
val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
pstmt = connection.prepareStatement(sql)
for (ele <- list) {
pstmt.setString(1, ele.day)
pstmt.setLong(2, ele.cmsId)
pstmt.setString(3, ele.city)
pstmt.setLong(4, ele.times)
pstmt.setInt(5, ele.timesRank)
pstmt.addBatch()
}
pstmt.executeBatch() // 执行批量处理
connection.commit() //手工提交
} catch {
case e: Exception => e.printStackTrace()
} finally {
MySQLUtils.release(connection, pstmt)
}
}
}
}
需求三
按流量统计imooc主站最受欢迎的Top N课程
package com.saddam.spark.MuKe
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object VideoTrafficVisits {
def main(args: Array[String]): Unit = {
val spark=SparkSession
.builder()
.appName("TopNStatJob")
.config("spark.sql.sources.partitionColumnTypeInference.enable", "false")
.master("local[2]")
.getOrCreate()
val accessDF=spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
accessDF.printSchema()
accessDF.show(false)
//代码重构
val day="20170511"
//TODO 按照流量进行统计
videoTrafficsTopNStat(spark,accessDF,day)
上一篇:
C# 设计模式(1)--单例模式