Azure Synapse Analytics – Unifying your data pipeline toolbox

This blog post gives an overview of the capabilities of Azure Synapse Analytics to help you position it in your overall data platform architecture. We won’t do a deep dive into the technical implementation in this post.

Introduction

Azure SQL capabilities have evolved a lot over the years. Azure offers everything from VMs for running your own SQL Server setup to SQL DB Hyperscale, an extremely scalable but still traditional database as a service. But what about massively parallel processing data warehousing? Data integrations? Data science? What about limitless analytics? For that, there’s Azure Synapse Analytics.

In November 2019 Microsoft announced Azure Synapse as a limitless analytics service and the next evolution of SQL Data Warehouse. The only thing released in GA was the renaming of SQL DW. The other Synapse features were only available in a limited private preview, and for most people setting up Synapse Analytics meant setting up the former SQL DW. Nowadays Azure Synapse Analytics is the name for the whole analytics service, with the former SQL DW being part of it. At the time of writing, the former SQL DW (also known as Synapse SQL provisioned) is still the only part in GA; the rest of the features are in public preview, and anyone can set up a Synapse Analytics workspace. Yes, the naming is confusing, but hang on, we will try to clear that up for you.

Synapse Analytics Architecture

Note: At the time of writing, Synapse Analytics can refer to both “Synapse Analytics (formerly SQL DW)” and “Synapse Analytics (workspace preview)” in the Azure documentation. Here we are talking about the unified experience in the workspace preview.

Azure Synapse Analytics is the common name for integrated tooling that provides everything from source system integration to relational databases to data science tooling to reporting data sets. Synapse Analytics contains the following:

  • Synapse SQL
    • This is the data warehouse part formerly known as Azure SQL DW.
    • Provides both serverless (SQL on-demand) and pre-allocated (SQL Pool) resources
    • Shares a common metastore with the Spark engine for seamless integration
  • Apache Spark
    • Seamlessly integrated big data engine.
    • Shares a common metastore with Synapse SQL
  • Data Flow and Integrations
    • Shares codebase with Azure Data Factory, so has everything you expect and more
    • Integrate to nearly a hundred data sources to ingest data
    • Orchestrate SQL Procedures and Spark Notebooks
  • Management and Monitoring
    • Familiar management and monitoring tools from Azure Data Factory are available.
  • Synapse Studio
    • Single web UI where you can create everything:
      • Integrate to source systems
      • Land data to Azure Data Lake
      • Explore the data using Spark notebooks
      • Load data to Synapse SQL using T-SQL scripts
      • Predict what needs predicting using Python, Scala, C# or SQL in Spark
      • Publish data sets to Power BI
      • Manage and orchestrate everything with pipelines

In conclusion, Synapse Analytics refers to all of the capabilities available to you and not a single included tool. Synapse SQL, as the name suggests, is perhaps the most recognizable part with the SQL DW and T-SQL. However, Synapse Analytics also contains a serverless SQL form factor named SQL On-demand. So using T-SQL no longer requires a provisioned SQL Pool.

Synapse Analytics unique value proposition

By unifying all of the tools mentioned above, Synapse Analytics really brings something new to the table. Using Synapse Analytics you can, without ever leaving the Synapse Studio, connect to a new on-premises data source, extract, load and transform that data to Data Lake and Synapse SQL, enrich it further with ML models, and provide it for reporting usage.

Synapse Analytics provides capabilities for each of the steps in a data pipeline: ingestion, preparing data, storing, exploring, transforming and serving the data:

Azure Synapse Analytics fit

Ingest

If you have previously used Azure Data Factory, you will be right at home using the data integration tools in Synapse Analytics. Synapse Analytics even shares the same codebase with Data Factory, so everything you have grown accustomed to is already there (almost everything; the complete list of differences is at https://docs.microsoft.com/en-us/azure/synapse-analytics/data-integration/concepts-data-factory-differences). You can use an Integration Runtime running inside your on-premises network to access all your data sources inside your network, or an Azure-hosted one for massive scale. The list of natively supported systems is constantly growing, and you can find up-to-date information at https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview#supported-data-stores-and-formats. And if your source system is not on the list, you can always use a self-hosted integration runtime with ODBC or JDBC drivers.

Despite their similarities, there’s currently no tooling to migrate ADF pipelines to Synapse Analytics Pipelines, and you can’t use the same Integration Runtime for both ADF and Synapse Analytics.

Another interesting preview feature is Azure Synapse Link. It allows you to run analytical workloads on top of operational data in Azure Cosmos DB near real time without affecting the operational usage. This means that application data in Cosmos DB is available for Synapse SQL and Spark using cloud native HTAP without building ETL workflows. https://docs.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link

Prepare

After connecting to your data source, you can extract your data to Azure. There are numerous ways to organize the data in the data lake. Generally, you’ll probably want to have the original untouched data in a “raw” area, and processed and more refined data in another area.

With the new COPY INTO T-SQL command, you can use gzipped CSV files to reduce the storage footprint. If using CSV files, it is advisable to split large files into smaller ones, depending on your DWU capacity. And finally, you don’t have to worry about the 1 MB row limit and hard-coded separator values as with PolyBase.
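As a rough illustration of the file-splitting advice, here is a minimal Python sketch that cuts one CSV into several gzipped parts before they are uploaded to the data lake. The function name, the part naming scheme and the rows_per_file parameter are our own assumptions for illustration; a sensible part size depends on your DWU capacity and is not derived here.

```python
import csv
import gzip
from pathlib import Path
from typing import List


def split_csv_gzip(src: Path, out_dir: Path, rows_per_file: int) -> List[Path]:
    """Split one CSV into gzipped parts, repeating the header row in each part."""
    out_dir.mkdir(parents=True, exist_ok=True)
    parts: List[Path] = []
    out = None
    writer = None
    with src.open(newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        for i, row in enumerate(reader):
            if i % rows_per_file == 0:
                # Start a new part file every rows_per_file data rows.
                if out is not None:
                    out.close()
                part = out_dir / f"{src.stem}_{len(parts):04d}.csv.gz"
                out = gzip.open(part, "wt", newline="")
                writer = csv.writer(out)
                writer.writerow(header)
                parts.append(part)
            writer.writerow(row)
    if out is not None:
        out.close()
    return parts
```

Repeating the header in every part keeps each file readable on its own; whether the load should skip that first row is a setting of the load command and not something this sketch decides.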

To really leverage all the possibilities of Synapse Analytics, you should use the parquet format in your data lake. In addition to being a compressed format, using parquet supports predicate pushdown in Synapse SQL and Spark. This greatly speeds up your exploratory queries to data lake from Synapse SQL or Spark, as you don’t have to read all the files from a given folder structure to get the rows you want. With parquet, the COPY INTO command automatically splits your files to speed up the processing.
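To give an intuition for why predicate pushdown saves reads, here is a deliberately simplified Python model. Parquet files keep min/max statistics per column for each row group, so an engine can skip row groups that cannot contain the requested value. The class, the function and the sample statistics below are hypothetical and only illustrate the idea; this is not how Synapse SQL or Spark implement it.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroupStats:
    """Min/max statistics of one column in one row group (hypothetical model)."""
    name: str
    min_value: int
    max_value: int


def row_groups_to_read(groups: List[RowGroupStats], wanted: int) -> List[str]:
    """Return only the row groups whose [min, max] range can contain `wanted`."""
    return [g.name for g in groups if g.min_value <= wanted <= g.max_value]


# Made-up statistics for a "year" column.
groups = [
    RowGroupStats("rg0", 2017, 2018),
    RowGroupStats("rg1", 2019, 2019),
    RowGroupStats("rg2", 2020, 2021),
]
print(row_groups_to_read(groups, 2020))  # only rg2 needs to be scanned
```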

Using Data Flows to clean and unify your data

Data Flows provide a no-code approach to transforming your data. Like the integrations, Data Flows are already familiar from Azure Data Factory. With Data Flows, you could for example combine a few columns, delete a redundant one, calculate a working unique identifier and join the data with another flow before storing it back to the Data Lake as a processed file and also persisting it to a SQL Pool table.

Explore data

Synapse Studio gives you multiple options to explore your data. We can graphically view the data lake structure, and easily get a SQL script or Spark Notebook to view the file contents just by right clicking a file.

Select from data lake file

With SQL On-demand, you can explore the contents of files without moving or importing them anywhere, and generate simple graphs in Synapse Studio UI to give you an idea of what you are looking at.

Graph T-SQL results in Azure Synapse Analytics

Synapse SQL, both on-demand and provisioned, can be accessed from outside Synapse Studio with different clients using standard interfaces such as ODBC, JDBC and ADO.NET.

In Apache Spark (example in Scala), using data from the SQL Pool is as easy as:

%%spark
val df = spark.read.sqlanalytics("SQLPool.schema.table")

This doesn’t require any configuration on Spark’s side, as Spark and SQL are just two different runtimes operating on the same metadata and data sources.

Transform and enrich data

For transforming and enrichment, Synapse Analytics offers Spark Notebooks in addition to T-SQL. However, it has already been pretty easy to add Databricks notebooks as a part of your Azure Data Factory pipelines. With Synapse Analytics, again this integration is a bit more ready-made and easier.

One interesting possibility is SQL On-demand and its external tables. SQL On-demand doesn’t get access to the SQL Pool’s tables (as you don’t even need to provision a SQL Pool or Synapse SQL to use SQL On-demand), but you can create external tables using T-SQL. External tables are stored as parquet-backed files in Azure Data Lake Storage Gen 2. Compared to the current Azure General Availability offering, moving towards SQL On-demand and external tables in your transformations offers a serverless and fully scalable architecture. What the performance of external tables and SQL On-demand is going to be like remains to be seen.

Serve

Synapse Analytics offers a few new interesting features for serving enriched data: SQL On-demand and Power BI Service connection.

With SQL On-demand, it is possible to create a highly scalable data pipeline all the way from source systems to transforming and storing data to serving it to Power BI, without any predetermined service level or capacity choices:

  • Ingest and Transform data using Data Pipelines and Data Flow. Use SQL On-demand or Data Flow as a part of your pipeline where needed
  • Store the results with “CETAS”, a T-SQL command CREATE EXTERNAL TABLE AS SELECT to store the results of your final SELECT statement to Azure Data Lake Storage Gen 2
  • Create a data set in Power BI. You can either use the files directly or use Direct Query via SQL On-demand.

Lastly, Synapse Analytics has the capability to link to Power BI Service! You can create linked services with Power BI Workspaces, view datasets and also create Power BI reports without ever leaving Synapse Studio. This greatly simplifies the process of creating reports from new data.

Connect Synapse Analytics with Power BI workspace

Do note that at the moment, you can only link to ONE Power BI Workspace. After connecting to a Power BI Workspace, you can create new Power BI reports based on published data sets without leaving Synapse Studio.

Create Power BI Reports straight from Synapse Studio

Manage and orchestrate

For orchestrating your pipelines, Synapse Analytics offers pretty much the same tooling as Azure Data Factory. You can easily combine your Data Flows, Spark Notebooks, T-SQL queries and everything to form pipelines as in Azure Data Factory. The familiar triggers are also there.

Management views also share much of the same as Azure Data Factory.

Synapse Analytics excels

The strengths of Synapse Analytics lie in its unified and yet versatile tooling.

  • Starting a new data platform project
    • Synapse Analytics offers a unified experience for creating ingestion, preparation and transformations and for serving your data in one place
  • Architecture with Data Lake
    • The unified tooling of Synapse Analytics makes it easier to work with and explore Data Lake
  • Architecture with Cosmos DB (or other future possible Synapse Link sources)
    • Near-real-time analytics based on operational data sources without any manual ETL processes
  • Explorative work on unknown data
    • Many tools and languages in one place
  • Security management in Synapse Workspace
    • Instead of setting up and configuring multiple separate tools with authentications and networks between them, you have only the Synapse Analytics workspace to set up
  • Possibility for both no-code and code approaches
    • A suitable approach probably exists for many developers to get started
  • Synapse SQL Pool as an MPP database
    • Synapse SQL is a powerful database in its own right

What we wish to see in future updates

  • While Synapse SQL Pool does pack a punch, it does require DBA work and manual maintenance. Maybe more automation regarding this in the future?
    • Different indexes, partitions and distributions will have an impact on performance and storage costs
    • Workload management: classification, importance, isolation all need to be planned for and addressed for full scale operations
  • More dynamic resource scaling
    • Scaling Synapse SQL is an offline operation
    • While SQL On-demand offers some exciting possibilities, it fails to deliver any database functionalities: no access to relational tables for example as it doesn’t require a SQL Pool. There are parquet backed external tables, but their performance remains to be seen
  • Development in the Spark-sector as Synapse’s Spark is not on par with Databricks
    • For example, one notebook locks one Spark pool
    • Different runtime versions compared to Databricks
  • Implementation of version control
    • There’s no Git or any other version control system support. The only way to save work is to publish, which also makes it visible to all other developers.
    • Managing different environments without version control system requires the use of SDK or APIs, which means a lot of work to get production ready
  • Getting from preview to GA
    • There still are instabilities and UI issues. The usual Preview shenanigans which are hopefully taken care of before moving to general availability.

Conclusion

In conclusion, the vision of Synapse Analytics is a great one: a unified tool and experience to create almost everything you require in Azure to get your data from a data source to a published report. At the moment, there are some drawbacks as the whole workspace experience is still in preview, of which the lack of version control is definitely not the smallest.

At this point, we haven’t done any performance analysis, and it is too early to say whether Synapse Analytics can really be a silver bullet for combining data pipelines, warehousing and analytics. In addition to performance, some key unique features are somewhat handicapped at this point: for example, you can only connect to one Power BI Workspace. The vision is definitely there, and we will be anxiously waiting for future updates.

Automatized Code Deployment from Azure DevOps to Databricks

The target audience is data practitioners looking for a simple way to practice DataOps, even in restricted environments. A walk-through of the code is detailed in the appendix.

The linked code repository contains a minimal setup to automatize infrastructure and code deployment simultaneously from Azure DevOps Git Repositories to Databricks.

TL;DR:

  1. Import the repo into a fresh Azure DevOps project.
  2. Get a secret access token from your Databricks workspace.
  3. Paste the token and the Databricks URL into an Azure DevOps Library variable group named “databricks_cli”.
  4. Create and run two pipelines referencing the YAML files in the repo’s pipelines/ directory.
  5. Any Databricks-compatible (Python, Scala, R) code pushed to the remote repository’s workspace/ directory will be copied to the Databricks workspace, with an interactive cluster waiting to execute it.

Background

Azure DevOps and Databricks have one thing in common: each takes industry-standard technology and offers it as an intuitive, managed platform:

  • Databricks for running Apache Spark
  • DevOps for Git repos and build pipelines

Both platforms have much more to offer than what is used in this minimal integration example. DevOps offers a wiki, bug, task and issue tracking, Kanban, Scrum and workflow functionality, among others.

Databricks is a fully managed and optimized Apache Spark PaaS. It can natively execute Scala, Python, PySpark, R, SparkR, SQL and Bash code; some cluster types have TensorFlow installed and configured (including GPU drivers). Integration of the H2O machine learning platform is quite straightforward. In essence, Databricks is a highly performant general-purpose data science and engineering platform which tackles virtually any challenge in the Big Data universe.

Both have free tiers and a pay-as-you-go pricing model.

Databricks provides infrastructure as code. A few lines of JSON consistently deploy an optimized Apache Spark runtime.

After several projects and an increasing need to build and prototype in a managed and reproducible way, we have come to appreciate the DevOps-Databricks combination: it enables quick and responsive interactive runtimes and provides industry best practice for software development and data engineering. Deployment into (scheduled) performant, resilient production environments is possible without changes to the platform and without any need for refactoring.

The core of the integration uses Databricks infrastructure-as-code (IaC) capability together with DevOps pipelines functionality to deploy any kind of code.

  1. the Databricks CLI facilitates programmatic access to Databricks and
  2. the managed Build Agents in DevOps deploy both infrastructure and analytic code.

Azure pipelines deploy both the infrastructure code and the notebook code from the repository to the Databricks workspace. This enables version control of both the runtime and the code in one compact, responsive repository.

All pieces of the integration are hosted in a single, compact repository, which makes all parts of a data and modeling pipeline fully reproducible.

Prerequisites

Log into Azure DevOps and the Databricks workspace. There are free tiers for both of them. Setup details are explained extensively in the canonical quick start sections of both services.

For the integration Databricks can be hosted in either the Azure or AWS cloud.

1. Import the Repository

To use this demo as a starting point for a new project, prepare an Azure DevOps project:

  • create a new project (with an empty repository by default)
  • select the repository tab and choose “Import a repository”
  • paste the URL of this demo into the Clone URL field: https://dev.azure.com/reinhardseifert/DatabricksDevOps/_git/DatabricksDevOps
  • wait for the import to complete
  • clone the newly imported repository to your local computer to start deploying your own code into the workspace directory

Then create two Azure pipelines which create the runtime and sync any code updates into it (see below).

2. Create Databricks Secret Token

Log into the Databricks workspace, open User Settings (icon in the top right corner) and select “Generate New Token”. Choose a descriptive name (“DevOps Build Agent Key”) and copy the token to a notebook or the clipboard. The token is displayed only once, directly after creation; you can create as many tokens as you wish.

Databricks > User Settings > Create New Token

3. Add the token to the Azure DevOps Library

The Databricks Secret Token has to be added to a Variable Group named “databricks_cli”. Variable groups are created under Pipelines > Library. Note that the name of the variable group is referenced in both pipeline definitions (/pipelines/build-cluster.yml and /pipelines/build-workspace.yml). Two variables have to be defined: 1. databricks_host and 2. databricks_token

The variable names are referenced in the .yml files, so changing them in the DevOps library also requires changing them correspondingly in the .yml files. When you click the lock icon after defining a variable, it is treated as a secret and is no longer visible in the DevOps project, neither in the Library nor on the build servers (even when accidentally echoing it). But of course writing the secrets to the Databricks environment would potentially expose them. This is a security concern when collaborating with non-trusted parties on a project.

Pipelines > Library > Add Variable Group

 

Azure DevOps

At a minimum, the Azure DevOps portal offers a Git repository to maintain code and pipelines to deploy the code from the repository into runtimes.

Azure Repositories

The Azure repo contains the full logic of the integration:

  1. the actual (Python) code to run,
  2. the JSON specification of the Spark-cluster which will run the code,
  3. shell build scripts which are executed in the pipeline/ build server,
  4. the YAML configuration which defines the pipelines.

The complete CI/CD pipeline is contained in a single Git repository in a very compact fashion. Following Databricks’ terminology the Python code (1) is located in the workspace/ directory. The runtime specification .json (2), build scripts .sh (3) and the pipeline configuration .yml (4) are located in the pipelines/ directory according to the Azure DevOps paradigm.

Azure Pipelines

The Pipelines menu provides the following functionality:

  • Pipelines (aka build pipelines),
  • Environments (needed to group Azure resources – not used here),
  • Releases (aka release pipelines – not used here)
  • Library (containing the variable groups)

The build pipelines exclusively used in this demo project are managed under the “Pipelines > Pipelines” menu tab – not really intuitive.

Azure Build Pipelines

The pipeline’s build agents are configured via YAML files (e.g. build-cluster.yml). In this case they install the Databricks CLI on the build machine and then execute CLI commands to create runtimes and move code notebooks to the runtime. The Databricks cluster is configured by a single JSON file (see config.cluster.json).

This minimal integration requires creation of two pipelines:

  1. cluster creation – referencing pipelines/build-cluster.yml and
  2. workspace synchronization – referencing /pipelines/build-workspace.yml

After importing the repo:

  • select the Pipelines > Pipelines menu tab
  • choose Azure Repos Git YAML
  • select the imported repository from the drop-down menu
  • select Existing Azure Pipeline YAML file
  • select the YAML file from the drop-down menu
  • Run the pipeline for the first time – or just save it and run it later.

At this point the Databricks secret access token mentioned in the prerequisites needs to be present in a “databricks_cli” variable group. Otherwise the pipeline run will fail with a warning; in this case just create the token (in Databricks) and the variable group (in DevOps) and re-run the pipeline.

After creating the pipelines and saving them (or running them initially), the default pipeline names reference the source repository name which triggers them. For easier monitoring the pipelines should be renamed according to their function, like “create-cluster” and “sync-workspace” in this case.

Summary

This concludes the integration of analytic code from an Azure DevOps repository into a hosted Databricks runtime.

Any change to the config.cluster.json deletes the existing cluster and creates a new one according to the specifications in the JSON file.

Any change to workspace/ will copy the notebook file(s) (R, Python, Scala) to the Databricks workspace for execution on the cluster.

The Databricks workspace in this example was hosted on Azure. Only minor changes are required to use an AWS-hosted workspace. On all cloud platforms the host URL and security token are specific to the chosen instance and region. The cloud-specific parameter is the node_type_id in the cluster configuration .json file.

Using this skeleton repo as a starting point, it is immediately possible to run interactive workloads on a performant Apache Spark cloud cluster (instead of “cooking” the local laptop with analytic code), transparently maintained on a professional DevOps platform.

Appendix

What follows is a detailed walk-through of the .yml pipeline configurations, .sh build scripts and .json configuration files.

In general, the YAML instructs the build server to 1. start up when a certain file is changed (trigger), 2. copy the contents of the repository to the build server and 3. execute a selection of shell scripts (tasks) from the repository.

Pipeline: Create cluster

This is a detailed walk-through of the build-cluster.yml pipeline. The .yml files have a hierarchical structure, and the full hierarchy of the DevOps build pipeline is included, although the stages could be omitted.

Trigger

The first section of the pipeline YAML specifies the trigger. Any change to the specified branch of the linked repo will automatically start a run on the build agent.

trigger:
  branches:
    include:
    - master
  paths:
    include:
    - pipelines/config.cluster.json
    - pipelines/databricks-library-install.sh

Without the paths: section, any change to the master branch would run the pipeline. With it, the cluster is rebuilt only when the configuration changes or the selection of installed Python or R libraries changes.
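The effect of the paths: filter can be pictured with a small Python model. This is only a simplified sketch under our own assumptions (exact file matches and directory prefixes, no exclude lists or wildcards); it is not the matching logic Azure Pipelines actually uses, and both function names are made up for illustration.

```python
from typing import Iterable


def matches(path: str, include: str) -> bool:
    """True if `path` equals `include` or lies below it as a directory."""
    include = include.rstrip("/")
    return path == include or path.startswith(include + "/")


def should_trigger(changed: Iterable[str], includes: Iterable[str]) -> bool:
    """Decide whether a push touching `changed` files would start the pipeline."""
    changed = list(changed)
    includes = list(includes)
    if not includes:
        # No paths: section, so any change on the branch triggers a run.
        return len(changed) > 0
    return any(matches(p, inc) for p in changed for inc in includes)


includes = [
    "pipelines/config.cluster.json",
    "pipelines/databricks-library-install.sh",
]
print(should_trigger(["workspace/hello.py"], includes))
print(should_trigger(["pipelines/config.cluster.json"], includes))
```

With the include list from the trigger above, a push that only touches workspace/ leaves the cluster pipeline alone, while a change to the cluster configuration starts it.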

Stages

The stage can be omitted (for a single-stage pipeline) and the pool, variables and jobs defined directly; the stage is then implicit. It is possible to add testing steps to the pipeline and build fully automated CI/CD pipelines across environments within one .yml file.

stages:
- stage: "dev"
  displayName: "Development"
  dependsOn: []

Pool

  pool:
    vmImage: "ubuntu-latest"

Selects the type of virtual machine to start when the trigger files are changed. At the time of writing, ubuntu-latest starts an Ubuntu 18.04 LTS image.

Variables

  variables:
    - group: databricks_cli

This section references the variable group created in the Prerequisites section. The secret token is transferred to the build server and authorizes the API calls from the server to the Databricks workspace.

Jobs, Steps and Tasks

A job is a sequence of steps which are executed on the build server (pool). In this pipeline only task steps are used (see the docs for all step operations).

  jobs:
    - job: CreateCluster4Dev
      steps:

        - task: UsePythonVersion@0
          inputs:
            versionSpec: "3.8"
            architecture: "x64"

The first step selects the Python version for all following Python commands on the build server; the Databricks CLI is written in Python and is installed via pip in the following task.

Task: Install and configure the Databricks CLI

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cli-config.sh
            args: "$(databricks_host) $(databricks_token)"
          displayName: "Install and configure the Databricks CLI"

Note that the path is relative to the root of the repo. The secret access token and host URL from the DevOps library are copied into environment variables which can be passed to the script in the args section.

The shell script installs the Databricks CLI and writes the necessary CLI configuration on the build server.

python -m pip install databricks-cli
echo -e "[DEFAULT]\nhost: $HOST\ntoken: $TOKEN" > $HOME/.databrickscfg
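For readers who want to check what the echo line produces, here is a small Python sketch that renders the same three-line configuration and reads it back with the standard configparser module. The function name and the host and token values are placeholders invented for this example; the build script itself stays a shell script.

```python
import configparser


def render_databrickscfg(host: str, token: str) -> str:
    """Return the text the build script writes to $HOME/.databrickscfg."""
    return f"[DEFAULT]\nhost: {host}\ntoken: {token}\n"


# Placeholder values, not a real workspace or token.
text = render_databrickscfg("https://adb-example.azuredatabricks.net", "dapi-example")

# The file is plain INI syntax, so it can be parsed back for a sanity check.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(text)
print(parser.defaults()["host"])
```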

Task: “Delete previous cluster version (if existing)”

This task will remove any cluster with the name provided in the args: section. This allows for updating the cluster when the configuration file is changed. When no such cluster is present the script will fail. Usually the pipeline will break at this point but here continueOnError is true, so the pipeline will continue when creating a cluster for the first time.

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cluster-delete.sh
            args: "HelloCluster"
          continueOnError: "true"
          displayName: "Delete previous cluster version (if existing)"

The shell script called by this task is a wrapper around the Databricks CLI. First it queries for the cluster-id of any cluster with the name passed.

CLUSTER_ID=$(databricks clusters list --output json | jq -r --arg N "$CLUSTER_NAME" '.clusters[] | select(.cluster_name == $N) | .cluster_id')

It is possible to create multiple clusters with the same name. If there are several, all of them are deleted.

for ID in $CLUSTER_ID
do
    echo "Deleting $ID"
    databricks clusters permanent-delete --cluster-id $ID
done
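The jq filter used for the lookup may be easier to follow when written out in Python. The sketch below assumes the listing has the shape implied by the filter (a top-level "clusters" array whose entries carry cluster_name and cluster_id); the function name and the sample ids are made up for illustration.

```python
import json
from typing import List


def cluster_ids_by_name(listing_json: str, name: str) -> List[str]:
    """Return the ids of all clusters in the listing that carry the given name."""
    listing = json.loads(listing_json)
    return [
        c["cluster_id"]
        for c in listing.get("clusters", [])
        if c.get("cluster_name") == name
    ]


# Made-up listing with two clusters sharing one name.
sample = json.dumps({
    "clusters": [
        {"cluster_id": "id-1", "cluster_name": "HelloCluster"},
        {"cluster_id": "id-2", "cluster_name": "Other"},
        {"cluster_id": "id-3", "cluster_name": "HelloCluster"},
    ]
})
print(cluster_ids_by_name(sample, "HelloCluster"))
```

Returning a list rather than a single id mirrors the loop above: every cluster with a matching name is handled, and an empty list means there is nothing to delete.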

Task: Create new cluster

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cluster-create.sh
            args: "HelloCluster"
          displayName: "Create new cluster"

The build script reads the config.cluster.json and adds the cluster name passed from the pipeline .yml

cat config.cluster.json | sed "s/CLUSTER_NAME/$CLUSTER_NAME/g" > /tmp/conf.json

Now the configuration .json file can be passed to the Databricks CLI. The complete Apache Spark infrastructure is configured in the json. CLUSTER_NAME will be replaced with the name passed from the .yml.

{
    "cluster_name": "CLUSTER_NAME",
    "spark_version": "6.0.x-scala2.11",
    "spark_conf": {
        "spark.sql.execution.arrow.enabled": "true"
    },
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 1,
    "ssh_public_keys": [],
    "custom_tags": {
        "Project": "DevOpsIntegration"
    },
    "cluster_log_conf": {
        "dbfs": {
            "destination": "dbfs:/cluster_logs"
        }
    },
    "spark_env_vars": {
        "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    },
    "autotermination_minutes": 120,
    "enable_elastic_disk": false,
    "init_scripts": []
}
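As an alternative to the sed substitution, the placeholder could also be filled in by parsing the JSON. The following Python sketch is one possible variant, not the method used in the repository; the function name is hypothetical. Unlike sed, it sets only the cluster_name field instead of replacing every occurrence of the text CLUSTER_NAME.

```python
import json


def render_cluster_config(template_text: str, cluster_name: str) -> str:
    """Parse the JSON template and set the cluster name on the parsed object."""
    config = json.loads(template_text)
    config["cluster_name"] = cluster_name
    return json.dumps(config, indent=4)


# Shortened template with two of the fields from the configuration above.
template = '{"cluster_name": "CLUSTER_NAME", "num_workers": 1}'
print(render_cluster_config(template, "HelloCluster"))
```

Working on the parsed object has the side effect that an invalid template fails early on the build server, and a cluster name containing quotes or slashes cannot break the file.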

Updating the runtime to another version requires only modifying the spark_version parameter with any supported runtime.

A Spark cluster consists of one driver node and a number of worker nodes and can be scaled horizontally by adding nodes (num_workers) or vertically by choosing larger node types. The node types are cloud provider specific. The Standard_DS3_v2 node type id references the minimal Azure node.

The autotermination feature shuts the cluster down when not in use. Costs are billed per second up time per processing unit.

Any reconfiguration triggers the pipeline and rebuilds the cluster.

CLUSTER_ID=$(databricks clusters create --json-file /tmp/conf.json | jq -r '.cluster_id')

The cluster create call returns the cluster-id of the newly created instance. Since the last step of this pipeline installs additional Python and R libraries (via Pip and CRAN respectively) it is necessary to wait for the cluster to be in pending state.

STATE=$(databricks clusters list --output json | jq -r --arg I "$CLUSTER_ID" '.clusters[] | select(.cluster_id == $I) | .state')

echo "Wait for cluster to be PENDING"
while [[ "$STATE" != "PENDING" ]]
do
    sleep 5  # avoid polling the API in a tight loop
    STATE=$(databricks clusters list --output json | jq -r --arg I "$CLUSTER_ID" '.clusters[] | select(.cluster_id == $I) | .state')
done
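The shell loop waits indefinitely if the cluster never reports the expected state. A variant with a timeout could look like the Python sketch below. It is an illustration under our own assumptions: the function name and its parameters are hypothetical, and the state lookup is passed in as a callable so the sketch does not depend on any particular client library.

```python
import time
from typing import Callable


def wait_for_state(
    get_state: Callable[[], str],
    target: str,
    timeout_s: float = 600.0,
    poll_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns target, or raise after timeout_s seconds."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"cluster did not reach {target}, last state: {state}")
        sleep(poll_s)


# Usage with a fake state source that reports PENDING on the second poll.
states = iter(["", "PENDING"])
print(wait_for_state(lambda: next(states), "PENDING", sleep=lambda s: None))
```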

Task: Install Python and R dependencies on the cluster

The final step is to add additional Python and R packages to the cluster. There are many ways to install packages in Databricks. This is just one way to do it.

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-library-install.sh
            args: "HelloCluster"
          displayName: "Install Python and R dependencies"

Again the shell script wraps the Databricks CLI, here the library install command. The cluster name (“HelloCluster” in this example) has to be passed again.

All CLI calls to Databricks need the cluster-id to delete, create and manipulate instances. So first fetch it with a cluster list call:

CLUSTER_ID=$(databricks clusters list --output json | jq -r --arg N "$CLUSTER_NAME" '.clusters[] | select(.cluster_name == $N) | .cluster_id')

Then install the packages – one call to library install per package:

databricks libraries install --cluster-id $CLUSTER_ID --pypi-package azure
databricks libraries install --cluster-id $CLUSTER_ID --pypi-package googlemaps
databricks libraries install --cluster-id $CLUSTER_ID --pypi-package python-tds
databricks libraries install --cluster-id $CLUSTER_ID --cran-package tidyverse

To add another Python or R package, add a line to this build script. The change triggers the pipeline and the cluster is rebuilt.

Pipeline: Import workspace

This is a detailed walkthrough of the build-workspace.yml pipeline. The first part of the pipeline is identical to the build-cluster.yml pipeline. The trigger include differs, since this pipeline is triggered by code pushes to the workspace/ directory. The choice of the build server (pool), the variable reference to the databricks_cli variable group for the Databricks access tokens and the Python version task are identical, as is installing and configuring the Databricks CLI with the same build script as above.

The only build task is importing all files in the workspace/ directory to the Databricks Workspace. The args parameter passes a sub-directory name for the /Shared/ folder in Databricks (/Shared/HelloWorkspace/ in the example).

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-workspace-import.sh
            args: "HelloWorkspace"
          displayName: "Import updated notebooks to workspace to dev"

The specified directory is first deleted. When the directory does not exist, the CLI prints an error in JSON format, but does not break the pipeline. The args: parameter is passed to the $SUBDIR variable in the build script.

databricks workspace delete --recursive /Shared/$SUBDIR

Then the script files in the workspace/ folder of the master branch are copied into the Databricks workspace.

databricks workspace import_dir ../workspace /Shared/$SUBDIR/

Remember that the repo is copied into the pipeline build agent/server and the working directory of the pipeline agent points to the location of the .yml file which defines the pipeline.


PySpark execution logic and code optimization

PySpark looks like regular Python code. In reality, the distributed nature of the execution requires a whole new way of thinking to optimize PySpark code.

This article focuses on understanding PySpark execution logic and performance optimization. PySpark DataFrames play an important role in both.

To try PySpark in practice, get your hands dirty with this tutorial: Spark and Python tutorial for data developers in AWS

DataFrames in pandas as a PySpark prerequisite

PySpark needs a totally different kind of engineering compared to regular Python code.

If you are going to work with PySpark DataFrames it is likely that you are familiar with the pandas Python library and its DataFrame class.

Here comes the first source of potential confusion: despite their similar names, PySpark DataFrames and pandas DataFrames behave very differently. It is also easy to confuse them in your code. You might want to use a suffix such as _pDF for pandas DataFrames and _sDF for Spark DataFrames.

The pandas DataFrame object stores all the data represented by the data frame within the memory space of the Python interpreter. All of the data is easily and immediately accessible. The operations on the data are executed immediately when the code is executed, line by line. It is easy to print intermediate results to debug the code.

However, these advantages are offset by the fact that you are limited by the local computer’s memory and processing power constraints – you can only handle data which fits into the local memory. But since the operations are done in memory, with a basic data processing task you do not need to wait more than a few minutes at maximum.

PySpark DataFrames and their execution logic

The PySpark DataFrame object is an interface to Spark’s DataFrame API and a Spark DataFrame within a Spark application. The data in the DataFrame is very likely to be somewhere else than the computer running the Python interpreter – e.g. on a remote Spark cluster running in the cloud.

There are two distinct kinds of operations on Spark DataFrames: transformations and actions. Transformations describe operations on the data, e.g. filtering a column by value, joining two DataFrames by key columns, or sorting data. Actions are operations which take DataFrame(s) as input and output something else. Examples of actions are showing the contents of a DataFrame or writing a DataFrame to a file system.

The key to understanding how Spark works is that transformations are lazy. Executing a Python command which describes a transformation of one PySpark DataFrame to another does not actually require calculations to take place. Rather, the operation is added to the graph describing what Spark should eventually do. Ordering by a column, or calculating aggregate values and returning another PySpark DataFrame, would be such a transformation.

When an action is requested – e.g. return the contents of this Spark DataFrame as a pandas DataFrame – Spark looks at the processing graph and then optimizes the tasks which need to be done. This is the job of the Catalyst optimizer, and it enables Spark to optimize the operations to a very high degree.
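The difference between describing work and doing it can be illustrated with a small toy model in plain Python. This is not Spark's API and it does no optimization at all; the class `LazyTable` and its methods are made up purely to show that transformations only record a plan and that the work happens when an action is called.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated table. This is not Spark's API."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # recorded transformations, not yet executed

    def filter(self, predicate):
        # Transformation: only extend the plan and return a new object.
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def select(self, func):
        # Transformation: again nothing is computed here.
        return LazyTable(self._rows, self._plan + (("select", func),))

    def collect(self):
        # Action: run the whole recorded plan now.
        rows = list(self._rows)
        for kind, func in self._plan:
            if kind == "filter":
                rows = [row for row in rows if func(row)]
            else:
                rows = [func(row) for row in rows]
        return rows


calls = []


def is_big(row):
    calls.append(row)  # side effect lets us observe when the work happens
    return row["amount"] > 1


table = LazyTable([
    {"product": "apple", "amount": 5},
    {"product": "milk", "amount": 2},
    {"product": "bread", "amount": 1},
])
plan = table.filter(is_big).select(lambda row: row["product"])
print(len(calls))      # 0: nothing has been computed yet
print(plan.collect())  # ['apple', 'milk']
print(len(calls))      # 3: the filter ran only when the action was requested
```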

Also, the actual computation tasks run on the Spark cluster, meaning that you can have huge amounts of memory and processing cores available for the actual computation, even without resorting to the top-of-the-line virtual machines offered by cloud providers.

Consider caching to speed up PySpark

However, the highly optimized and parallelized execution comes at a cost: it is not as easy to see what is going on at each step. Looking at the data after some transformations means that you have to gather the data, or its subset, to a single computer. This is an action, so Spark has to determine the computation graph, optimize it, and execute it.

If your dataset is large, this may take quite some time. This is especially true if caching is not enabled and Spark has to start by reading the input data from a remote source – such as a database cluster or cloud object storage like S3.

You can alleviate this by caching the DataFrame at some suitable point. Caching causes the DataFrame partitions to be retained on the executors and not be removed from memory or disk unless there is a pressing need. In practice this means that the cached version of the DataFrame is available quickly for further calculations. However, playing around with the data is still not as easy or quick as with pandas DataFrames.
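The effect of caching can be sketched with another toy model in plain Python. It is not Spark code: `Source` and `LazyPipeline` are hypothetical classes that only count how often the input is read. Without caching, every action recomputes from the source. With caching, the first action fills the cache and later actions reuse it. As in Spark, calling `cache()` here only marks the result for caching and computes nothing by itself.

```python
class Source:
    """Toy stand-in for a slow remote input; counts how often it is read."""

    def __init__(self, rows):
        self.rows = rows
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self.rows)


class LazyPipeline:
    """Toy lazy pipeline: every action recomputes from the source unless cached."""

    def __init__(self, source, transform):
        self.source = source
        self.transform = transform
        self._cache_requested = False
        self._cached = None

    def cache(self):
        # Only marks the result for caching; nothing is computed yet.
        self._cache_requested = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        result = [self.transform(row) for row in self.source.read()]
        if self._cache_requested:
            self._cached = result
        return result

    def count(self):  # action
        return len(self._compute())

    def total(self):  # action
        return sum(self._compute())


src = Source([1, 2, 3])
uncached = LazyPipeline(src, lambda x: x * 10)
uncached.count()
uncached.total()
print(src.reads)  # 2: each action read the source again

src2 = Source([1, 2, 3])
cached = LazyPipeline(src2, lambda x: x * 10).cache()
cached.count()
cached.total()
print(src2.reads)  # 1: the second action reused the cached result
```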

Use small scripts and multiple environments in PySpark

As a rule of thumb, one PySpark script should perform just one well defined task. This is due to the fact that any action triggers the transformation plan execution from the beginning. Managing and debugging becomes a pain if the code has lots of actions.

The normal flow is to read the data, transform the data and write the data. Often the write stage is the only place where you need to execute an action. Instead of debugging in the middle of the code, you can review the output of the whole PySpark job.

With large amounts of data this approach would be slow. You would have to wait a long time to see the results after each job.

My suggestion is to create environments that have different sizes of data. In the environment with little data you test the business logic and syntax. The test cycle is rapid as there’s no need to process gigabytes of data. Running the PySpark script with the full dataset reveals the performance problems.

This goes well together with the traditional dev, test, prod environment split.

Favor DataFrame over RDD with structured data

An RDD (Resilient Distributed Dataset) can be any set of items. For example, a shopping list.

["apple", "milk", "bread"]

RDD is the low-level data representation in Spark, and in earlier versions of Spark it was also the only way to access and manipulate data. However, the DataFrame API was introduced as an abstraction on top of the RDD API. As a rule of thumb, unless you are doing something very involved (and you really know what you are doing!), stick with the DataFrame API.

A DataFrame is a tabular structure: a collection of Columns, each of which has a well defined data type. If you have a description and amount for each item in the shopping list, then a DataFrame would do better.

+-------+-----------+------+
|product|description|amount|
+-------+-----------+------+
|apple  |green      |5     |
|milk   |skimmed    |2     |
|bread  |rye        |1     |
+-------+-----------+------+

This is also a very intuitive representation for structured data, something that can be found in a database table. PySpark DataFrames have their own methods for data manipulation, just like pandas DataFrames have.

Avoid User Defined Functions in PySpark

As a beginner I thought PySpark DataFrames would integrate seamlessly with Python. That’s why I chose to use UDFs (User Defined Functions) to transform the data.

A UDF is simply a Python function which has been registered to Spark using PySpark’s spark.udf.register method.

With the small sample dataset it was relatively easy to get started with UDFs. When running the PySpark script with more data, Spark threw an OutOfMemory error.

Investigating the issue revealed that the code could not be optimized when using UDFs. To Spark’s Catalyst optimizer, the UDF is a black box. This means that Spark may have to read in all of the input data, even though the data actually used by the UDF comes from small fragments of the input. In other words, filtering the data at the read step, close to the data source (predicate pushdown), cannot be used.

Additionally, there is a performance penalty: on the Spark executors, where the actual computations take place, data has to be converted (serialized) in the Spark JVM to a format Python can read, a Python interpreter spun up, the data deserialized in the Python interpreter, the UDF executed, and the result serialized and deserialized again to the Spark JVM. All of this takes significant amounts of time!

The recommendation is to stay with native PySpark DataFrame functions whenever possible, since they are translated directly to native Scala functions running on Spark.

If you absolutely, positively need to do something with UDFs in PySpark, consider using the pandas vectorized UDFs introduced in Spark 2.3. The UDFs are still a black box to the optimizer, but at least the performance penalty of moving data between the JVM and the Python interpreter is a lot smaller.

Number of partitions and partition size in PySpark

In order to process data in a parallel fashion on multiple compute nodes, Spark splits data into partitions, smaller data chunks. A DataFrame of 1,000,000 rows could be split into 10 partitions of 100,000 rows each. Additionally, the computation jobs Spark runs are split into tasks, each task acting on a single data partition. A Spark cluster has a driver that distributes the tasks to multiple executors. This means that the datasets can be much larger than what fits into the memory of a single computer, as long as the partitions fit into the memory of the computers running the executors.

In one of the projects our team encountered an out-of-memory error that we spent a long time figuring out. Finally we found out that the problem was a result of partitions that were too large. The data in a partition simply could not fit into the memory of a single executor node.

Too few partitions also make the execution inefficient. If there are fewer partitions than available cores (or, technically, available slots), some of the executor cores sit idle while others are working at full steam.
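A back-of-the-envelope calculation makes the idle-core effect concrete. The sketch below is a deliberately simplified model, not something Spark provides: it assumes one task per partition, tasks of equal duration, and a hypothetical helper name `task_waves`. It reports how many rounds of tasks are needed and what fraction of the slots is busy on average.

```python
import math


def task_waves(num_partitions, num_slots):
    """Return (waves, utilization) for equally long tasks, one per partition."""
    if num_partitions <= 0 or num_slots <= 0:
        raise ValueError("num_partitions and num_slots must be positive")
    waves = math.ceil(num_partitions / num_slots)
    utilization = num_partitions / (waves * num_slots)
    return waves, utilization


print(task_waves(10, 16))  # (1, 0.625): 6 of the 16 slots stay idle
print(task_waves(17, 16))  # (2, 0.53125): the second wave runs a single task
print(task_waves(32, 16))  # (2, 1.0): all slots busy in both waves
```

Real workloads have uneven task durations, so this is only a rough guide for choosing a partition count relative to the number of slots.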

However, having a large number of small partitions is not optimal either: shuffling the data in the small partitions is inefficient. Also, reading and writing to disk (not to mention a network destination) in small chunks potentially increases the total execution time.

The Spark programming guide recommends 128 MB partition size as the default. For 128 GB of data this would mean roughly 1,000 partitions. Without going too deep into the details, consider partitioning as a crucial part of the optimization toolbox. If your partitions are too large or too small, you can use the coalesce() and repartition() methods of DataFrame to instruct Spark to modify the partition distribution: coalesce() can only reduce the number of partitions, while repartition() performs a full shuffle and can also increase it. The number of partitions in a DataFrame sDF can be checked with sDF.rdd.getNumPartitions().
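The partition count in the example above is simple arithmetic, sketched here in plain Python. The helper name `partitions_for` is hypothetical, and the result depends on whether sizes are counted in binary or decimal units.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def partitions_for(total_bytes, target_partition_bytes=128 * MIB):
    """Number of partitions needed so that none exceeds the target size."""
    if target_partition_bytes <= 0:
        raise ValueError("target_partition_bytes must be positive")
    # Integer ceiling division; always return at least one partition.
    needed = (total_bytes + target_partition_bytes - 1) // target_partition_bytes
    return max(1, needed)


print(partitions_for(128 * GIB))                     # 1024 with binary units
print(partitions_for(128 * 10 ** 9, 128 * 10 ** 6))  # 1000 with decimal units
```

A number obtained this way could be passed to repartition() as a starting point, to be adjusted after observing the actual job.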

Summary – PySpark basics and optimization

PySpark offers a versatile interface for using powerful Spark clusters, but it requires a completely different way of thinking and an awareness of the differences between local and distributed execution models. The functionality offered by the core PySpark interface can be extended by creating User-Defined Functions (UDFs), but as a tradeoff the performance is not as good as for native PySpark functions due to a lesser degree of optimization. Partitioning the data correctly and with a reasonable partition size is crucial for efficient execution – and as always, good planning is the key to success.
