
Azure Synapse Analytics – Unifying your data pipeline toolbox

Introduction

Azure SQL capabilities have evolved a lot over the years. Azure offers everything from VMs for running your own SQL Server setup to SQL DB Hyperscale, where you get an extremely scalable, but still traditional, database as a service. But what about massively parallel processing data warehousing? Data integrations? Data science? What about limitless analytics? For that, there’s Azure Synapse Analytics.

In November 2019 Microsoft announced Azure Synapse as a limitless analytics service and the next evolution of SQL Data Warehouse. The only thing released in general availability at that point was the renaming of SQL DW; the other Synapse features were available only in a limited private preview, and for most people setting up Synapse Analytics meant setting up the former SQL DW. Nowadays Azure Synapse Analytics is the name for the whole analytics service, with the former SQL DW being one part of it. At the time of writing, the former SQL DW (also known as Synapse SQL provisioned) is still the only component in GA; the rest of the features are in public preview, and anyone can set up a Synapse Analytics workspace. Yes, the naming is confusing, but hang on, we will try to clear that up for you.

This blog post serves as an overview of the capabilities to give you insight into how to position Synapse Analytics in your overall data platform architecture. We won’t do a deep dive into technical implementation in this post.

Synapse Analytics Architecture

Note: At the moment of writing this blog, Synapse Analytics can refer to both “Synapse Analytics (formerly SQL DW)” and “Synapse Analytics (workspace preview)” in Azure documentation. Here we are talking about the unified experience in the workspace preview.

Azure Synapse Analytics is the common name for integrated tooling that provides everything from source system integration to relational databases, data science tooling and reporting data sets. Synapse Analytics contains the following:

"<yoastmark

  • Synapse SQL
    • This is the data warehouse part formerly known as Azure SQL DW.
    • Provides both serverless (SQL on-demand) and pre-allocated (SQL Pool) resources
    • Shares a common metastore with the Spark engine for seamless integration
  • Apache Spark
    • Seamlessly integrated big data engine.
    • Shares a common metastore with Synapse SQL
  • Data Flow and Integrations
    • Shares a codebase with Azure Data Factory, so it has everything you expect and more
    • Integrates with nearly a hundred data sources for ingesting data
    • Orchestrates SQL procedures and Spark notebooks
  • Management and Monitoring
    • Familiar management and monitoring tools from Azure Data Factory are available.
  • Synapse Studio
    • A single web UI where you can do everything:
      • Integrate with source systems
      • Land data to Azure Data Lake
      • Explore the data using Spark notebooks
      • Load data to Synapse SQL using T-SQL scripts
      • Predict what needs predicting using Python, Scala, C# or SQL in Spark
      • Publish data sets to Power BI
      • Manage and orchestrate everything with pipelines

To recap, Synapse Analytics refers to all of the capabilities available to you, not to a single included tool. Synapse SQL, as the name suggests, is perhaps the most recognizable part, with its SQL DW heritage and T-SQL. However, Synapse Analytics also contains a serverless SQL form factor named SQL On-demand, so using T-SQL no longer requires a provisioned SQL Pool.

Synapse Analytics’ unique value proposition

By unifying all of the tools mentioned above, Synapse Analytics really brings something new to the table. Using Synapse Analytics you can, without ever leaving Synapse Studio, connect to a new on-premises data source, extract, load and transform that data into the data lake and Synapse SQL, enrich it further with ML models, and serve it for reporting.

Synapse Analytics provides capabilities for each of the steps in a data pipeline: ingestion, preparing data, storing, exploring, transforming and serving the data:

Azure Synapse Analytics fit

Ingest

If you have previously used Azure Data Factory, you will be right at home using the data integration tools in Synapse Analytics. Synapse Analytics even shares the same codebase with Data Factory, so everything you have grown accustomed to is already there (almost everything – you can check the complete list of differences at https://docs.microsoft.com/en-us/azure/synapse-analytics/data-integration/concepts-data-factory-differences). You can use an Integration Runtime running inside your on-premises network to access all the data sources inside your network, or an Azure-hosted one for massive scale. The list of natively supported systems is constantly growing, and you can find up-to-date information at https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview#supported-data-stores-and-formats. And if your source system is not on the list, you can always use a self-hosted integration runtime with ODBC or JDBC drivers.

Despite their similarities, there’s currently no tooling to migrate ADF pipelines to Synapse Analytics Pipelines, and you can’t use the same Integration Runtime for both ADF and Synapse Analytics.

Another interesting preview feature is Azure Synapse Link. It allows you to run analytical workloads on top of operational data in Azure Cosmos DB in near real time without affecting operational usage. This means that application data in Cosmos DB is available to Synapse SQL and Spark through cloud-native HTAP, without building ETL workflows: https://docs.microsoft.com/en-us/azure/cosmos-db/configure-synapse-link
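For illustration, here is a minimal sketch of what reading the Cosmos DB analytical store from a Synapse Spark notebook can look like. The linked service and container names are hypothetical placeholders, and the option names follow the Synapse Link Spark connector documentation at the time of writing:

%%spark
// Hedged sketch: read the Cosmos DB analytical store (Synapse Link) into a Spark DataFrame.
// "CosmosDbLinkedService" and "Orders" are placeholder names for this example.
val ordersDf = spark.read
  .format("cosmos.olap")
  .option("spark.synapse.linkedService", "CosmosDbLinkedService")
  .option("spark.cosmos.container", "Orders")
  .load()

ordersDf.printSchema()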

Prepare

After connecting to your data source, you can extract your data to Azure. There are numerous ways to organize the data in the data lake. Generally, you’ll probably want to have the original untouched data in a “raw” area, and processed and more refined data in another area.

With the new COPY INTO T-SQL command, you can use gzipped CSV files to reduce the storage footprint. If you use CSVs, it is advisable to split large files into smaller ones, depending on your DWU capacity. And finally, you don’t have to worry about the 1 MB row limit and hard-coded separator values as with PolyBase.

To really leverage all the possibilities of Synapse Analytics, you should use the parquet format in your data lake. In addition to being a compressed format, parquet supports predicate pushdown in Synapse SQL and Spark. This greatly speeds up exploratory queries against the data lake from Synapse SQL or Spark, as you don’t have to read all the files in a given folder structure to get the rows you want. With parquet, the COPY INTO command automatically splits your files to speed up processing.
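As a rough sketch of what a parquet load with COPY INTO can look like – the table name, storage account and folder path below are hypothetical placeholders, and the authentication option depends on your setup:

-- Hedged example: load parquet files from the data lake into a staging table in Synapse SQL.
-- [stg].[SalesOrders], the storage account and the folder path are placeholders.
COPY INTO [stg].[SalesOrders]
FROM 'https://yourdatalake.dfs.core.windows.net/raw/salesorders/*.parquet'
WITH (
    FILE_TYPE = 'PARQUET',
    CREDENTIAL = (IDENTITY = 'Managed Identity')
);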

Using Data Flows to clean and unify your data

Data Flows provide a no-code approach to transforming your data. Like the integration tools, Data Flows are already familiar from Azure Data Factory. With Data Flows you could, for example, combine a few columns, delete a redundant one, calculate a working unique identifier and join the data with another flow, before storing the result back to the data lake as a processed file and also persisting it to a SQL Pool table.

Explore data

Synapse Studio gives you multiple options to explore your data. You can graphically view the data lake structure and easily get a SQL script or Spark notebook to view a file’s contents just by right-clicking it.

Select from data lake file

With SQL On-demand, you can explore the contents of files without moving or importing them anywhere, and generate simple graphs in the Synapse Studio UI to give you an idea of what you are looking at.
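For example, a quick look at parquet files with SQL On-demand could look roughly like this (the storage account and path are hypothetical placeholders):

-- Hedged example: explore files in the data lake directly with SQL On-demand, no import needed.
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://yourdatalake.dfs.core.windows.net/raw/salesorders/*.parquet',
    FORMAT = 'PARQUET'
) AS rows;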

Graph T-SQL results in Azure Synapse Analytics

Synapse SQL, both on-demand and provisioned, can be connected to from outside Synapse Studio with different clients using protocols like ODBC, JDBC and ADO.NET.

In Apache Spark (example in Scala), using data from the SQL Pool is as easy as:

%%spark
val df = spark.read.sqlanalytics("SQLPool.schema.table")

This doesn’t require any configuration on Spark’s side, as Spark and SQL are just two different runtimes operating on the same metadata and data sources.

Transform and enrich data

For transforming and enriching data, Synapse Analytics offers Spark notebooks in addition to T-SQL. Admittedly, it has already been pretty easy to add Databricks notebooks as part of your Azure Data Factory pipelines, but with Synapse Analytics this integration is more ready-made and even easier.

One interesting possibility is SQL On-demand and its external tables. SQL On-demand doesn’t get access to SQL Pool tables (you don’t even need to provision a SQL Pool or Synapse SQL to use SQL On-demand), but you can create external tables using T-SQL. External tables are stored as parquet-backed files in Azure Data Lake Storage Gen2. Compared to the current Azure general availability offering, moving towards SQL On-demand and external tables in your transformations offers a serverless and fully scalable architecture. What the performance of external tables and SQL On-demand will be like remains to be seen.
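A minimal CETAS sketch with SQL On-demand might look like the following; the external data source, file format, table, columns and paths are hypothetical placeholders assumed to be defined beforehand:

-- Hedged example: transform raw parquet files and persist the result as a parquet-backed
-- external table using CREATE EXTERNAL TABLE AS SELECT (CETAS).
-- [MyDataLake], [ParquetFormat], the columns and the paths are placeholders.
CREATE EXTERNAL TABLE [curated].[SalesOrders]
WITH (
    LOCATION = '/curated/salesorders/',
    DATA_SOURCE = [MyDataLake],
    FILE_FORMAT = [ParquetFormat]
)
AS
SELECT o.OrderId, o.CustomerId, o.OrderTotal
FROM OPENROWSET(
    BULK 'https://yourdatalake.dfs.core.windows.net/raw/salesorders/*.parquet',
    FORMAT = 'PARQUET'
) AS o;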

Serve

Synapse Analytics offers a few new interesting features for serving enriched data: SQL On-demand and Power BI Service connection.

With SQL On-demand, it is possible to create a highly scalable data pipeline all the way from source systems to transforming and storing data to serving it to Power BI, without any predetermined service level or capacity choices:

  • Ingest and Transform data using Data Pipelines and Data Flow. Use SQL On-demand or Data Flow as a part of your pipeline where needed
  • Store the results with “CETAS”, a T-SQL command CREATE EXTERNAL TABLE AS SELECT to store the results of your final SELECT statement to Azure Data Lake Storage Gen 2
  • Create a data set in Power BI. You can either use the files directly or use Direct Query via SQL On-demand.

Lastly, Synapse Analytics has the capability to link to Power BI Service! You can create linked services with Power BI Workspaces, view datasets and also create Power BI reports without ever leaving Synapse Studio. This greatly simplifies the process of creating reports from new data.

Connect Synapse Analytics with Power BI workspace

Do note that at the moment, you can only link to ONE Power BI Workspace. After connecting to a Power BI Workspace, you can create new Power BI reports based on published data sets without leaving Synapse Studio.

Create Power BI Reports straight from Synapse Studio

Manage and orchestrate

For orchestrating your pipelines, Synapse Analytics offers pretty much the same tooling as Azure Data Factory. You can easily combine your Data Flows, Spark Notebooks, T-SQL queries and everything to form pipelines as in Azure Data Factory. The familiar triggers are also there.

The management views also share much of the same functionality as Azure Data Factory.

Where Synapse Analytics excels

Synapse Analytics’ strengths lie in its unified yet versatile tooling:

  • Starting a new data platform project
    • Synapse Analytics offers a unified experience for ingesting, preparing, transforming and serving your data in one place
  • Architecture with Data Lake
    • Synapse Analytics’ unified tooling makes it easier to work with and explore the data lake
  • Architecture with Cosmos DB (or other future possible Synapse Link sources)
    • Near-real-time analytics based on operative data sources without any manual ETL processes
  • Explorative work on unknown data
    • Many tools and languages in one place
  • Security management in Synapse Workspace
    • Instead of setting up and configuring multiple separate tools with authentications and networks between them, you have only the Synapse Analytics workspace to set up
  • Possibility for both no-code and code approaches
    • A suitable approach probably exists for many developers to get started
  • Synapse SQL Pool as an MPP database
    • Synapse SQL is a powerful database in its own right

What we wish to see in future updates

  • While Synapse SQL Pool does pack a punch, it does require DBA work and manual maintenance. Maybe more automation regarding this in the future?
    • Different indexes, partitions and distributions will have an impact on performance and storage costs
    • Workload management: classification, importance, isolation all need to be planned for and addressed for full scale operations
  • More dynamic resource scaling
    • Scaling Synapse SQL is an offline operation
    • While SQL On-demand offers some exciting possibilities, it lacks regular database functionality: there is no access to relational tables, for example, as it doesn’t require a SQL Pool. There are parquet-backed external tables, but their performance remains to be seen
  • Development in the Spark-sector as Synapse’s Spark is not on par with Databricks
    • For example, one notebook locks one Spark pool
    • Different runtime versions compared to Databricks
  • Implementation of version control
    • There’s no Git or any other version control system support. The only way to save work is to publish, which also makes it visible to all other developers.
    • Managing different environments without a version control system requires using the SDK or APIs, which means a lot of work to get production ready
  • Getting from preview to GA
    • There are still instabilities and UI issues – the usual preview shenanigans, which are hopefully taken care of before the move to general availability.

Conclusion

In conclusion, Synapse Analytics’ vision is a great one: a unified tool and experience for creating almost everything you need in Azure to get your data from a data source to a published report. At the moment there are some drawbacks, as the whole workspace experience is still in preview – of which the lack of version control is definitely not the smallest.

At this point we haven’t done any performance analysis, and it is too early to say whether Synapse Analytics can really be a silver bullet for combining data pipelines, warehousing and analytics. In addition to performance, some key unique features are somewhat handicapped at this point: for example, you can only connect to one Power BI workspace. The vision is definitely there, and we will be anxiously waiting for future updates.

Automated Code Deployment from Azure DevOps to Databricks

The target audience is data practitioners looking for a simple method to practice DataOps, even in restricted environments. A walk-through of the code is detailed in the appendix.

The linked code repository contains a minimal setup to automate infrastructure and code deployment simultaneously from Azure DevOps Git repositories to Databricks.

TL;DR:

  1. Import the repo into a fresh Azure DevOps project,
  2. get a secret access token from your Databricks workspace,
  3. paste the token and the Databricks URL into an Azure DevOps Library variable group named “databricks_cli”,
  4. create and run the two pipelines referencing the YAML files in the repo’s pipelines/ directory.
  5. Any Databricks-compatible (Python, Scala, R) code pushed to the remote repository’s workspace/ directory will be copied to the Databricks workspace, with an interactive cluster waiting to execute it.

Background

Azure DevOps and Databricks have one thing in common – they take industry-standard technology and offer it as an intuitive, managed platform:

  • Databricks for running Apache Spark
  • DevOps for Git repos and build pipelines

Both platforms have much more to offer than what is used in this minimal integration example. DevOps offers wiki, bug, task and issue tracking, kanban, scrum and workflow functionality, among others.

Databricks is a fully managed and optimized Apache Spark PaaS. It can natively execute Scala, Python, PySpark, R, SparkR, SQL and Bash code; some cluster types have TensorFlow installed and configured (including GPU drivers). Integration of the H2O machine learning platform is quite straightforward. In essence, Databricks is a highly performant, general-purpose data science and engineering platform which tackles virtually any challenge in the Big Data universe.

Both have free tiers and a pay-as-you-go pricing model.

Databricks provides infrastructure as code. A few lines of JSON consistently deploy an optimized Apache Spark runtime.

After several projects and an increasing need to build and prototype in a managed and reproducible way, the DevOps–Databricks combination became much appreciated: it enables quick and responsive interactive runtimes and provides industry best practice for software development and data engineering. Deployment into scheduled, performant, resilient production environments is possible without changes to the platform and without any need for refactoring.

The core of the integration uses Databricks infrastructure-as-code (IaC) capability together with DevOps pipelines functionality to deploy any kind of code.

  1. the Databricks CLI facilitates programmatic access to Databricks and
  2. the managed Build Agents in DevOps deploy both infrastructure and analytic code.

Azure pipelines deploy both the infrastructure code and the notebook code from the repository to the Databricks workspace. This enables version control of both the runtime and the code in one compact, responsive repository.

All pieces of the integration are hosted in a single, compact repository, which makes all parts of a data and modeling pipeline fully reproducible.

Prerequisites

Log into Azure DevOps and the Databricks workspace. There are free tiers for both of them. Setup details are explained extensively in the canonical quick start sections of either service.

For the integration Databricks can be hosted in either the Azure or AWS cloud.

1. Import the Repository

To use this demo as a starting point for a new project, prepare an Azure DevOps project:

  • create a new project (with an empty repository by default)
  • select the repository tab and choose “Import a repository”
  • paste the URL of this demo into the Clone URL field: https://dev.azure.com/reinhardseifert/DatabricksDevOps/_git/DatabricksDevOps
  • wait for the import to complete
  • clone the newly imported repository to your local computer to start deploying your own code into the workspace directory

Then create two Azure pipelines which create the runtime and sync any code updates into it (see below).

2. Create Databricks Secret Token

Log into the Databricks workspace, go to User Settings (icon in the top right corner) and select “Generate New Token”. Choose a descriptive name (“DevOps Build Agent Key”) and copy the token to a notebook or the clipboard. The token is displayed just once – directly after creation; you can create as many tokens as you wish.

Databricks > User Settings > Create New Token

3. Add the token to the Azure DevOps Library

The Databricks secret token has to be added to a variable group named “databricks_cli”. Variable groups are created under Pipelines > Library. Note that the name of the variable group is referenced in both pipeline definitions (/pipelines/build-cluster.yml and /pipelines/build-workspace.yml). Two variables have to be defined: databricks_host and databricks_token.

The variable names are referenced in the .yml files – changing them in the DevOps Library also requires changing them correspondingly in the .yml files. When you click the lock icon after defining a variable, it is treated as a secret and is no longer visible in the DevOps project – neither in the Library nor on the build servers (even when accidentally echoing it). Of course, writing the values to the Databricks environment could potentially expose them, which is a security concern when collaborating with non-trusted parties on a project.

Pipelines > Library > Add Variable Group

 

Azure DevOps

At a minimum, the Azure DevOps portal offers a Git repository to maintain code and pipelines to deploy that code from the repository into runtimes.

Azure Repositories

The Azure repo contains the full logic of the integration:

  1. the actual (Python) code to run,
  2. the JSON specification of the Spark-cluster which will run the code,
  3. shell build scripts which are executed on the pipeline’s build server,
  4. the YAML configurations which define the pipelines.

The complete CI/CD pipeline is contained in a single Git repository in a very compact fashion. Following Databricks’ terminology the Python code (1) is located in the workspace/ directory. The runtime specification .json (2), build scripts .sh (3) and the pipeline configuration .yml (4) are located in the pipelines/ directory according to the Azure DevOps paradigm.

Azure Pipelines

The Pipelines menu provides the following functionality:

  • Pipelines (aka build pipelines),
  • Environments (needed to group Azure resources – not used here),
  • Releases (aka release pipelines – not used here)
  • Library (containing the variable groups)

The build pipelines, which are the only kind used in this demo project, are managed under the “Pipelines > Pipelines” menu tab – not really intuitive.

Azure Build Pipelines

The pipeline’s build agents are configured via YAML files (e.g. build-cluster.yml). In this case they install the Databricks CLI on the build machine and then execute CLI commands to create runtimes and move code notebooks to the runtime. The Databricks cluster is configured by a single JSON file (see config.cluster.json).

This minimal integration requires creation of two pipelines:

  1. cluster creation – referencing pipelines/build-cluster.yml and
  2. workspace synchronization – referencing /pipelines/build-workspace.yml

After importing the repo:

  • select the Pipelines > Pipelines menu tab
  • choose Azure Repos Git YAML
  • select the imported repository from the drop-down menu
  • select Existing Azure Pipeline YAML file
  • select the YAML file from the drop-down menu
  • Run the pipeline for the first time – or just save it and run it later.

At this point the Databricks secret access token mentioned in the Prerequisites paragraph needs to be present in a “databricks_cli” variable group. Otherwise the pipeline run will fail and warn you; in this case just create the token (in Databricks) and the variable group (in DevOps) and re-run the pipeline.

After creating the pipelines and saving them (or running them initially), the default pipeline names reference the source repository name which triggers them. For easier monitoring the pipelines should be renamed according to their function, like “create-cluster” and “sync-workspace” in this case.

Summary

This concludes the integration of analytic code from an Azure DevOps repository into a hosted Databricks runtime.

Any change to the config.cluster.json deletes the existing cluster and creates a new one according to the specifications in the JSON file.

Any change to workspace/ will copy the notebook file(s) (R, Python, Scala) to the Databricks workspace for execution on the cluster.

The Databricks workspace in this example was hosted on Azure. Only minor changes are required to use an AWS-hosted workspace. On all cloud platforms the host URL and security token are specific to the chosen instance and region. The cloud-specific parameter is the node_type_id in the cluster configuration .json file.

Using this skeleton repo as a starting point, it is immediately possible to run interactive workloads on a performant Apache Spark cloud cluster – instead of “cooking” the local laptop with analytic code – transparently maintained on a professional DevOps platform.

Appendix

Following is a detailed walk-through of the .yml pipeline configurations, .sh build scripts and .json configuration files.

In general, the YAML instructs the build server to 1. start up when a certain file is changed (trigger), 2. copy the contents of the repository to the build server and 3. execute a selection of shell scripts (tasks) from the repository.

Pipeline: Create cluster

This is a detailed walk-through of the build-cluster.yml pipeline. The .yml files have a hierarchical structure, and the full hierarchy of the DevOps build pipeline is included here, although stages could be omitted.

Trigger

The first section of the pipeline YAML specifies the trigger. Any change to the specified branch of the linked repo will automatically start a run on the build agent.

trigger:
  branches:
    include:
    - master
  paths:
    include:
    - pipelines/config.cluster.json
    - pipelines/databricks-library-install.sh

Without the paths: section, any change to the master branch will run the pipeline. The cluster is rebuilt when the configuration changes or the selection of installed Python or R libraries changes.

Stages

The stage can be omitted (for a single-stage pipeline) and the pool, variables and jobs defined directly; the stage would then be implicit. It is possible to add testing steps to the pipeline and build fully automated CI/CD pipelines across environments within one .yml file.

stages:
- stage: "dev"
  displayName: "Development"
  dependsOn: []

Pool

  pool:
    vmImage: "ubuntu-latest"

Selects the type of virtual machine to start when the trigger files are changed. At the time of writing, ubuntu-latest will start an Ubuntu 18.04 LTS image.

Variables

  variables:
    - group: databricks_cli

This section references the variable group created in the Prerequisites section. The secret token is transferred to the build server and authorizes the API calls from the server to the Databricks workspace.

Jobs, Steps and Tasks

A job is a sequence of steps which are executed on the build server (pool). In this pipeline only task steps are used (see the docs for all step operations).

  jobs:
    - job: CreateCluster4Dev
      steps:

        - task: UsePythonVersion@0
          inputs:
            versionSpec: "3.8"
            architecture: "x64"

The first step selects the Python version for all following Python commands on the build server; the Databricks CLI is written in Python and installed via pip in the following task.

Task: Install and configure the Databricks CLI

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cli-config.sh
            args: "\$(databricks_host) \$(databricks\_token)"
          displayName: "Install and configure the Databricks CLI"

Note that the path is relative to the root of the repo. The secret access token and host URL from the DevOps library are copied into environment variables which can be passed to the script in the args section.

The shell script installs the Databricks CLI and writes the necessary CLI configuration on the build server.

python -m pip install databricks-cli
echo -e "[DEFAULT]\nhost: $HOST\ntoken: $TOKEN" > $HOME/.databrickscfg

Task: “Delete previous cluster version (if existing)”

This task will remove any cluster with the name provided in the args: section. This allows for updating the cluster when the configuration file is changed. When no such cluster is present the script will fail. Usually the pipeline will break at this point but here continueOnError is true, so the pipeline will continue when creating a cluster for the first time.

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cluster-delete.sh
            args: "HelloCluster"
          continueOnError: "true"
          displayName: "Delete previous cluster version (if existing)"

The shell script called by this task is a wrapper around the Databricks CLI. First it queries for the cluster-id of any cluster with the name passed.

CLUSTER_ID=$(databricks clusters list --output json | jq -r --arg N "$CLUSTER_NAME" '.clusters[] | select(.cluster_name == $N) | .cluster_id')

It is possible to create multiple clusters with the same name. In case there are multiple all of them are deleted.

for ID in $CLUSTER_ID
do
    echo "Deleting $ID"
    databricks clusters permanent-delete --cluster-id $ID
done

Task: Create new cluster

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-cluster-create.sh
            args: "HelloCluster"
          displayName: "Create new cluster"

The build script reads config.cluster.json and adds the cluster name passed from the pipeline .yml:

cat config.cluster.json | sed "s/CLUSTER_NAME/$CLUSTER_NAME/g" > /tmp/conf.json

Now the configuration .json file can be passed to the Databricks CLI. The complete Apache Spark infrastructure is configured in this JSON; CLUSTER_NAME will be replaced with the name passed from the .yml.

{
    "cluster_name": "CLUSTER_NAME",
    "spark_version": "6.0.x-scala2.11",
    "spark_conf": {
        "spark.sql.execution.arrow.enabled": "true"
    },
    "node_type_id": "Standard_DS3_v2",
    "num_workers": 1,
    "ssh_public_keys": [],
    "custom_tags": {
        "Project": "DevOpsIntegration"
    },
    "cluster_log_conf": {
        "dbfs": {
            "destination": "dbfs:/cluster_logs"
        }
    },
    "spark_env_vars": {
        "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
    },
    "autotermination_minutes": 120,
    "enable_elastic_disk": false,
    "init_scripts": []
}

Updating the runtime to another version requires only modifying the spark_version parameter with any supported runtime.

A Spark cluster consists of one driver node and a number of worker nodes and can be scaled horizontally by adding nodes (num_workers) or vertically by choosing larger node types. The node types are cloud provider specific. The Standard_DS3_v2 node type id references the minimal Azure node.

The autotermination feature shuts the cluster down when it is not in use. Costs are billed per second of uptime per processing unit.

Any reconfiguration triggers the pipeline and rebuilds the cluster.

CLUSTER_ID=$(databricks clusters create --json-file /tmp/conf.json | jq -r '.cluster_id')

The cluster create call returns the cluster-id of the newly created instance. Since the last step of this pipeline installs additional Python and R libraries (via pip and CRAN respectively), it is necessary to wait for the cluster to reach the PENDING state.

STATE=$(databricks clusters list --output json | jq -r --arg I "$CLUSTER_ID" '.clusters[] | select(.cluster_id == $I) | .state')

echo "Wait for cluster to be PENDING"
while [[ "$STATE" != "PENDING" ]]
do
    STATE=$(databricks clusters list --output json | jq -r --arg I "$CLUSTER_ID" '.clusters[] | select(.cluster_id == $I) | .state')
done

Task: Install Python and R dependencies on the cluster

The final step is to add additional Python and R packages to the cluster. There are many ways to install packages in Databricks; this is just one of them.

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-library-install.sh
            args: "HelloCluster"
          displayName: "Install Python and R dependencies"

Again the shell script wraps the Databricks CLI, here the library install command. The cluster name (“HelloCluster” in this example) has to be passed again.

All CLI calls to Databricks need the cluster-id to delete, create and manipulate instances. So first fetch it with a cluster list call:

CLUSTER_ID=$(databricks clusters list --output json | jq -r --arg N "$CLUSTER_NAME" '.clusters[] | select(.cluster_name == $N) | .cluster_id')

Then install the packages – one call to library install per package:

databricks libraries install --cluster-id $CLUSTER_ID --pypi-package azure
databricks libraries install --cluster-id $CLUSTER_ID --pypi-package googlemaps
databricks libraries install --cluster-id $CLUSTER_ID --pypi-package python-tds
databricks libraries install --cluster-id $CLUSTER_ID --cran-package tidyverse

For additional Python or R packages, add a line to this build script – this will trigger the pipeline and the cluster will be rebuilt.

Pipeline: Import workspace

This is a detailed walk-through of the build-workspace.yml pipeline. The first part of the pipeline is identical to the build-cluster.yml pipeline. The trigger section differs, since this pipeline is triggered by code pushes to the workspace/ directory. The choice of the build server (pool), the variable reference to the databricks_cli variable group for the Databricks access token and the Python version task are identical, as is installing and configuring the Databricks CLI with the same build script as above.

The only build task is importing all files in the workspace/ directory to the Databricks workspace. The args parameter passes a sub-directory name for the /Shared/ folder in Databricks (/Shared/HelloWorkspace/ in the example).

        - task: ShellScript@2
          inputs:
            scriptPath: pipelines/databricks-workspace-import.sh
            args: "HelloWorkspace"
          displayName: "Import updated notebooks to workspace to dev"

The specified directory is first deleted. When the directory does not exist, the CLI prints an error in JSON format but does not break the pipeline. The args: parameter is passed to the $SUBDIR variable in the build script.

databricks workspace delete --recursive /Shared/$SUBDIR

Then the script files in the workspace/ folder of the master branch are copied into the Databricks workspace.

databricks workspace import_dir ../workspace /Shared/$SUBDIR/

Remember that the repo is copied into the pipeline build agent/server and the working directory of the pipeline agent points to the location of the .yml file which defines the pipeline.

Lessons learned from combining SQS and Lambda in a data project

In June 2018, AWS Lambda added Amazon Simple Queue Service (SQS) to its supported event sources, removing a lot of the heavy lifting of running a polling service or creating extra SQS-to-SNS mappings. In a recent project we utilized this functionality and configured our data pipelines to use AWS Lambda functions for processing the incoming data items and SQS queues for buffering them. The built-in functionality of SQS and Lambda provided us with a serverless, scalable and fault-tolerant basis, but while running the solution we also learned some important lessons. In this blog post I will discuss the issue of valid messages ending up in dead-letter queues (DLQ) and correctly configuring your DLQ to catch only erroneous messages from your source SQS queue.

What are Amazon SQS and Lambda?

In brief, Amazon SQS is a lightweight, fully managed message queueing service that enables decoupling and scaling microservices, distributed systems and serverless applications. With SQS, it is easy to send, store and receive messages between software components without losing messages.

AWS Lambda is a fully managed, automatically scaling service that lets you run code in multiple languages in a serverless manner, without having to provision any servers. You can configure a Lambda function to execute in response to various events or orchestrate the invocations. Your code runs in parallel and processes each invocation individually, scaling with the size of the workload.

When you configure an SQS queue as an event source for your Lambda, Lambda functions are automatically triggered when messages arrive in the SQS queue. The Lambda service automatically scales up and down based on the number of in-flight messages in the queue. The polling, reading and removing of messages from the queue are thus handled by the built-in functionality: successfully processed messages are removed and failed ones are returned to the queue or forwarded to the DLQ, without you needing to explicitly configure these steps inside your Lambda function code.
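To make the batch semantics concrete, here is a minimal sketch of a Python handler for an SQS-triggered Lambda; process_item is a hypothetical stand-in for your own processing logic:

import json

def process_item(payload):
    # Hypothetical placeholder for your own processing logic
    print(payload)

def handler(event, context):
    # An SQS-triggered invocation receives a batch of up to BatchSize messages
    for record in event["Records"]:
        payload = json.loads(record["body"])
        process_item(payload)
    # Returning normally lets the Lambda service delete the whole batch from the queue;
    # raising an exception makes every message in the batch visible again (ReceiveCount + 1)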

Problems with valid messages ending up in DLQ

In this recent project we needed to process data arriving daily, as well as in larger batches of historical data, through the same data processing pipeline. To handle the changing data loads, SQS decouples the source system from the processing and balances the load for both use cases. We used SQS for queueing the metadata of new data files and a Lambda function for processing the messages and passing the metadata on to the next steps in the pipeline. When testing our solution by pushing thousands of messages rapidly to the queue, we observed many of the messages ending up in the dead-letter queue, even though they were not erroneous.

From the CloudWatch metrics, we found no execution errors during the given period, but instead there was a peak in the Lambda throttling metric. We had configured a DLQ to catch erroneous messages, but ended up having completely valid and unprocessed messages in the DLQ. How does this happen? To understand this, let’s dive deeper into how Lambda functions are triggered and scaled when they have SQS configured as the event source.

Lambda scales automatically with the number of messages arriving to SQS – up to a limit

Let’s first introduce briefly the parameters of SQS and Lambda that are relevant to this problem.

SQS

ReceiveMessageWaitTimeSeconds: Time that the poller waits for new messages before returning a response. Your messages might be arriving to the SQS queue unevenly, sometimes in bursts and sometimes there might be no messages arriving at all. The default value is 0, which equals constant polling of messages. If the queue is empty and your solution allows some lag time, it might be beneficial not to poll the queue all the time and return empty responses. Instead of polling for messages constantly, you can specify a wait time between 1 and 20 seconds.

VisibilityTimeout: The length of time during which a message will be invisible to consumers after the message is read from the queue. When a poller reads a message from the SQS queue, that message still stays in the queue but becomes invisible for the period of VisibilityTimeout. During this time the read message will be unavailable for any other poller trying to read the same message and gives the initial component time to process and delete the message from the queue.

maxReceiveCount: Defines the number of times a message can be returned to being visible in the source queue before it is moved to the DLQ. If the processing of a message is successful, the consumer deletes it from the queue. Whenever an error occurs in processing a message and it cannot be deleted from the queue, the message becomes visible again in the queue with an increased ReceiveCount. When the ReceiveCount for a message exceeds the maxReceiveCount for a queue, the message is moved to the dead-letter queue.

Lambda

Reserved concurrency limit: The number of executions of the Lambda function that can run simultaneously. There is an account specific limit how many executions of Lambda functions can run simultaneously (by default 1,000) and it is shared between all your Lambda functions. By reserving part of it for one function, other functions running at the same time can’t prevent your function from scaling.

BatchSize: The maximum number of messages that Lambda retrieves from the SQS queue in a single batch. BatchSize is related to the Lambda event source mapping, which defines what triggers the Lambda functions – in this case they are triggered from SQS.

Figure 1 below illustrates how Lambda actually scales when messages arrive in bursts to the SQS queue. Lambda uses long polling to poll messages from the queue, which means that it does not poll the queue all the time but on an interval of 1 to 20 seconds, depending on what you have configured as your queue’s ReceiveMessageWaitTimeSeconds. The Lambda service’s internal poller reads messages as batches from the queue and invokes the Lambda function synchronously with an event that contains a batch of messages. The number of messages in a batch is defined by the BatchSize configured in the Lambda event source mapping.

When messages start arriving in the queue, Lambda first reads up to 5 batches and invokes a function for each. If messages are still available, the number of processes reading the batches is increased by up to 60 more instances per minute (Figure 2), until it reaches either 1) the reserved concurrency limit configured for the Lambda function in question or 2) the account’s limit of total concurrent Lambda executions (by default 1,000), whichever is lower (N in the figure).

By setting up a reserved concurrency limit for your Lambda, you guarantee that it will get part of the account’s Lambda resources at any time, but at the same time you also limit your function from scaling over that limit, even if there were Lambda resources available for your account to use. When that limit is reached and there are still messages available in the queue, one might assume that those messages will stay visible in the queue, waiting until there is free Lambda capacity available to handle them. Instead, the internal poller still tries to invoke new Lambda functions for all the new messages and therefore causes throttling of the Lambda invocations (Figure 2). Why do some messages end up in the DLQ then?

Let’s look at how the workflow goes for an individual message batch if it succeeds or fails (Figure 3). First, the Lambda internal poller reads a message batch from the queue; those messages stay in the queue but become invisible for the length of the configured VisibilityTimeout. Then it invokes the function synchronously, meaning that it waits for a response that indicates either successful processing or an error, which can be caused by e.g. a function code error, a function timeout or throttling. In the case of successful processing, the message batch is deleted from the queue. In the case of failure, however, the messages become visible again.

The SQS queue is not aware of what happens beyond the event source mapping, i.e. whether the invocations fail or are throttled. It keeps the messages in the queue invisible until they either get deleted or are turned back to visible after the VisibilityTimeout has passed. Effectively this means that throttled messages are treated like any other failed messages, so their ReceiveCount is increased every time they are made visible in the queue. If there is a huge burst of messages coming in, some of the messages might get throttled, retried, throttled again, and retried again, until they reach the limit of maxReceiveCount and are then moved to the DLQ.

The automatic scaling and concurrency achieved with SQS and Lambda sounds promising, but unfortunately like all AWS services, this combination has its limits as well. Throttling of valid messages can be avoided with the following considerations:

Be careful when configuring a reserved concurrency for your Lambda function: the smaller the concurrency, the greater the chance that messages get throttled. AWS suggests the reserved concurrency for a Lambda function to be 5 or greater.

Set the maxReceiveCount high enough in your SQS queue’s properties so that throttled messages will eventually get processed after the burst of messages. AWS suggests setting it to 5 or greater.

By increasing the VisibilityTimeout of the source queue, you give your Lambda more time to retry the messages in the case of message bursts. AWS suggests setting this to at least 6 times the timeout you configure for your Lambda function.

Of course, tuning these parameters is an act of balancing with what best suits the purpose of your application.
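As an illustration of where these parameters live, here is a hedged boto3 sketch; the queue URL, ARN, function name and the concrete numbers are placeholders chosen along the lines of the suggestions above:

import json
import boto3

sqs = boto3.client("sqs")
awslambda = boto3.client("lambda")

# Placeholder identifiers for this example
queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789012/source-queue"
dlq_arn = "arn:aws:sqs:eu-west-1:123456789012:source-queue-dlq"
function_name = "data-item-processor"

# DLQ and retry behaviour are configured on the source queue, not on the Lambda function
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": "5",      # AWS suggests 5 or greater
        }),
        "VisibilityTimeout": "180",      # e.g. 6 x a 30-second Lambda timeout
    },
)

# Reserve enough concurrency for the function so bursts are less likely to be throttled
awslambda.put_function_concurrency(
    FunctionName=function_name,
    ReservedConcurrentExecutions=5,      # AWS suggests 5 or greater
)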

Configuring DLQ to your SQS and Lambda combination

If you don’t configure a DLQ, you will lose all the erroneous (or valid but throttled) messages. If you are familiar with the topic this seems obvious, but it’s worth stating since it is quite important. What is confusing in this combo is that you can configure a dead-letter queue on both SQS and Lambda. The AWS documentation states:

Make sure that you configure the dead-letter queue on the source queue, not on the Lambda function. The dead-letter queue that you configure on a function is used for the function’s asynchronous invocation queue, not for event source queues.

To understand this one needs to dive into the difference between synchronous and asynchronous invocation of Lambda functions.

When you invoke a function synchronously, Lambda runs the function and waits for a response from it. The function code returns the response, and Lambda returns this response to you with some additional data, including e.g. the function version. In the case of asynchronous invocation, however, Lambda sends the invocation event to an internal queue. If the event is successfully sent to the internal queue, Lambda returns a success response without waiting for any response from the function execution, unlike in synchronous invocation. Lambda manages the internal queue and attempts to retry failed events automatically with its own logic. If the execution of the function keeps failing after retries as well, the event is sent to the DLQ configured for the Lambda function. With an event source mapping to SQS, Lambda is invoked synchronously, so there are no retries like in asynchronous invocation and the DLQ on the Lambda function is useless.

Recently, AWS launched Lambda Destinations, which makes it possible to route asynchronous function results to a destination resource that can be SQS, SNS, another Lambda function or EventBridge. With DLQs you can handle asynchronous failure situations and catch the failing events, but with Destinations you can actually get more detailed information on function execution in both success and failure cases, such as code exception stack traces. Destinations and DLQs can be used together and at the same time, but AWS suggests that Destinations should be considered the preferred solution.

Conclusions

The described problems are all stated in and deducible from the AWS documentation, but they are still not completely obvious. By carefully tuning the parameters of our SQS queue – mainly by increasing the maxReceiveCount and VisibilityTimeout – we were able to overcome the problems with Lambda throttling. By configuring the DLQ on the source SQS queue instead of on the Lambda function, we were able to catch erroneous or throttled messages. Adding a DLQ to your source SQS does not solve anything by itself, though; you also need to handle the failing messages in some way. We configured a Lambda function on the DLQ to write the erroneous messages to DynamoDB. This way we have a log of the unhandled messages in DynamoDB, and the messages can be resubmitted or investigated further from there.
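The DLQ handler itself can stay very small; here is a hedged sketch of the idea, with a hypothetical DynamoDB table name:

import boto3

# "FailedMessages" is a hypothetical table with "MessageId" as its partition key
table = boto3.resource("dynamodb").Table("FailedMessages")

def handler(event, context):
    # Triggered by the DLQ: persist each failed message for later inspection or resubmission
    for record in event["Records"]:
        table.put_item(Item={
            "MessageId": record["messageId"],
            "Body": record["body"],
        })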

Of course, there are always several architectural options to solve these kinds of problems in the AWS environment. Amazon Kinesis, for example, is a real-time stream processing service, but it is designed to ingest large volumes of continuous streaming data. Our data, however, comes in uneven bursts, and SQS serves that scenario better as a message broker and decoupling mechanism. One just needs to be careful to set up the parameters correctly, and to make sure that the actions intended for the Lambda function will execute within the Lambda limits (including the 15-minute timeout and 3,008 MB maximum memory allocation). The built-in logic of Lambda and SQS gave us minimal infrastructure to manage and monitor, as well as high concurrency capabilities within the given limits.

Getting started with your Azure data pipeline

The majority of data projects revolve around text files. Companies use tens of different software systems, and integrating all of them can sometimes be impossible and not very business oriented. That’s why exporting text files from different systems is often easier than building a data hub.

Self-service solutions make it very easy to import text files and visualise them. At some point this can mean that an individual in an organisation has countless Excel and CSV files that take up a lot of space and become a problem: what was the file, and is it up to date?

Data in Azure

Storing, analysing and loading them into Azure is a good option. Azure has several different storage-related services available, and choosing the right one should always be done case by case. The most convenient one is Azure Data Lake (ADL). ADL is the dream storage place for developers, data scientists and analysts who need scalable data storage with easy access to big data. It has all the capabilities an enterprise needs – security, manageability, scalability, reliability and availability – to serve demanding storage requirements. Data projects start with gathering these text files into ADL and then copying them into Azure Data Warehouse (ADW) or SQL Server for reporting. Finding a suitable way to do that can be challenging and time consuming.

Azure provides options like Azure Data Factory (ADF) and PolyBase. Azure Data Factory is a data processing tool for managing data pipelines. It is a fully managed ETL service in the cloud. ADF can orchestrate data flows from on-premises and cloud sources, which makes it a very flexible and easy-to-use tool for moving data to and from ADL. It is not just for copying data into databases: you can schedule, manage, analyse, process and monitor your data pipeline with it. As most systems change over time, so do schemas and data models. ADF works well when nothing changes on the table side.

Text files larger than about 1 GB that need to be loaded into ADW/SQL Server are best handled with PolyBase. PolyBase is a technology that connects external and internal data with the database via the T-SQL language.

PolyBase and ADF loading times differ: for a 5 GB text file, ADF takes around 20–30 minutes. ADF has a “warming up time”, which means the system needs some time to be fully available. With PolyBase you can run an insert and the text file will be in ADW in around 3–5 minutes.

Practicalities with PolyBase

Both ADF and PolyBase are very sensitive about the data. Key things to check before you start querying data:

  • Identical schemas and data types
  • Most errors are caused by symbols inside text rows
  • The header row is not specified
  • The source file in ADL/Blob has to have the right permissions (Read, Write, Execute)
  • REJECT_VALUE for column names

To build a PolyBase data pipeline you need the following: database scoped credentials, an external data source, an external file format and an external table. For querying data, I suggest inserting the data from the external table into a regular table.

How does it work in practice?

Before you can create scoped credentials you need your client_id and token endpoint, which can be found in the Azure portal under Azure Active Directory. After the scoped credentials have been created, use the same credential name in the external data source’s credential clause. The location of the data source is the place where you have the file.

When creating a file format, define the format based on the text file you are trying to insert; this is where you specify what type of text file you have.

-- The bracketed names and <values> below are placeholders for your own credential name,
-- client id, OAuth token endpoint, secret and Data Lake account name.
CREATE DATABASE SCOPED CREDENTIAL [ADLCredential]
WITH
    IDENTITY = '<client_id>@<token_endpoint>',
    SECRET = '<client_secret>'
;

CREATE EXTERNAL DATA SOURCE [Data Lakename]
WITH (
    TYPE = HADOOP,
    LOCATION = 'adl://<datalakename>.azuredatalakestore.net',
    CREDENTIAL = [ADLCredential]
);

CREATE EXTERNAL FILE FORMAT [TextFiletype]
WITH (
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS (
        FIELD_TERMINATOR = '|',
        STRING_DELIMITER = ''
    )
);

After that, create the external table and add its location in the Data Lake.

CREATE EXTERNAL TABLE [TABLE.NAME]
(
    COLUMN1 INT,
    COLUMN2 VARCHAR(10)
)
WITH (
    DATA_SOURCE = [Data Lakename],
    LOCATION = N'/foldername/',
    FILE_FORMAT = [TextFiletype],
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 1
);

Credentials are connected to the folder inside the Data Lake, so you can’t create the external table before the folder has credentials. According to Microsoft, this is the best practice for inserting data into ADW.
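For the querying step suggested earlier, the external table can then be materialised into a regular table, for example with a CTAS statement; the target table name and distribution choice below are hypothetical placeholders:

-- Hedged example: copy the external table's rows into a regular ADW table for querying.
CREATE TABLE [dbo].[StagingTable]
WITH (DISTRIBUTION = ROUND_ROBIN)
AS
SELECT * FROM [TABLE.NAME];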

Summary:

In cases where schemas won’t change or change only a little, external tables are a good way of managing regular data flows. For automating loads between ADL and ADW, Microsoft suggests using SSIS. There is also an open-source option, Airflow, which is a platform to programmatically author, schedule and monitor workflows.
