Spark operator: RDD action operations – aggregate, fold, lookup; the difference between reduce/fold/aggregate

Keywords: Spark operator, Spark function, Spark RDD Action, aggregate, fold, lookup

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate aggregates the elements of an RDD in two steps: first, seqOp folds the T-type elements within each partition into a value of type U; then combOp merges the per-partition U values into a single U. Note that both seqOp and combOp start from zeroValue, whose type is U.
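Schematically, the two phases behave like this local simulation on plain Scala collections (a sketch; aggregateLocally is a name invented here, not Spark's actual implementation):

 // Local simulation of aggregate's two phases; illustrative only.
 def aggregateLocally[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
     seqOp: (U, T) => U, combOp: (U, U) => U): U = {
   // Phase 1: fold each partition's elements into a U, starting from zeroValue
   val perPartition = partitions.map(_.foldLeft(zeroValue)(seqOp))
   // Phase 2: merge the per-partition results, again starting from zeroValue
   perPartition.foldLeft(zeroValue)(combOp)
 }

 // Reproduces the two Spark examples below:
 aggregateLocally(Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9, 10)), 1)(_ + _, _ + _) // 58
 aggregateLocally(Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9, 10)), 2)(_ + _, _ * _) // 1428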

var rdd1 = sc.makeRDD(1 to 10, 2)
 rdd1.mapPartitionsWithIndex{
 (partIdx, iter) => {
 var part_map = scala.collection.mutable.Map[String, List[Int]]()
 while(iter.hasNext){
 var part_name = "part_" + partIdx
 var elem = iter.next()
 if(part_map.contains(part_name)) {
 var elems = part_map(part_name)
 elems ::= elem
 part_map(part_name) = elems
 } else {
 part_map(part_name) = List[Int](elem)
 }
 }
 part_map.iterator
 }
 }.collect
 res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))  

##The first partition contains 5,4,3,2,1

##The second partition contains 10,9,8,7,6

var rdd1 = sc.makeRDD(1 to 10,2)
 rdd1.aggregate(1)(
                 {(x : Int,y : Int) => x + y},
                 {(a : Int,b : Int) => a + b}
                 )
 res0: Int = 58

The result is 58, see the calculation process below:

##First, (x : Int,y : Int) => x + y is applied iteratively within each partition, starting from zeroValue = 1

## Namely: zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16 in part_0

## part_1 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

##Then the two partition results are merged with (a : Int,b : Int) => a + b, again starting from zeroValue = 1

## Namely: zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

Another example:

scala> rdd1.aggregate(2)(
 | {(x : Int,y : Int) => x + y},
 | {(a : Int,b : Int) => a * b}
 | )
 res18: Int = 1428

##This time zeroValue=2

## In part_0: zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

## In part_1: zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

##Finally: zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428

Therefore, zeroValue not only determines the type U, but also directly affects the result, so pay special attention when choosing it.
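As a sanity check (a sketch; the results follow from the arithmetic above), using the operation's identity element as zeroValue makes the result independent of the number of partitions:

 sc.makeRDD(1 to 10, 2).aggregate(0)(_ + _, _ + _) // 55
 sc.makeRDD(1 to 10, 4).aggregate(0)(_ + _, _ + _) // 55, same regardless of partitioning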

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold is a simplification of aggregate that uses the same function op for both seqOp and combOp.

scala> rdd1.fold(1)(
 | (x,y) => x + y
 | )
 res19: Int = 58

 ##The result is the same as the first example using aggregate above, namely:
 scala> rdd1.aggregate(1)(
 | {(x,y) => x + y},
 | {(a,b) => a + b}
 | )
 res20: Int = 58



 var rdd1 = sc.makeRDD(1 to 10,4)
 rdd1.fold(3)(_+_)
 res4: Int = 70

 [(3+1+2)+(3+3+4+5)+(3+6+7)+(3+8+9+10)]+3 = 55+(4+1)*3 = 70; the "4+1" means zeroValue is added once in each of the 4 partitions and once more when the partition results are merged, so 3 is added 5 times in total.


 var rdd1 = sc.makeRDD(1 to 10,3)
 rdd1.fold(5)(_+_)
 res5: Int = 75
 ## 75 = 55+(3+1)*5
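For an additive op the pattern generalizes (a sketch; n is just a local name for the partition count): result = sum + (n + 1) * zeroValue, since zeroValue is applied once per partition plus once in the final merge:

 val rdd = sc.makeRDD(1 to 10, 3)
 val n = rdd.partitions.length          // 3
 rdd.fold(5)(_ + _) == 55 + (n + 1) * 5 // true: 75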
 

lookup

def lookup(key: K): Seq[V]

lookup applies to RDDs of (K, V) pairs: given a key, it returns all of the V values in the RDD associated with that key.

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21

 scala> rdd1.lookup("A")
 res0: Seq[Int] = WrappedArray(0, 2)

 scala> rdd1.lookup("B")
 res1: Seq[Int] = WrappedArray(1, 2)
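lookup is logically equivalent to filtering by key and collecting the values (a sketch of the equivalence below); the practical difference is that, when the RDD has a known partitioner, lookup searches only the partition the key maps to instead of scanning the whole RDD:

 rdd1.filter(_._1 == "A").values.collect // Array(0, 2), same values as lookup("A")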
 
From: http://lxw1234.com/archives/2015/07/394.htm

-----

reduce, fold, and aggregate are all aggregation operations on an RDD.

foldByKey and aggregateByKey are similar in usage and function to fold and aggregate: foldByKey is a simplification of aggregateByKey, just as fold is a simplification of aggregate.

1. The reduce() and fold() methods operate on elements of the same type as the RDD's elements, i.e. they are homogeneous: each returns a new value of that same element type.

val nums = Array(1,2,3,4,5,6,7,8,9)
 val numsRdd = sc.parallelize(nums,3)
 val reduce = numsRdd.reduce((a,b) => a+b)
 reduce: Int = 45
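One practical difference between the two (a sketch; Seq.empty is used just for illustration): reduce fails on an empty RDD, while fold simply returns zeroValue:

 val empty = sc.parallelize(Seq.empty[Int])
 empty.fold(0)(_ + _)   // 0
 // empty.reduce(_ + _) // throws java.lang.UnsupportedOperationException: empty collection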

2. fold() is similar to reduce(): it takes a function with the same signature as the one reduce() takes, plus an initial value used as the starting result. (The initial value should be the identity of the operation: 0 for addition, 1 for multiplication.)

val rdd = sc.makeRDD(List("a","a","b","b"),4)
 val res = rdd.fold("")(_+_) // the result is not fixed
 res: String = baab
 or
 res: String = abba

The result varies because the driver merges the per-partition results in whatever order the tasks finish, and string concatenation is not commutative, so different runs can produce different orderings.
 

For more detailed cases, see: Spark operators [10]: foldByKey, fold source code examples.

3. The aggregate() method can aggregate elements into a result of a different type, i.e. it supports heterogeneity. It first aggregates the elements within each partition, then merges the per-partition results with the given combine function, starting from the given initial value zeroValue.
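For example (a sketch reusing the numsRdd defined above), aggregating Int elements into an (Int, Int) pair of (sum, count) to compute an average:

 val (sum, count) = numsRdd.aggregate((0, 0))(
   (acc, v) => (acc._1 + v, acc._2 + 1), // seqOp: fold one Int into the (sum, count) pair
   (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp: merge two partial (sum, count) pairs
 )
 val avg = sum.toDouble / count          // 5.0 for the nums above (sum 45, count 9)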
