博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark- 计算每个学科最受欢迎的老师
阅读量:5869 次
发布时间:2019-06-19

本文共 7129 字,大约阅读时间需要 23 分钟。

日志类型

测试数据http://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://java.myit.com/lisihttp://java.myit.com/lisihttp://java.myit.com/lisi

 

计算每个学科最受欢迎的老师

package myproimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkContext, SparkConf}/** * Created by 166 on 2017/9/5. */object FavTeacher {  def main(args: Array[String]) {    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)    val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")//local[*]代表用多个线程跑,2代表用两个线程跑    val sc = new SparkContext(conf)    //读取数据    val lines: RDD[String] = sc.textFile(args(0))    //整理数据    val subjectAndTeacher:RDD[(String,String)]=lines.map(line=> {      val url = new URL(line)      val host = url.getHost      val subject = host.substring(0, host.indexOf("."))      val teacher = url.getPath.substring(1)   //去掉路径前面的"/"      (subject, teacher)    })    //聚合    val reduce = subjectAndTeacher.map((_,1)).reduceByKey(_+_)    //println(reduce.collect().toBuffer)    //按学科分组    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduce.groupBy(_._1._1)//迭代器不能排序,需要将它变成List。    //二次排序    val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))//用scala的语法,会把数据全部加载到内存后再做排序,数据量大的时候会有性能问题,内存溢出的问题,不建议这样使用,    val arr: Array[(String, List[((String, String), Int)])] = result.collect()    println(arr.toBuffer)      }}

 另种角度来实现,过滤多次提交

package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 过滤多次提交object GroupFavTeacher2 {  def main(args: Array[String]): Unit = {    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)    val sc = new SparkContext(conf)    val topN = args(1).toInt    val subject = Array("bigdata","javaee","php")    // 读取数据    val lines: RDD[String] = sc.textFile(args(0))    // 整理数据  http://bigdata.myit.cn/laozhang    val subjectAndTeacher= lines.map(line => {      val url = new URL(line)      val host = url.getHost      val subject = host.substring(0, host.indexOf("."))      val teacher = url.getPath.substring(1) // 去掉前面的/      ((subject, teacher),1)    })    // 聚合    val reduced = subjectAndTeacher.reduceByKey(_+_)     // 缓存到内存,因为多次过滤都是使用同一个rdd,缓存到内存可以提高反复使用的性能   val cache = reduced.cache()     for (sb <- subject){      val sorted = cache.filter(_._1._1 == sb).sortBy(_._2,false).take(topN)      println(sorted.toBuffer)    }    sc.stop()  }}

  使用自定义分区器将每个学科的数据shuffle到独自的分区,在分区内进行排序取topN

package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}// 自定义分区器import scala.collection.mutable// 过滤多次提交object GroupFavTeacher3 {  def main(args: Array[String]): Unit = {    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)    val sc = new SparkContext(conf)    val topN = args(1).toInt    val subject = Array("bigdata","javaee","php")    // 读取数据    val lines: RDD[String] = sc.textFile(args(0))    // 整理数据  http://bigdata.myit.cn/laozhang    val subjectAndTeacher= lines.map(line => {      val url = new URL(line)      val host = url.getHost      val subject = host.substring(0, host.indexOf("."))      val teacher = url.getPath.substring(1) // 去掉前面的/      ((subject, teacher),1)    })    // 聚合    val reduced = subjectAndTeacher.reduceByKey(_+_)    // 计算我们有多少学科    val sujects: Array[String] = reduced.map(_._1._1).distinct().collect()    // 自定义一个分区器,并且按照指定的分区器进行分区    val subjectPartitoner = new SubjectPartitoner(sujects)    // partitionBy按照指定的分区规则进行分区    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartitoner)    // 如果一次拿出一个分区(可以操作一个分区的数据)    val sorted = partitioned.mapPartitions(it => {      // 将迭代器转成List,然后排序,再转成迭代器返回      it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序    })    val result = sorted.collect()    println(result.toBuffer)    sc.stop()  }  // 自定义分区器  class SubjectPartitoner(sbs: Array[String]) extends Partitioner{    // 相当于主构造器(new 的时候会执行一次)    // 用于存放规则的一个map    val rules = new mutable.HashMap[String, Int]()    var i = 0    for (sb <- sbs){      rules.put(sb,i)      i += 1    }    // 返回分区的数量(下一个RDD有多少分区)    override def numPartitions: Int = sbs.length    // 根据传入的key计算分区标号    // Key是一个无组(String, String)    override def getPartition(key: Any): Int ={      // 获取学科名称      val subject = key.asInstanceOf[(String, String)]._1      // 根据规则计算分区编号      rules(subject)    }  }}

上面的方式会有多次shuffle,reduceByKey聚合数据的时候shuffle一次,使用自定义分区器重新对数据进行分析又shuffle了一次。我们可以尽可能的减少shuffle的过程,我们可以在reduceByKey的时候手动使用自定分区器进行分区,reduceByKey默认使用的是。HashPartitioner。

package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}// 自定义分区器且减少shuffleimport scala.collection.mutable// 过滤多次提交object GroupFavTeacher4 {  def main(args: Array[String]): Unit = {    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)    val sc = new SparkContext(conf)    val topN = args(1).toInt    val subject = Array("bigdata","javaee","php")    // 读取数据    val lines: RDD[String] = sc.textFile(args(0))    // 整理数据  http://bigdata.myit.cn/laozhang    val subjectAndTeacher= lines.map(line => {      val url = new URL(line)      val host = url.getHost      val subject = host.substring(0, host.indexOf("."))      val teacher = url.getPath.substring(1) // 去掉前面的/      ((subject, teacher),1)    })    // 计算我们有多少学科    val sujects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect()    // 自定义一个分区器,并且按照指定的分区器进行分区    val subjectPartitoner = new SubjectPartitoner2(sujects)    // 聚合,聚合是按照指定的分区器进行分区    // 该RDD一个分区内仅有一个学科的数据    val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(subjectPartitoner,_+_)    // 如果一次拿出一个分区(可以操作一个分区的数据)    val sorted = reduced.mapPartitions(it => {      // 将迭代器转成List,然后排序,再转成迭代器返回      it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序    })    // 收集数据    val result = sorted.collect()    println(result.toBuffer)    sc.stop()  }  // 自定义分区器  class SubjectPartitoner2(sbs: Array[String]) extends Partitioner{    // 相当于主构造器(new 的时候会执行一次)    // 用于存放规则的一个map    val rules = new mutable.HashMap[String, Int]()    var i = 0    for (sb <- sbs){      rules.put(sb,i)      i += 1    }    // 返回分区的数量(下一个RDD有多少分区)    override def numPartitions: Int = sbs.length    // 根据传入的key计算分区标号    // Key是一个无组(String, String)    override def getPartition(key: Any): Int ={      // 获取学科名称      val subject = key.asInstanceOf[(String, String)]._1      // 根据规则计算分区编号      rules(subject)    }  }}

 

转载于:https://www.cnblogs.com/RzCong/p/7822998.html

你可能感兴趣的文章
android addCategory()等说明
查看>>
django信号
查看>>
java基础之反射---重要
查看>>
tdd 和 make file,以及cygwin
查看>>
重装系统后,delphi7打开报错
查看>>
i++的原子性问题
查看>>
【吉光片羽】短信验证
查看>>
git diff 的用法
查看>>
你不知道的Virtual DOM(二):Virtual Dom的更新
查看>>
CentOS 6.5搭建ELK环境ElasticSearch+Kibana+Logstash
查看>>
前端性能优化小结
查看>>
ubuntu中安装oracle 11g
查看>>
MacBook如何用Parallels Desktop安装windows7/8
查看>>
gitlab 完整部署实例
查看>>
GNS关于IPS&ASA&PIX&Junos的配置
查看>>
七天学会ASP.NET MVC (四)——用户授权认证问题
查看>>
upgrade to iOS7,how to remove stroyboard?
查看>>
影响企业信息化成败的几点因素
查看>>
Clipboard 实现网页复制粘贴
查看>>
Thinkphp5 模型里别名alias不生效bug【已解决】
查看>>