Keywords: Spark operator, Spark function, Spark RDD Action, aggregate, fold, lookup
aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate aggregates the elements of an RDD in two steps: first, seqOp aggregates the T-type elements within each partition into a U-type partial result; then combOp merges those per-partition U-type results into a single U. Note that both seqOp and combOp start from the value of zeroValue, whose type is U.
var rdd1 = sc.makeRDD(1 to 10, 2)
rdd1.mapPartitionsWithIndex {
  (partIdx, iter) => {
    var part_map = scala.collection.mutable.Map[String, List[Int]]()
    while (iter.hasNext) {
      var part_name = "part_" + partIdx
      var elem = iter.next()
      if (part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
      } else {
        part_map(part_name) = List[Int](elem)
      }
    }
    part_map.iterator
  }
}.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
##The first partition contains 5,4,3,2,1
##The second partition contains 10,9,8,7,6
var rdd1 = sc.makeRDD(1 to 10, 2)
rdd1.aggregate(1)(
  {(x: Int, y: Int) => x + y},
  {(a: Int, b: Int) => a + b}
)
res0: Int = 58
The result is 58; the calculation process is as follows:
##First, (x: Int, y: Int) => x + y is applied iteratively within each partition, starting from zeroValue = 1
##In part_0: zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
##In part_1: zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
##Then the results of the two partitions are merged with (a: Int, b: Int) => a + b, again starting from zeroValue = 1
##Namely: zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
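The two-phase computation above can be sketched without a Spark cluster; a minimal plain-Python model of aggregate's semantics, assuming the same two partitions shown earlier:

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Model of RDD.aggregate: seq_op folds each partition starting
    from zero_value, then comb_op merges the per-partition results,
    again starting from zero_value."""
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, per_partition, zero_value)

partitions = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]  # part_0, part_1

# zeroValue = 1, seqOp = +, combOp = +  ->  1 + (1+15) + (1+40) = 58
print(aggregate(partitions, 1, lambda x, y: x + y, lambda a, b: a + b))

# zeroValue = 2, seqOp = +, combOp = *  ->  2 * (2+15) * (2+40) = 1428
print(aggregate(partitions, 2, lambda x, y: x + y, lambda a, b: a * b))
```

This makes it easy to see why zeroValue appears once per partition in the seqOp phase and once more in the combOp phase.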
Another example:
scala> rdd1.aggregate(2)(
     | {(x: Int, y: Int) => x + y},
     | {(a: Int, b: Int) => a * b}
     | )
res18: Int = 1428
##This time zeroValue = 2
##In part_0: zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##In part_1: zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##Finally: zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
Therefore, zeroValue not only determines the type of U but also directly affects the result, so use it with care.
fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold is a simplified form of aggregate that uses the same function op for both seqOp and combOp.
scala> rdd1.fold(1)(
| (x,y) => x + y
| )
res19: Int = 58
##The result is the same as the first example using aggregate above, namely:
scala> rdd1.aggregate(1)(
| {(x,y) => x + y},
| {(a,b) => a + b}
| )
res20: Int = 58
var rdd1 = sc.makeRDD(1 to 10, 4)
rdd1.fold(3)(_+_)
res4: Int = 70
##(3+1+2)+(3+3+4+5)+(3+6+7)+(3+8+9+10)+3 = 55+(4+1)*3 = 70; with 4 partitions, zeroValue 3 is added once per partition (4 times) and once more when the per-partition results are merged, hence the factor (4+1).
var rdd1 = sc.makeRDD(1 to 10, 3)
rdd1.fold(5)(_+_)
res5: Int = 75
##55+(3+1)*5 = 75
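The "zeroValue is applied (partitions + 1) times" rule can be checked with a plain-Python sketch; the `split` helper below is an assumption that roughly mimics how makeRDD slices a range into n partitions, not Spark's actual code:

```python
from functools import reduce

def fold(partitions, zero_value, op):
    """Model of RDD.fold: op folds each partition starting from
    zero_value, then merges the partial results, again from zero_value."""
    partials = [reduce(op, part, zero_value) for part in partitions]
    return reduce(op, partials, zero_value)

def split(xs, n):
    """Hypothetical helper: slice xs into n contiguous partitions."""
    k = len(xs)
    return [xs[i * k // n:(i + 1) * k // n] for i in range(n)]

data = list(range(1, 11))            # sum = 55
add = lambda a, b: a + b
print(fold(split(data, 4), 3, add))  # 55 + (4+1)*3 = 70
print(fold(split(data, 3), 5, add))  # 55 + (3+1)*5 = 75
```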
lookup
def lookup(key: K): Seq[V]
lookup applies to (K, V)-type RDDs: given a key K, it returns all the V values associated with that key in the RDD.
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)
scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
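The semantics of lookup are simple enough to model in a few lines of plain Python (a sketch of the behavior, not Spark's optimized implementation, which can use the partitioner to scan only one partition):

```python
def lookup(pairs, key):
    """Model of PairRDDFunctions.lookup: return all values whose
    key equals the requested one, in partition order."""
    return [v for k, v in pairs if k == key]

pairs = [("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)]
print(lookup(pairs, "A"))  # [0, 2]
print(lookup(pairs, "B"))  # [1, 2]
```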
From: http://lxw1234.com/archives/2015/07/394.htm
-----
reduce, fold and aggregate are all aggregation operations on an RDD.
foldByKey and aggregateByKey are similar in usage and function to fold and aggregate.
foldByKey is a simplification of aggregateByKey, just as fold is a simplification of aggregate.
1. reduce() and fold() operate on RDDs whose elements all have the same type, i.e. they must be homogeneous, and they return a new value of that same type.
val nums = Array(1,2,3,4,5,6,7,8,9)
val numsRdd = sc.parallelize(nums, 3)
val reduce = numsRdd.reduce((a, b) => a + b)
reduce: Int = 45
2. fold() is similar to reduce(): it takes a function with the same signature as the one reduce() takes, plus an initial value used at the start of each fold. (The initial value should be the identity of the operation: 0 for addition, 1 for multiplication.)
val rdd = sc.makeRDD(List("a","a","b","b"),4)
val res = rdd.fold("")(_+_) // The result is not deterministic
res: String = baab
or
res: String = abba
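The non-determinism comes from the order in which the driver merges per-partition results, which depends on task completion order. A plain-Python sketch, where `merge_order` stands in for that scheduling order (an assumption for illustration):

```python
from functools import reduce

def fold_in_order(partitions, zero, op, merge_order):
    """Model of RDD.fold where the driver merges per-partition
    results in whatever order the tasks happen to finish."""
    partials = [reduce(op, p, zero) for p in partitions]
    return reduce(op, (partials[i] for i in merge_order), zero)

parts = [["a"], ["a"], ["b"], ["b"]]   # one element per partition
concat = lambda x, y: x + y
print(fold_in_order(parts, "", concat, [2, 0, 1, 3]))  # "baab"
print(fold_in_order(parts, "", concat, [0, 3, 2, 1]))  # "abba"
```

With a non-commutative op like string concatenation, different completion orders give different results, which is exactly why the REPL output above varies between runs.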
For specific cases, please refer to: Spark operators [10]: foldByKey, fold source code examples
3. The aggregate() method can return a result whose type differs from the element type, i.e. it supports heterogeneity.
It first aggregates the elements within each partition (using seqOp and the initial zeroValue), then merges the per-partition results with the given combine function, again starting from zeroValue.
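A classic case where this heterogeneity matters is computing an average: the elements are Ints but the accumulator is a (sum, count) pair, which plain fold/reduce cannot express. A plain-Python sketch of the idea, reusing the same two-phase model of aggregate:

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Model of RDD.aggregate: per-partition seq_op fold, then
    a comb_op merge, both starting from zero_value."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

# T = int, U = (sum, count): seqOp's accumulator type differs from
# the element type, so U != T.
partitions = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
total, count = aggregate(
    partitions,
    (0, 0),                                   # zeroValue: U
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp: (U, T) -> U
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # combOp: (U, U) -> U
)
print(total / count)  # 55 / 10 = 5.5
```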