日志类型
测试数据http://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://bigdata.myit.com/zhangsanhttp://java.myit.com/lisihttp://java.myit.com/lisihttp://java.myit.com/lisi
计算每个学科最受欢迎的老师
package myproimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkContext, SparkConf}/** * Created by 166 on 2017/9/5. */object FavTeacher { def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")//local[*]代表用多个线程跑,2代表用两个线程跑 val sc = new SparkContext(conf) //读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val subjectAndTeacher:RDD[(String,String)]=lines.map(line=> { val url = new URL(line) val host = url.getHost val subject = host.substring(0, host.indexOf(".")) val teacher = url.getPath.substring(1) //去掉路径前面的"/" (subject, teacher) }) //聚合 val reduce = subjectAndTeacher.map((_,1)).reduceByKey(_+_) //println(reduce.collect().toBuffer) //按学科分组 val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduce.groupBy(_._1._1)//迭代器不能排序,需要将它变成List。 //二次排序 val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(2))//用scala的语法,会把数据全部加载到内存后再做排序,数据量大的时候会有性能问题,内存溢出的问题,不建议这样使用, val arr: Array[(String, List[((String, String), Int)])] = result.collect() println(arr.toBuffer) }}
另种角度来实现,过滤多次提交
package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 过滤多次提交object GroupFavTeacher2 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val topN = args(1).toInt val subject = Array("bigdata","javaee","php") // 读取数据 val lines: RDD[String] = sc.textFile(args(0)) // 整理数据 http://bigdata.myit.cn/laozhang val subjectAndTeacher= lines.map(line => { val url = new URL(line) val host = url.getHost val subject = host.substring(0, host.indexOf(".")) val teacher = url.getPath.substring(1) // 去掉前面的/ ((subject, teacher),1) }) // 聚合 val reduced = subjectAndTeacher.reduceByKey(_+_) // 缓存到内存,因为多次过滤都是使用同一个rdd,缓存到内存可以提高反复使用的性能 val cache = reduced.cache() for (sb <- subject){ val sorted = cache.filter(_._1._1 == sb).sortBy(_._2,false).take(topN) println(sorted.toBuffer) } sc.stop() }}
使用自定义分区器将每个学科的数据shuffle到独自的分区,在分区内进行排序取topN
package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}// 自定义分区器import scala.collection.mutable// 过滤多次提交object GroupFavTeacher3 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val topN = args(1).toInt val subject = Array("bigdata","javaee","php") // 读取数据 val lines: RDD[String] = sc.textFile(args(0)) // 整理数据 http://bigdata.myit.cn/laozhang val subjectAndTeacher= lines.map(line => { val url = new URL(line) val host = url.getHost val subject = host.substring(0, host.indexOf(".")) val teacher = url.getPath.substring(1) // 去掉前面的/ ((subject, teacher),1) }) // 聚合 val reduced = subjectAndTeacher.reduceByKey(_+_) // 计算我们有多少学科 val sujects: Array[String] = reduced.map(_._1._1).distinct().collect() // 自定义一个分区器,并且按照指定的分区器进行分区 val subjectPartitoner = new SubjectPartitoner(sujects) // partitionBy按照指定的分区规则进行分区 val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(subjectPartitoner) // 如果一次拿出一个分区(可以操作一个分区的数据) val sorted = partitioned.mapPartitions(it => { // 将迭代器转成List,然后排序,再转成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序 }) val result = sorted.collect() println(result.toBuffer) sc.stop() } // 自定义分区器 class SubjectPartitoner(sbs: Array[String]) extends Partitioner{ // 相当于主构造器(new 的时候会执行一次) // 用于存放规则的一个map val rules = new mutable.HashMap[String, Int]() var i = 0 for (sb <- sbs){ rules.put(sb,i) i += 1 } // 返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length // 根据传入的key计算分区标号 // Key是一个无组(String, String) override def getPartition(key: Any): Int ={ // 获取学科名称 val subject = key.asInstanceOf[(String, String)]._1 // 根据规则计算分区编号 rules(subject) } }}
上面的方式会有多次shuffle,reduceByKey聚合数据的时候shuffle一次,使用自定义分区器重新对数据进行分析又shuffle了一次。我们可以尽可能的减少shuffle的过程,我们可以在reduceByKey的时候手动使用自定分区器进行分区,reduceByKey默认使用的是。HashPartitioner。
package com.rz.spark.baseimport java.net.URLimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}// 自定义分区器且减少shuffleimport scala.collection.mutable// 过滤多次提交object GroupFavTeacher4 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val topN = args(1).toInt val subject = Array("bigdata","javaee","php") // 读取数据 val lines: RDD[String] = sc.textFile(args(0)) // 整理数据 http://bigdata.myit.cn/laozhang val subjectAndTeacher= lines.map(line => { val url = new URL(line) val host = url.getHost val subject = host.substring(0, host.indexOf(".")) val teacher = url.getPath.substring(1) // 去掉前面的/ ((subject, teacher),1) }) // 计算我们有多少学科 val sujects: Array[String] = subjectAndTeacher.map(_._1._1).distinct().collect() // 自定义一个分区器,并且按照指定的分区器进行分区 val subjectPartitoner = new SubjectPartitoner2(sujects) // 聚合,聚合是按照指定的分区器进行分区 // 该RDD一个分区内仅有一个学科的数据 val reduced: RDD[((String, String), Int)] = subjectAndTeacher.reduceByKey(subjectPartitoner,_+_) // 如果一次拿出一个分区(可以操作一个分区的数据) val sorted = reduced.mapPartitions(it => { // 将迭代器转成List,然后排序,再转成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).toIterator // 按数值排序 }) // 收集数据 val result = sorted.collect() println(result.toBuffer) sc.stop() } // 自定义分区器 class SubjectPartitoner2(sbs: Array[String]) extends Partitioner{ // 相当于主构造器(new 的时候会执行一次) // 用于存放规则的一个map val rules = new mutable.HashMap[String, Int]() var i = 0 for (sb <- sbs){ rules.put(sb,i) i += 1 } // 返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length // 根据传入的key计算分区标号 // Key是一个无组(String, String) override def getPartition(key: Any): Int ={ // 获取学科名称 val subject = key.asInstanceOf[(String, String)]._1 // 根据规则计算分区编号 rules(subject) } }}