# 最大值最小值

@Test
def testMaxMin: Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)

  // Initialize test data.
  val data = sc.parallelize(Array(10, 7, 3, 4, 5, 6, 7, 8, 1001, 6, 2))

  // Method 1: map everything onto a single key, group, then scan the
  // grouped values once to find both the max and the min.
  data.map(x => ("key", x)).groupByKey().map(x => {
    var min = Integer.MAX_VALUE
    var max = Integer.MIN_VALUE
    for (num <- x._2) {
      if (num > max) {
        max = num
      }
      if (num < min) {
        min = num
      }
    }
    (max, min)
  }).collect().foreach(x => {
    println("max\t" + x._1)
    println("min\t" + x._2)
  })

  // Method 2: a neat trick — just reduce with max/min directly,
  // avoiding the shuffle that groupByKey incurs.
  val max = data.reduce((a, b) => Math.max(a, b))
  val min = data.reduce((a, b) => Math.min(a, b))
  println("max : " + max)
  println("min : " + min)

  sc.stop
}

max: 1001

min: 2

# 平均值问题

@Test
def testAvg(): Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)

  // Initialize test data.
  val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))

  // Per-key average via combineByKey: the accumulator is (sum, count),
  // so the average is computed in one pass without materializing groups.
  val results = foo.combineByKey(
    (v) => (v, 1),                                               // createCombiner: first value for a key
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),            // mergeValue: fold a value into (sum, count)
    (acc1: (Int, Int), acc2: (Int, Int)) =>                      // mergeCombiners: combine partial (sum, count)s
      (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).map { case (key, value) => (key, value._1 / value._2.toDouble) }

  results.collect().foreach(println)
}

# TopN问题

@Test
def testTopN(): Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)

  // Initialize test data.
  val foo = sc.parallelize(Array(
    ("a", 1),
    ("a", 2),
    ("a", 3),
    ("b", 3),
    ("b", 1),
    ("a", 4),
    ("b", 4),
    ("b", 2)
  ))

  // Take the top 2 values per key.
  // NOTE(review): groupByKey materializes every value of a key in memory;
  // fine for a demo, but for large keys prefer aggregateByKey with a
  // bounded per-partition top-N accumulator.
  val groupsSort = foo.groupByKey().map(tu => {
    val key = tu._1
    val values = tu._2
    val sortValues = values.toList.sortWith(_ > _).take(2)
    (key, sortValues)
  })

  // Flatten back to (key, value) pairs for printing.
  val flattenedTopNPerGroup =
    groupsSort.flatMap({ case (key, numbers) => numbers.map(key -> _) })

  flattenedTopNPerGroup.foreach((value: Any) => {
    println(value)
  })

  sc.stop
}

(a,4)

(a,3)

(b,4)

(b,3)

MaxLeap技术博客首发：https://blog.maxleap.cn/archives/1239