RDD in PySpark

In this article I will explain the Resilient Distributed Datasets, RDD, in Spark with Python and PySpark. I explain how are built the pipelines in PySpark with the Transformations and Actions. Finally, I will show examples of code handling a simple numerical dataset.

Content

  • Spark
  • Resilient Distributed Dataset
  • Transformations & Actions
  • Hands-On
  • Accumulators
  • Broadcasts

Check this video for more details.

The code is available in the following repository.

Spark

Spark is one of the most used tools for distributed computing. When I must process large amount of data very fast, the best option is the horizontal scaling. Spark allows me to do so. I can use the RDDs, the Resilient Distributed Datasets, transformations and actions.

Let’s say I want to compute some numerical operations on a large dataset, I want to obtain the mean. But when distributing the dataset on several servers, the mean of each subset may not be exact. Each subset may not have the same weight or the same amount of data. How to do so?

Distributed Mean

Resilient Distributed Dataset

Let’s start with some concepts of Spark. RDD stands for Resilient Distributed Dataset. Spark can have multiple servers where the operations will be executed in parallel. This is the horizontal scaling. This is to obtain the result faster. As the computing is done on several servers, the dataset must be split into several servers too. Each server will handle just a portion of the original dataset. This stands for Distributed Dataset.

Now, if a single server fails, Spark knows which subset was lost and it’s able to inject the lost subset again into the remaining servers. Now that my initial dataset is split into several servers, each server can run the operations of my pipeline in parallel.

Transformations & Actions

What are the available operations? Transformations and actions. Each pipeline can have several transformations but always have a single action at the end. The dataset will travel from transformation to transformation until the last action. In fact the action operation is the one which will trigger the pipeline. If there is no action operation, nothing will be done. Not even the first transformation. I can chain different transformations. The output of the previous transformation will be the input of the next transformation. This way until the action.

RDD pipeline

What are the available transformations and actions? There are more than 20 transformations. Let’s see the most important.

  • Map will apply a single transformation to each individual element of the incoming dataset.
  • Filter will filter some incoming elements of the dataset.
  • Flatmap is similar to map with the inverse of filter. I can apply a map to the incoming element and generate a list of new elements. I can read a word and generate a list of letters as output.
  • With reduceByKey, I get as input a list of tuples and aggregate the values per keys. The length of the resulting output will be smaller as the keys won’t be repeated. And the aggregation operation must be given as input parameter.

There are more transformations, as union, intersect, join and more.

What about to the actions?

  • I can simply collect all the elements into a single list and return it.
  • I can count how many elements reach the end of the pipeline.
  • Or I can return the first element which reaches the end of the pipeline.
  • With forEach, I perform an operation on each element without returning anything. As printing the result or sending it somewhere.
  • With reduce, I aggregate each element into an accumulative result which will be returned, as the sum of all the elements.

And there are some other actions but I won’t list them all. The main goal of the RDD is that the transformations can be done in parallel. Spark will split the dataset into multiple subsets and each server will apply the transformations individually. And at the end, aggregate the result. This may sound difficult, and there are some important considerations that must be taken into account. But let’s see it with an example.

Hands-On

I have a big file with numerical values. I will perform some operation with those numbers. I will start by creating the Spark context.

app_name = "stats"
master = "local"

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

app_name is just the name of my Spark application and master is where the Spark cluster is located. To start, I will just use a cluster with a single local server which will be created when starting. Later in the article, I will create a cluster with multiple servers. Now from the context, I can create the pipeline’s operations. Let’s start with a small list of values.

distributed_data = sc.parallelize(list(range(10)))

With the parallelize method, I tell the cluster to prepare the subsets which will be distributed to the servers. And it returns me the pipeline object.

print(distributed_data.count())

Now, I’ve added an action. This makes the pipeline to be executed and returns the count result.

print(distributed_data.reduce(lambda a, b: a + b))

Here is another action. As said, the reduce will aggregate all the elements into a final one. This reduction just performs the sum of all the elements. I get the accumulator in the first element and the next item as the second element. I just performed the sum. Then, the result will be written as the accumulator for the next item. And so on. Let’s continue with some more complicated examples.

distributed_data = sc.parallelize([("Apple", 1), ("Pear", 2), ("Banana", 2), ("Apple", 4), ("Banana", 1)])
print(distributed_data.reduceByKey(lambda a, b: a+b).collect())
> [("Apple", 5), ("Pear", 2), ("Banana", 3)]

This time, I’ve used a list of tuples and the reducedByKey transformation. As the action used is collect, this will just transform the dataset into a list. This way, I can see the output of the reduceByKey transformation. I can see that the list of tuples was aggregated with the sum of the values.

print(distributed_data.reduceByKey(lambda a, b: a+b).collectAsMap())
> {"Apple": 5, "Pear": 2, "Banana": 3}

With collectAsMap, I obtained the same result but now in a dictionary.

print(distributed_data
    .flatMap(lambda a: [c for c in a[0]])
    .filter(lambda a: a.islower())
    .countByKey())
> {"p": 4, "l": 2, "e": 3, "a": 7, "r": 1, "n": 4}

Now I’ve used the flatmap to transform my tuple into a list. I select the first element of the tuple and create a list with the letters. Then, filter the lowercase letters and count how many distinct characters there are.

Accumulators

Ok this was simple transformations and actions I can do with Spark. The problem comes when the transformations are more complicated, the aggregation isn’t easy and it’s harder when the operations are done in separated servers. Let’s now use a big list and compute the count of the items.

app_name = "stats"
master = "local[5]"

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

distributed_data = sc.parallelize(list(range(10000000)))

acc = sc.accumulator(0)
distributed_data.foreach(lambda l: acc.add(l))
print(acc.value)

This time, I’ve run the pipeline on five servers in my localhost. And I’ve used a special object, the Accumulator. I can’t use a simple variable inside the pipeline operations as the variable can’t be shared between the servers. Instead, I have the Accumulator. This object can be shared between the servers and compute the sum when all the servers finish. This is fine to count the number of lines or to obtain the sum of the values. But what about the mean? I will first try to create my own accumulator to obtain the count, then I will adapt it to obtain the mean.

class CountAccumulator(AccumulatorParam):

    def __init__(self, initial_count=0):
        self.count = initial_count

    def zero(self, value):
        return CountAccumulator()

    def addInPlace(self, val1, val2):
        if isinstance(val1, CountAccumulator):
            new_count = val1.count
        elif val1 is None:
            new_count = 0
        else:
            new_count = 1

        if isinstance(val2, CountAccumulator):
            new_count += val2.count
        elif val2 is not None:
            new_count += 1
        return CountAccumulator(new_count)

Let’s take a look at this object. I must extend the AccumulatorParam class and implement two methods: zero and addInPlace. The zero method will create the empty object, the initial object. In my case, I will create CountAccumulator with the count value of zero. Then, in the addInPlace, I must handle the aggregation of two items. But what are the items? I may receive a zero element or one line of the input dataset. Depending on that, I will compute the count in a different way. And finally, return the accumulator object. Let’s now enrich this object to also compute the sum.

class MeanAccumulator(AccumulatorParam):

    def __init__(self, initial_count=0, initial_sum=0):
        self.count = initial_count
        self.sum = initial_sum

    def zero(self, value):
        return MeanAccumulator()

    def addInPlace(self, val1, val2):
        if isinstance(val1, MeanAccumulator):
            new_count = val1.count
            new_sum = val1.sum
        elif val1 is None:
            new_count = 0
            new_sum = 0
        else:
            new_count = 1
            new_sum = val1

        if isinstance(val2, MeanAccumulator):
            new_count += val2.count
            new_sum += val2.sum
        elif val2 is not None:
            new_count += 1
            new_sum += val2
        return MeanAccumulator(new_count, new_sum)

    @property
    def mean(self):
        if self.count == 0:
            return 0
        return self.sum / self.count

The code is very similar to before. I’ve added the mean method to compute the mean from the sum and the count. The accumulators are very important to store a value during the pipeline. The accumulator must be serializable to be sent through the servers. Nevertheless, I can’t read the value of the accumulator while running the pipeline. As the value may not be the same on all the servers. The accumulators are write only objects.

Broadcasts

But I have a read-only object, the broadcast. This one is used to share a static value between all the servers. Now, in the broadcast, I will store which column I want to operate from a big file of multiple columns of numbers.

broadcast_value = sc.broadcast({"column": 5, "op": ["count"]})
distributed_file = sc.textFile(get_available_big_filename())

print(distributed_file
    .map(lambda line: line.split(","))
    .filter(lambda line: "col" not in line[0])
    .map(lambda line: [int(c) for c in line])
    .map(lambda line: line[broadcast_value.value["column"]])
    .sum())

First of all, at line 5, I split each line into a list of columns. In the second transformation at line 6, I filter the header which contains the header titles. The map at line 7 transforms the string value of the column into an integer. Finally at line 8, I read the column given by broadcast variable to compute the sum.

Conclusion

  • Spark is a distributed computing application.
  • So, I must have a dataset which must be split into multiple subsets. Those are the RDDs, the Resilient Distributed Datasets.
  • I can initialize an RDD with the method parallelize.
  • I can chain multiple transformation operations, as map, flatmap, filter or reduce.
  • I must have a final action for the pipeline to be triggered, as count, sum, collect or foreach.
  • I have the Accumulators to share and aggregate values inside the pipeline between all the servers.
  • And I have the Broadcast to share read-only values with all the servers.

Links

Repository


Never Miss Another Tech Innovation

Concrete insights and actionable resources delivered straight to your inbox to boost your developer career.

My New ebook, Best Practices To Create A Backend With Spring Boot 3, is available now.

Best practices to create a backend with Spring Boot 3

Leave a comment

Discover more from The Dev World - Sergio Lema

Subscribe now to keep reading and get access to the full archive.

Continue reading