
2017-04-16

Comparison of Running Time of Cached/Uncached Spark RDD

Resilient Distributed Dataset (RDD) is Spark's distributed parallel data model. It enables us to think of our distributed data as if it were a single collection. In this article, I introduce some basics and show an experimental result that clearly demonstrates the strength of RDDs.

First and foremost, there are two types of operations on RDDs: transformations and actions.

Type I: Transformation

Transformations correspond to Scala transformers such as map() and filter(); we can apply map() and filter() to RDDs in much the same way as to standard Scala collections.

In Scala collections, these operations return a new collection:

scala> Seq(1, 2, 3).map(_ * 10)
res: Seq[Int] = List(10, 20, 30)

Transformations on RDDs behave similarly; a new RDD is returned as the result of a transformation.

However, there is a huge difference between Scala collections and RDDs: the result of a transformation is NOT immediately computed. That is, transformations are lazy.

If we have an RDD that contains the same values as the above example, rdd.map() does not immediately return any "collection":

scala> val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:20

scala> rdd.map(_ * 10)
res: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:22
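
The RDD returned by map() merely records the lineage of transformations to apply later. You can inspect this lineage with toDebugString; the exact output depends on your environment, but it looks roughly like:

scala> rdd.map(_ * 10).toDebugString
res: String =
(1) MapPartitionsRDD[6] at map at <console>:22 []
 |  ParallelCollectionRDD[5] at parallelize at <console>:20 []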

Note that if you want to compute an RDD and convert it into a Scala collection, you can use the collect() action as follows. Keep in mind that collect() brings the entire result to the driver, so it is only suitable for small datasets.

scala> rdd.map(_ * 10).collect()
res: Array[Int] = Array(10, 20, 30)

Type II: Action

Actions correspond to Scala accessors, e.g., fold() and aggregate(), and their usage is very similar to that of Scala collections.

Importantly, in contrast to transformations, actions return a concrete value.

scala> Seq(1, 2, 3).fold(0)(_ + _)
res: Int = 6
scala> val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:20

scala> rdd.fold(0)(_ + _)
res: Int = 6
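
Other common actions include count(), reduce(), and take(); each of them immediately triggers computation and returns a plain value:

scala> rdd.count()
res: Long = 3

scala> rdd.reduce(_ + _)
res: Int = 6

scala> rdd.take(2)
res: Array[Int] = Array(1, 2)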

It is obvious that actions are evaluated eagerly; the result is immediately computed.

Uncached vs Cached

Even though you apply transformations to an RDD, nothing happens until an action is invoked, because transformations are lazy operations. This fact indicates that actions can become a bottleneck in code that handles RDDs.

In the context of machine learning, since iterating over large-scale data is necessary, the same operations are repeatedly executed until convergence. For example, for an algorithm that iteratively runs rdd.filter().map().count(), all three operations filter(), map(), and count() are internally repeated. So, when the data in an RDD is huge, these iterations will be surprisingly slow.

In order to run such repeatedly-computed transformations more efficiently, RDD has a special operation named cache() (or persist()). If you define an RDD with the cache() operation, the RDD internally caches the result of its transformations, and hence iterative count() operations should be much faster than in the uncached case.

val rddUncached = rdd.filter(_ % 2 == 0).map(_ * 10) // example transformations
for (i <- 0 to n) { // inefficient :(
    // recompute `filter` -> `map` -> `count` every time
    rddUncached.count()
}

val rddCached = rdd.filter(_ % 2 == 0).map(_ * 10).cache()
for (i <- 0 to n) { // efficient :)
    // result of `filter` -> `map` is cached after the first computation
    rddCached.count()
}
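
As a side note, cache() is shorthand for persist() with the default memory-only storage level; persist() accepts other storage levels, and unpersist() releases the cache. A minimal sketch, reusing the rdd defined above:

import org.apache.spark.storage.StorageLevel

// cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)
val rddPersisted = rdd.filter(_ % 2 == 0).map(_ * 10)
  .persist(StorageLevel.MEMORY_AND_DISK) // spill to disk if it does not fit in memory
// ... run actions here ...
rddPersisted.unpersist() // free the cached partitions when done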

Experiment

Let's finally check whether a cached RDD is truly faster than an uncached one.

1) Define a Spark context and dummy data randomList, which consists of 10,000 random integers:

import scala.util.Random

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("RDDTest")
val sc: SparkContext = new SparkContext(conf)

val rng: Random = new Random
val randomList: List[Int] = (for (i <- 1 to 10000) yield rng.nextInt).toList

2) Create a function that measures running time (cf. Easily measuring code execution time in Scala):

def time[R](block: => R): R = {
    val t0 = System.nanoTime()
    val result = block
    val t1 = System.nanoTime()
    println("Elapsed time: " + ((t1 - t0) / 1000000000.0) + " sec")
    result
}
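
For example, wrapping an expression in time prints the elapsed time and passes the result through:

// prints something like "Elapsed time: 1.0 sec" and returns 42
val answer: Int = time {
    Thread.sleep(1000)
    42
}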

3) Compare the running time of the same operations over cached/uncached RDDs:

for (i <- 1 to 3) { // try 3 times
    println("========")
    println("Uncached")
    println("========")
    time {
        val rdd: RDD[Int] = sc.parallelize(randomList).map(f)
        for (_ <- 1 to 1000) rdd.count() // recomputes `map(f)` on every count()
    }

    println("========")
    println("Cached")
    println("========")
    time {
        val rddCached: RDD[Int] = sc.parallelize(randomList).map(f).cache()
        for (_ <- 1 to 1000) rddCached.count() // reuses the cached result of `map(f)`
    }
}

Here, the mapped function f is a dummy function that takes a long time:

def f(v: Int): Int = {
    for (i <- 1 to 10000) {} // busy-wait to simulate an expensive computation
    v
}

Note: To account for unrelated optimizations such as JVM warm-up, our code tries the same procedure three times.

Woo-hoo! As the results below show, the cached RDD is roughly 10x faster for the same data and the same number of iterations:

========
Uncached
========
Elapsed time: 40.727146166 sec
========
Cached
========
Elapsed time: 3.947885796 sec
========
Uncached
========
Elapsed time: 37.857984933 sec
========
Cached
========
Elapsed time: 3.149896662 sec
========
Uncached
========
Elapsed time: 35.778371576 sec
========
Cached
========
Elapsed time: 3.6294217 sec

The complete code is available at: RDDTest.scala.

Conclusion

Spark optimization fundamentally relies on laziness, and caching the results of transformations is one of the simplest and most effective ways to optimize code that uses RDDs.

The contents of this article are based on the online course "Big Data Analysis with Scala and Spark". This well-structured course is really interesting and exciting from a practical point of view :)
