How to Create an RDD in Spark?
Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. They are immutable, fault-tolerant, distributed collections of objects. Each dataset is divided into logical partitions, which are computed on different nodes of the cluster. An RDD is therefore simply a way to represent a dataset distributed across many machines, which can be operated on in parallel. RDDs are resilient because a lost RDD can always be re-computed.
In Spark there are three ways to create an RDD:
- Parallelizing an existing collection in the driver program.
- Referencing a dataset in an external storage system (e.g. HDFS, HBase, a shared file system).
- Creating an RDD from RDDs that already exist.
1. Parallelized Collection (parallelizing)
During the learning phase of Spark, RDDs are usually created from a parallelized collection, i.e. by taking an existing collection in the program and passing it to SparkContext's parallelize() method. This method is useful in the initial stages of learning because it lets us create our own RDDs in the Spark shell and perform operations on them. It is of little use outside testing and prototyping, however, since it requires the entire dataset to fit on one machine.
Consider the following example using sortByKey:
val data = spark.sparkContext.parallelize(Seq(("maths", 52), ("English", 75), ("science", 82), ("computer", 65), ("maths", 85)))
val sorted = data.sortByKey()
sorted.foreach(println)
The key point here is the number of partitions the dataset is cut into. Spark runs one task for each partition of the cluster. Typically you want two to four partitions for every CPU in the cluster. Spark sets the number of partitions automatically based on the cluster, but it can also be set manually by passing the number of partitions as the second parameter to parallelize,
e.g. sc.parallelize(data, 10), which requests 10 partitions.
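As a minimal sketch (assuming a Spark session named spark, as in the earlier example; the collection 1 to 100 and the partition count 10 are purely illustrative), you can request a partition count and verify it with getNumPartitions:
val nums = spark.sparkContext.parallelize(1 to 100, 10) // explicitly request 10 partitions
println(nums.getNumPartitions) // prints 10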
2. External Datasets (Referencing a dataset)
In Spark, a distributed dataset can be formed from any data source supported by Hadoop, for example the local file system, HDFS, Cassandra, or HBase. Here the data is loaded from an external dataset. To create a text-file RDD, we use SparkContext's textFile method. It takes the URI of the file and reads it as a collection of lines. The URI can be a local path on the machine, or an hdfs://, s3n://, etc. URI.
The point of concern is that the file must be available at the same path on all worker nodes as well as on the driver's local file system. Either copy the file to all the worker nodes or use a network-mounted shared file system.
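As a minimal sketch (assuming a Spark session named spark; the file path is purely illustrative and could equally be an hdfs:// or s3n:// URI), a text-file RDD can be created and transformed like this:
val lines = spark.sparkContext.textFile("path/of/text/file") // one RDD element per line of the file
val lineLengths = lines.map(_.length) // transform each line to its length
println(lineLengths.reduce(_ + _)) // total number of characters in the file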
Using the DataFrameReader interface, one can also load a Dataset from external storage systems, for example file systems, key-value stores, etc. Use SparkSession.read to obtain an instance of DataFrameReader. DataFrameReader supports many file formats, for example:
i) csv (String path)
It loads a CSV file and then returns the result as a Dataset<Row>.
For Example:
import org.apache.spark.sql.SparkSession

object DataFormat {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
    val dataRDD = spark.read.csv("path/of/csv/file").rdd
  }
}
Note – the .rdd method converts a Dataset&lt;Row&gt; to an RDD&lt;Row&gt;.
ii) json (String path)
It loads a JSON file (one object per line) and then returns the result as a Dataset&lt;Row&gt;.
val dataRDD = spark.read.json("path/of/json/file").rdd
iii) textFile (String path)
It loads text files and then returns a Dataset of String.
val dataRDD = spark.read.textFile("path/of/text/file").rdd
3. Creating RDD from existing RDD
A transformation converts one RDD into another RDD, so transformations are a way to create an RDD from an existing RDD. This is one point of difference between Apache Spark and Hadoop MapReduce. A transformation is an operation that takes an RDD as input and produces one or more RDDs as output, while the input RDD remains unchanged, because RDDs are immutable in nature. Examples of such transformations are filter, distinct, map, and flatMap.
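As a minimal sketch (assuming a Spark session named spark; the subjects and the pass mark of 60 are purely illustrative), filter and map each produce a new RDD while the input RDD stays unchanged:
val marks = spark.sparkContext.parallelize(Seq(("maths", 52), ("English", 75), ("science", 82)))
val passed = marks.filter { case (_, score) => score >= 60 } // new RDD; 'marks' is untouched
val subjects = passed.map { case (subject, _) => subject } // another RDD derived from 'passed'
subjects.foreach(println)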