Pyspark looks like regular python code, but the distributed nature of the execution requires the whole new way of thinking to optimize the code.

PySpark execution logic and code optimization

PySpark looks like regular python code. In reality the distributed nature of the execution requires the whole new way of thinking to optimize the PySpark code.

This article will focus on understanding PySpark execution logic and performance optimization. PySpark DataFrames are in an important role.

To try PySpark on practice, get your hands dirty with this tutorial: Spark and Python tutorial for data developers in AWS

DataFrames in pandas as a PySpark prerequisite

PySpark needs totally different kind of engineering compared to regular Python code.

If you are going to work with PySpark DataFrames it is likely that you are familiar with the pandas Python library and its DataFrame class.

Here comes the first source of potential confusion: despite their similar names, PySpark DataFrames and pandas DataFrames behave very differently. It is also easy to confuse them in your code. You might want to use suffix like _pDF for pandas DataFrames and _sDF for Spark DataFrames.

The pandas DataFrame object stores all the data represented by the data frame within the memory space of the Python interpreter. All of the data is easily and immediately accessible. The operations on the data are executed immediately when the code is executed, line by line. It is easy to print intermediate results to debug the code.

However, these advantages are offset by the fact that you are limited by the local computer’s memory and processing power constraints – you can only handle data which fits into the local memory. But since the operations are done in memory, with a basic data processing task you do not need to wait more than a few minutes at maximum.

PySpark DataFrames and their execution logic

The PySpark DataFrame object is an interface to Spark’s DataFrame API and a Spark DataFrame within a Spark application. The data in the DataFrame is very likely to be somewhere else than the computer running the Python interpreter – e.g. on a remote Spark cluster running in the cloud.

There are two distinct kinds of operations on Spark DataFrames: transformations and actions. Transformations describe operations on the data, e.g. filtering a column by value, joining two DataFrames by key columns, or sorting data. Actions are operations which take DataFrame(s) as input and output something else. Some examples from action would be showing the contents of a DataFrame or writing a DataFrame to a file system.

The key point to understand how Spark works is that transformations are lazy. Executing a Python command which describes a transformation of a PySpark DataFrame to another does not actually require calculations to take place. Ordering by a column and calculating aggregate values, returning another PySpark DataFrame would be such transformation. Rather, the operation is added to the graph describing what Spark should eventually do.

When an action is requested – e.g. return the contents of this Spark DataFrame as a Pandas DataFrame – Spark looks at the processing graph and then optimizes the tasks which needs to be done. This is the job of the Catalyst optimizer, and it enables Spark to optimize the operations to very high degree.

Also, the actual computation tasks run on the Spark cluster, meaning that you can have huge amounts of memory and processing cores available for the actual computation, even without resorting to the top-of-the-line virtual machines offered by cloud providers.

Consider caching to speed up PySpark

However, the highly optimized and parallelized execution comes at a cost: it is not as easy to see what is going on at each step. Looking at the data after some transformations means that you have to gather the data, or its subset, to a single computer. This is an action, so Spark has to determine the computation graph, optimize it, and execute it.

If your dataset is large, this may take quite some time. This is especially true if caching is not enabled and Spark has to start by reading the input data from a remote source – such as a database cluster or cloud object storage like S3.

You can alleviate this by caching the DataFrame at some suitable point. Caching causes the DataFrame partitions to be retained on the executors and not be removed from memory or disk unless there is a pressing need. In practice this means that the cached version of the DataFrame is available quickly for further calculations. However, playing around with the data is still not as easy or quick as with pandas DataFrames.

Use small scripts and multiple environments in PySpark

As a rule of thumb, one PySpark script should perform just one well defined task. This is due to the fact that any action triggers the transformation plan execution from the beginning. Managing and debugging becomes a pain if the code has lots of actions.

The normal flow is to read the data, transform the data and write the data. Often the write stage is the only place where you need to execute an action. Instead of debugging in the middle of the code, you can review the output of the whole PySpark job.

With large amounts of data this approach would be slow. You would have to wait a long time to see the results after each job.

My suggestion is to create environments that have different sizes of data. In the environment with little data you test the business logic and syntax. The test cycle is rapid as there’s no need process gigabytes of data. Running the PySpark script with the full dataset reveals the performance problems.

This goes well together with the traditional dev, test, prod environment split.

Favor DataFrame over RDD with structured data

RDD (Resilient Distributed Dataset) can be any set of items. For example, a shopping list.

["apple", "milk", "bread"]

RDD is the low-level data representation in Spark, and in earlier versions of Spark it was also the only way to access and manipulate data. However, the DataFrame API was introduced as an abstraction on top of the RDD API. As a rule of thumb, unless you are doing something very involved (and you really know what you are doing!), stick with the DataFrame API.

DataFrame is a tabular structure: a collection of Columns, each of which has a well defined data type. If you have a description and amount for each item in the shopping list, then a DataFrame would do better.

+-------+-----------+------+
|product|description|amount|
+-------+-----------+------+
|apple  |green      |5     |
|milk   |skimmed    |2     |
|bread  |rye        |1     |
+-------+-----------+------+

This is also a very intuitive representation for structured data, something that can be found from a database table. PySpark DataFrames have their own methods for data manipulation just like pandas DataFrames have.

Avoid User Defined Functions in PySpark

As a beginner I thought PySpark DataFrames would integrate seamlessly to Python. That’s why I chose to use UDFs (User Defined Functions) to transform the data.

A UDF is simply a Python function which has been registered to Spark using PySpark’s spark.udf.register method.

With the small sample dataset it was relatively easy to get started with UDF functions. When running the PySpark script with more data, spark popped an OutOfMemory error.

Investigating the issue revealed that the code could not be optimized when using UDFs.  To Spark’s Catalyst optimizer, the UDF is a black box. This means that Spark may have to read in all of the input data, even though the data actually used by the UDF comes from a small fragments in the input I.e. doing data filtering at the data read step near the data, i.e. predicate pushdown, cannot be used.

Additionally, there is a performance penalty: on the Spark executors, where the actual computations take place, data has to be converted (serialized) in the Spark JVM to a format Python can read, a Python interpreter spun up, the data deserialized in the Python interpreter, the UDF executed, and the result serialized and deserialized again to the Spark JVM. All of this takes significant amounts of time!

The recommendation is to stay in native PySpark dataframe functions whenever possible, since they are translated directly to native Scala functions running on Spark.

If you absolutely, positively need to do something with UDFs in PySpark, consider using the pandas vectorized UDFs introduced in Spark 2.3 – the UDFs are still a black box to the optimizer, but at least the performance penalty of moving data between JVM and Python interpreter is lot smaller.

Number of partitions and partition size in PySpark

In order to process data in a parallel fashion on multiple compute nodes, Spark splits data into partitions, smaller data chunks. A DataFrame of 1,000,000 rows could be partitioned to 10 partitions having 100,000 rows each. Additionally, the computation jobs Spark runs are split into tasks, each task acting on a single data partition. Spark cluster has a driver that distributes the tasks to multiple executors. This means that the datasets can be much larger than fits into the memory of a single computer – as long as the partitions fit into the memory of the computers running the executors.

In one of the projects our team encountered an out-of-memory error that we spent a long time figuring out. Finally we found out that the problem was a result of too large partitions. The data in a partition could simply not fit to the memory of a single executor node.

Too few partitions also make the execution inefficient. Some of the executor cores idle while others are working on a full steam, if there are not as many partitions as there are available cores (or, technically, available slots)

However, having a large amount of small partitions is not optimal either – shuffling the data in the small partitions is inefficient. Also reading and writing to disk (not to mention a network destination) in small chunks potentially increases the total execution time.

The Spark programming guide recommends 128 MB partition size as the default. For 128 GB of data this would mean 1000 partitions. Without going too deep in the details, consider partitioning as a crucial part of the optimization toolbox. If your partitions are too large or too small, you can use the coalesce() and repartition() methods of DataFrame to instruct Spark to modify the partition distribution. The number of partitions in a DataFrame sDF can be checked with sDF.rdd.getNumPartitions().

Summary – PySpark basics and optimization

PySpark offers a versatile interface for using powerful Spark clusters, but it requires a completely different way of thinking and being aware of the differences of local and distributed execution models. The functionality offered by the core PySpark interface can be extended by creating User-Defined Functions (UDFs), but as a tradeoff the performance is not as good as for native PySpark functions due to lesser degree of optimization. Partitioning the data correctly and with a reasonable partition size is crucial for efficient execution – and as always, good planning is the key to success.

New call-to-action
Finnish stemming and lemmatization in python. See python code examples and try scripts yourself. This tutorial uses python 3.

Finnish stemming and lemmatization in python

Finnish stemming and lemmatization in python for text analytics.

There are plenty of options for natural language processing in English. For small languages like Finnish it is a different story. Not all solutions are easy to find.

In this blog I deal with stemming and lemmatization in Finnish language. Examples are written in python 3.6.

Difference between stemming and lemmatization

Transforming a word to a generalized format is helpful in many applications of text analysis. This is because words like cat and cats mean almost the same thing.

Lemmatization can be defined as converting words to their base forms. After the conversion, the different “versions” of a word such as cat, cats, cat’s or cats’ would all be simply cat.

Stemming is the other option to convert words to a general format. Stemming is not exactly the same operation as base form conversion as it goes deeper down to the structure and science of the language. More about stemming from Wikipedia.

Here is a simple example about the difference between lemmatization and stemming.

Original word Lemmatized word Stemmed word
Study Study Study
Studies Study Studi

More focus is put on lemmatization in this article. This is because Finnish lemmatization libraries were more difficult to find.

Finnish lemmatization with voikko python library

In the GitHub page Voikko describes the use cases for the library:

“Libvoikko provides spell checking, hyphenation, grammar checking and morphological analysis for Finnish language.”

It took some trial and error to find proper installation instructions for python. Instead of using python’s pip package installer, the following line worked for Linux users. For Windows users I recommend installing Ubuntu subsytem for Windows.

sudo apt -y install -y voikko-fi python-libvoikko

 

After installation the libvoikko library can be imported to python scripts as usual. Here is an example how to lemmatize a single Finnish word to its base form with python.

#Import the Voikko library
import libvoikko

#Define a Voikko class for Finnish
v = libvoikko.Voikko(u"fi")

#A word that might or might not be in base form
#Finnish word "kissoja" means "cats" in English
word = "kissoja"

#Analyze the word
voikko_dict = v.analyze(word)

#Extract the base form as
#analyze() function returns various info for the word
word_baseform = voikko_dict[0]['BASEFORM']

#Print the base form of the word
#This should print "kissa", which is "cat" in English
print(word_baseform)

 

Finnish sentence lemmatization in python

Often you would like to perform the base form conversion for a block of text or for a sentence. To achieve this you should first split the long text to list of words. The you can apply Voikko’s analyze() function for each of them. Word splitting is called word tokenization.

There are different ways of doing tokenization depending on your objective. Sometimes commas, dashes and upper case letters matter, sometimes not.

Python package nltk provides an English module for tokenization which works for Finnish in most cases. But instead, I wrote my own tokenization script to demonstrate base form conversion for multiple sentences.

#Import the Voikko library
import libvoikko

#Define a Voikko class for Finnish
v = libvoikko.Voikko(u"fi")

#Some Finnish text
txt = "Tähän jotain suomenkielistä tekstiä. Väärinkirjoitettu yhdys-sana, pahus."

#Pre-process the text
txt = txt.lower().replace(".", "").replace(",", "")

#Split to list by space character
word_list = txt.split(" ")

#Initialize a list for base form words
bf_list = []

#Loop all words in the list
for w in word_list:
  
  #Analyze the word with voikko
  voikko_dict = v.analyze(w)
  
  #Extract the base form, if the word is recognized
  if voikko_dict:
    bf_word = voikko_dict[0]['BASEFORM']
  #If word is not recognized, add the original word
  else:
    bf_word = w
  
  #Append to the list
  bf_list.append(bf_word)
  
#Print results
print("Original:")
print(word_list)
print("Lemmatized:")
print(bf_list)

 

Finnish stemming with python

The nltk package provides stemming for Finnish language here.

And here are some Finnish stemming examples.

#Import nltk Snowball stemmer
from nltk.stem.snowball import SnowballStemmer

#Create a Finnish instance
stemmer = SnowballStemmer("finnish")

#Print the stemmed version of some Finnish word
print(stemmer.stem("koiriemme"))

As you can see, the nltk stemmer is extremely easy to use. Antoher advanatage is, with very little code you can harness the same script for other languages.

Summary – Lemmatization and stemming in Finnish

This blog offered you simple and concrete examples to lemmatize and stem Finnish words in python. Hopefully this gets you started with your text mining project.

There is no absolute truth whether you should use stemming or lemmatization. One rule of thumb is that stemming captures more semantics than lemmatization. On the other hand lemmatization is easier to understand and generalizes more.

Now harness your creativity and try yourself!