RDD Transformation
A Spark transformation creates a new RDD from one or more existing RDDs. It takes an RDD as input and produces one or more RDDs as output. Every time a transformation is applied, a new RDD is created, because RDDs are immutable and cannot be altered in place.
Applying transformations builds an RDD lineage, which records all the parent RDDs of the final RDD(s). RDD lineage is also called the RDD operator graph or RDD dependency graph. It is a logical execution plan, i.e. a directed acyclic graph (DAG) of the entire chain of parent RDDs that lead to an RDD.
Since transformations are lazy, they are not executed immediately; they run only when an action is invoked. The two most common transformations are map() and filter().
The resultant RDD after a transformation is always different from its parent RDD. The final RDD can be smaller (e.g. filter, distinct, sample), bigger (e.g. flatMap, union, cartesian) or of the same size (e.g. map).
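To see the laziness and the lineage in practice, here is a minimal sketch (assuming a SparkSession named spark, as in the examples that follow). The map() call only records the transformation in the lineage; nothing executes until the collect() action is invoked:
val numbers = spark.sparkContext.parallelize(1 to 5)
val added = numbers.map(x => x + 10)        // only builds the lineage, nothing runs yet
println(added.toDebugString)                // prints the RDD lineage (dependency graph)
println(added.collect().mkString(", "))     // the action triggers execution: 11, 12, 13, 14, 15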
There are two types of transformations:
- Narrow Transformation
In a narrow transformation, all the elements required to compute the records in a single partition reside in a single partition of the parent RDD. Only a limited subset of partitions is needed to calculate the result. Narrow transformations are the result of operations such as map() and filter().
- Wide Transformation
In a wide transformation, the elements required to compute the records in a single partition may reside in many partitions of the parent RDD. Wide transformations are the result of operations such as groupByKey() and reduceByKey() (see the sketch after this list).
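To make the difference concrete, here is a short sketch (again assuming the SparkSession named spark used throughout): map() creates only a narrow, one-to-one dependency on its parent RDD, while groupByKey() creates a shuffle dependency.
val base = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val narrow = base.map { case (k, v) => (k, v + 1) }   // narrow: each output partition depends on one parent partition
val wide = base.groupByKey()                          // wide: each output partition may depend on many parent partitions
println(narrow.dependencies.map(_.getClass.getSimpleName))  // OneToOneDependency
println(wide.dependencies.map(_.getClass.getSimpleName))    // ShuffleDependency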
The RDD transformations are explained below with Scala examples-
1. map(func)
The map function iterates over every element of the RDD and produces a new RDD. With the map() transformation we pass a function, and that function is applied to every element of the RDD.
In the map function, the input type and the return type of the RDD may be different from each other. For example, the input RDD can be of type Integer, and after applying map() the returned RDD can be of type Double.
Consider an example: in RDD {1, 2, 3, 4, 5} if we apply “rdd.map(x=>x+10)” we will get the result as (11, 12, 13, 14, 15).
Map() example:
import org.apache.spark.sql.SparkSession

object mapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
    // read the file and convert the resulting Dataset[String] to an RDD of lines
    val data = spark.read.textFile("spark_test.txt").rdd
    // pair every line with its length
    val mapFile = data.map(line => (line, line.length))
    mapFile.foreach(println)
  }
}
- Note – In the above code the map() function is used, which pairs each line of the file with its length.
2. flatMap()
With the flatMap() function, each input element can produce zero or more elements in the resultant RDD. The simplest use of flatMap() is to split each input string into words.
map() and flatMap() are alike in that they take an element from the input RDD and apply a function to it. The key difference is that map() returns exactly one element per input element, while flatMap() can return a list of elements (zero or more).
flatMap() example:
val data = spark.read.textFile("spark_test.txt").rdd
// split every line into words at spaces; flatMap flattens the lists of words into one RDD
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.foreach(println)
- Note – As shown here, the flatMap() function splits every line into words at each space.
3. filter(func)
In Apache Spark RDDs, the filter() function returns a new RDD that contains only the elements satisfying a predicate. It is a narrow operation because it does not shuffle data from one partition to many partitions.
For example: suppose the RDD contains the first five natural numbers (1, 2, 3, 4 and 5) and the predicate checks for even numbers. The resulting RDD after the filter will contain only the even numbers, i.e. 2 and 4.
Filter() example:
val data = spark.read.textFile("spark_test.txt").rdd
// split the lines into words, then keep only the words equal to "spark"
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value == "spark")
println(mapFile.count())
- Note – Here flatMap() maps the lines into words, filter() keeps only the words equal to “spark”, and the count() action then returns how many times the word occurs.
4. mapPartitions(func)
This function transforms each partition of the source RDD into many elements of the result (possibly none). In mapPartitions(), the supplied function is applied once per partition rather than once per element. mapPartitions() is like map(), but the difference is that it runs separately on each partition (block) of the RDD.
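mapPartitions() example (a minimal sketch, assuming the SparkSession named spark from the earlier examples; the numbers and partition count are illustrative only):
val numbers = spark.sparkContext.parallelize(1 to 10, 2)
// the supplied function receives an Iterator over one whole partition and must return an Iterator
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))
partitionSums.foreach(println)
- Note – Here each of the 2 partitions is collapsed into a single element, its sum, so the resulting RDD contains one element per partition.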
5. mapPartitionsWithIndex()
It is the same as mapPartitions(), but in addition it provides func with an integer value that represents the index of the partition. The function is applied to each partition, index-wise, one after the other.
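mapPartitionsWithIndex() example (a minimal sketch, again assuming the SparkSession named spark; the sample strings are illustrative only):
val tools = spark.sparkContext.parallelize(Seq("spark", "hadoop", "flink", "kafka"), 2)
// the function receives the partition index together with the iterator over that partition
val indexed = tools.mapPartitionsWithIndex((index, iter) => iter.map(value => (index, value)))
indexed.foreach(println)
- Note – Every element is paired with the index of the partition it resides in, so the output shows which of the 2 partitions each string was placed into.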
6. union(dataset)
Using this function, we obtain the elements of both RDDs in a new RDD. The only requirement is that the two RDDs should be of the same type.
For example, the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and that of RDD2 are (Big data, Spark, Flink) therefore the resultant rdd1.union(rdd2) will have elements (Spark, Spark, Spark, Hadoop, Flink, Flink, Big data).
Union() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(17,"sep",2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6,"dec",2011),(16,"may",2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)
- Note – Here the union() operation returns a new dataset which contains the union of the elements in the source dataset (rdd1) and the arguments (rdd2 and rdd3).
7. intersection(other-dataset)
With this function, we get the common elements of both RDDs in a new RDD. The only requirement is that the two RDDs should be of the same type.
For example, the elements of RDD1 are (Spark, Spark, Hadoop, Flink) and that of RDD2 are (Big data, Spark, Flink) therefore the resultant rdd1.intersection(rdd2) will have elements (Spark, Flink).
Intersection() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val common = rdd1.intersection(rdd2)
common.foreach(println)
- Note – Here the intersection() operation returns a new RDD that contains only the elements present in both rdd1 and rdd2, i.e. (1,"jan",2016).
8. distinct()
This function returns a new dataset that contains the distinct elements of the source dataset. It is helpful for removing duplicate data.
For example, if the RDD contains (Spark, Spark, Hadoop, Flink), rdd.distinct() will give (Spark, Hadoop, Flink).
Distinct() example:
val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))
- Note – Here the distinct() function removes the duplicate record (3,"nov",2014).
9. groupByKey()
When groupByKey() is applied on a dataset of (K, V) pairs, the data is shuffled according to the key K into another RDD. In this transformation a lot of unnecessary data may be transferred over the network.
Spark spills data to disk when more data is shuffled onto a single executor machine than can fit in memory.
groupByKey() example:
val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)
- Note – The groupByKey() transformation groups the integers on the basis of the same key (the character). After that the collect() action returns all the elements of the dataset as an Array.
10. reduceByKey(func, [numTasks])
When we use reduceByKey() on a dataset of (K, V) pairs, the pairs on the same machine with the same key are combined (using the supplied function) before the data is shuffled.
reduceByKey() example:
val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)
- Note – The code written above parallelizes the Array of Strings. It then maps each word to the count 1, and reduceByKey merges the counts of values that have the same key.
11. sortByKey()
When sortByKey() is called on a dataset of (K, V) pairs, the data is sorted according to the key K in another RDD.
sortByKey() example:
val data = spark.sparkContext.parallelize(Seq(("maths",52),("english",75),("science",82),("computer",65), ("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)
- Note – Here the sortByKey() transformation sorts the data RDD into ascending order of the key (String).
12. join()
Join is a database term that combines fields from two tables using common values. The join() operation in Spark is defined on pair RDDs: RDDs in which each element is a tuple, where the first element is the key and the second element is the value.
The boon of using keyed data is that we can combine records together: the join() operation combines two datasets on the basis of the key.
Join() example:
val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))
- Note – Here the join() transformation joins the two RDDs based on their keys, pairing up the values that share the same key.
13. coalesce()
To avoid a full shuffle we use the coalesce() function. It makes use of existing partitions so that less data shuffling takes place. With its help we can cut down the number of partitions. If we have four partitions but require only two, the data of the extra partitions is moved onto the partitions that we keep.
Coalesce() example:
val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)
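- Note – Here coalesce(2) reduces the number of partitions of rdd1 from 3 to 2 without a full shuffle; the data of the dropped partition is merged into the two partitions that are kept.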