Lambda Architecture with Azure Databricks


Introducing Lambda Architecture

Before jumping into Azure Databricks, it is imperative to understand what a Lambda Architecture is. The Greek symbol lambda (λ) signifies divergence into two paths. Owing to the explosion in the volume, variety, and velocity of data, two tracks emerged in data processing: the hot path and the cold path, or real-time processing and batch processing. The image below illustrates this concept at a high level.

Furthermore, as technologies evolved, many alternatives for realizing the lambda architecture cropped up, and the Microsoft Azure ecosystem did not stay behind. The image below gives an integrated view of the Azure big data landscape:

Also read: Machine learning in Azure Databricks.

Azure Databricks Lambda Architecture

If we observe the Microsoft big data landscape, Azure Databricks appears in multiple places. From batch processing for traditional ETL workloads to real-time analytics and machine learning, Databricks can be leveraged for any of these tasks. The image below represents the recommended Microsoft big data lambda architecture.

Batch Processing with Azure Databricks

Firstly, we will touch on the batch processing aspect of Databricks. In this article, we will use Azure SQL Database as the sink, since Azure SQL Data Warehouse already has the PolyBase option available for ETL/ELT.

In the above architecture, data is extracted from the Data Lake and transformed on the fly using Azure Databricks. Finally, we persist the transformed data into Azure SQL Database. However, there are a couple of nuances that need attention:

  1. Azure Databricks needs access to the Data Lake Store to extract the data. An Azure Active Directory app registration comes to our rescue here.
  2. To write to Azure SQL Database, we need authorization. However, we cannot expose sensitive credential information in the notebook. Hence, we need to define a secret scope backed by a key vault (applicable to Data Lake access control as well).

Step 1: Creating an App Registration

This Microsoft doc elucidates creating app registrations. Once done with the app registration, open a notebook in your Databricks workspace. Acquaint yourself with Databricks workspaces, clusters, and notebooks using this documentation.

Next, open a Scala notebook and write the code in the following format (see this GitHub link for the entire code). This code ensures that Azure Databricks can access Azure Data Lake using Azure service principal authentication.

spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", "<Application ID>")
spark.conf.set("dfs.adls.oauth2.credential", "<Key>")
spark.conf.set("dfs.adls.oauth2.refresh.url", "<https://login.microsoftonline.com/<Directory ID>/oauth2/token>")

Step 2: Creating a secret scope

Since we are using Azure SQL Database, a PaaS offering, as our sink, sensitive authentication information comes into the picture. This information cannot be exposed in the notebook, so we need to create a key-vault-backed secret scope. We have created a secret scope called 'AvroScope', as opposed to the 'key-vault-secret' name mentioned in the doc.

Once the scope is created, use the Secrets blade in Azure Key Vault to Generate/Import the secret fields. The fields username and password are the ones we will be using.
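
Optionally, once the scope exists, a quick check from the notebook (assuming the 'AvroScope' name used above) confirms that the scope and its keys are visible to the cluster:

// Optional check: list the secret scopes visible to the workspace and the
// keys inside the key-vault backed scope created above ("AvroScope" assumed).
dbutils.secrets.listScopes().foreach(println)
dbutils.secrets.list("AvroScope").foreach(println)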

Step 3: Secret Credentials

These secret credentials can be retrieved in the notebook (Databricks automatically redacts their values in the output) using the following code:

// Fetch the credentials from the key-vault backed secret scope created above
val jdbcUsername = dbutils.secrets.get(scope = "AvroScope", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "AvroScope", key = "password")

After retrieving the credentials, we build the connection string of the sink database, i.e., Azure SQL Database, using the following code:

val jdbcHostname = "<My Database Server Name>"
val jdbcPort = 1433
val jdbcDatabase = "<My Database Name>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

Now that the source and sink are ready, we can move ahead with the ETL process. Data is extracted from Azure Data Lake using the sqlContext.read.format API. The Data Lake folder path can be found under folder properties in Data Explorer. Go to the folder that contains the data and copy the full path:

Paste the copied path, along with the file name, into the load function of the code below:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("adl://srcdl.azuredatalakestore.net/FinancialData/cs-training_new.csv")
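
As a side note, Spark 2.x clusters have a CSV reader built in, so an equivalent read (same path assumed) does not need the external com.databricks.spark.csv format string:

// Equivalent read using Spark's built-in CSV source (Spark 2.x and later);
// the adl:// path is the same one used above.
val dfBuiltin = spark.read
  .option("header", "true")
  .csv("adl://srcdl.azuredatalakestore.net/FinancialData/cs-training_new.csv")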

Since sqlContext.read.format reads the data into a DataFrame, we can use the show function of the DataFrame API to visualize the data in tabular format. Initial data analysis reveals that the DebtRatio field has outliers, while the MonthlyIncome field contains missing values. We perform data cleansing here using the filter function:

df.show()

// Remove DebtRatio outliers and rows with missing MonthlyIncome values
val df1 = df.filter("DebtRatio < 1")
val df2 = df1.filter("MonthlyIncome <> 'NA'")

After data cleansing, we wish to add a new column named 'IncomeConsumption', which is the ratio of MonthlyIncome to NumberOfDependents (with a minimum of 1). The withColumn Spark SQL function comes to our aid here:

import org.apache.spark.sql.functions._

val finaldf = df2.withColumn("IncomeConsumption", df2("MonthlyIncome") / (df2("NumberOfDependents") + 1))
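
Because the CSV was read without schema inference, the numeric columns arrive as strings and Spark coerces them implicitly in the expression above. An optional, more explicit variant (a sketch using the same column names) casts them first:

// Optional explicit-cast variant (assumed sketch, same column names as above):
// cast the string columns to numeric types before computing the ratio.
val finaldfTyped = df2
  .withColumn("MonthlyIncome", col("MonthlyIncome").cast("double"))
  .withColumn("NumberOfDependents", col("NumberOfDependents").cast("int"))
  .withColumn("IncomeConsumption", col("MonthlyIncome") / (col("NumberOfDependents") + 1))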

Having performed the cleansing and transformations, we go ahead and save the data to the sink, i.e., our Azure SQL Database, using the jdbcUrl created in the connection string step above. Note that the mode is specified as 'Overwrite', which amounts to a basic SCD Type 1 load:

finaldf.write.mode("Overwrite").jdbc(jdbcUrl, "creditdata", connectionProperties)
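
To verify the load, the table can be read back from Azure SQL Database through the same JDBC URL (a small optional check; 'creditdata' is the table written above):

// Optional verification: read the freshly written table back from the sink.
val written = spark.read.jdbc(jdbcUrl, "creditdata", connectionProperties)
println(s"Rows written: ${written.count()}")
written.show(5)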

Final Data

 

Also read: Spark Dataframe performance benefits

Real-Time Processing with Azure Databricks

We have ventured into the era of the Internet of Things and real-time feeds, which brings in the high-velocity dimension of big data. This real-time path of the lambda architecture supports a wide variety of critical applications, such as predictive maintenance and disaster prediction, where timely action can save assets as well as lives.

In Azure, there are multiple ways to realize a real-time architecture and enable faster analytics. Broadly, they can be classified as the Infrastructure as a Service (IaaS) way or the Platform as a Service (PaaS) way. With IaaS, we can run Kafka in Azure to receive real-time feeds; this streaming data can then be fed into Storm (or a PaaS service like Databricks) for stream analytics. Although the IaaS way has its advantages, to realize the architecture in a serverless fashion we will go the PaaS way: the IoT Hub way.

IoT Hub is a bidirectional messaging PaaS for communicating with your devices and sensors. Data from IoT Hub can be processed using two PaaS services in Azure: Azure Stream Analytics and Azure Databricks. To be clear, Azure Stream Analytics is an excellent service and is widely used in the industry.

Also read: An Introduction to Azure IoT with Machine Learning

However, in this article, we will stick with Azure Databricks for three reasons:

  1.  It gives us an integrated platform for both batch processing and real-time analytics of the lambda architecture.
  2.  It helps us leverage the power of Spark streaming under the hood.
  3.  The cluster autoscaling feature enables us to save a lot of expenses.

Roughly the architecture looks like this:

Step 1: Device to IoT hub

For demonstration purposes, we will use a Raspberry Pi simulator, which pushes simulated weather data to IoT Hub. To achieve this, we need to register a device in the IoT Hub (the simulator, in this case). Click on the add icon and enter the device name:

After this, click on the registered device and retrieve the primary connection string from the device details:

Paste the extracted connection string into the connection string field in the Raspberry Pi simulator:

A sanity check here would be the LED in the picture glowing.

Step 2: IoT Hub to Azure Databricks

Once the IoT Hub setup is ready, it is essential to read and process the streaming data. This is where services like Azure Stream Analytics and Databricks come into the picture. In Databricks, we leverage the power of Spark Structured Streaming to perform SQL-like manipulations on streaming data. The first step is to establish a connection between the IoT Hub and Databricks. First, we need to install the Azure Event Hubs connector for Spark (the azure-eventhubs-spark library) on the pertinent cluster. After that, we write the Scala code below:

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Build the connection string with the IoT Hub information noted above
val connectionString = ConnectionStringBuilder("<Event Hub Compatible endpoint of IoT Hub>")
  .setEventHubName("<IoT Hub Name>")
  .build

// Start reading from the end of the stream; other options can be set on the conf as well
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

val incomingStream = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .option("eventhubs.partition.count", "4")
  .load()

incomingStream.printSchema
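
Optionally, before parsing the payload, you can confirm that readStream produced a streaming DataFrame:

// Optional check: incomingStream should be a streaming DataFrame.
println(incomingStream.isStreaming) // expected: true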

After establishing the connection, we need to define a JSON schema matching the structure of the incoming stream. This can be achieved using the Scala code below.

import org.apache.spark.sql.types._ // https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/package-summary.html
import org.apache.spark.sql.functions._

// Our JSON Schema
val jsonSchema = new StructType()
.add("messageId", StringType)
.add("deviceId", StringType)
.add("temperature", StringType)
.add("humidity", StringType)

// Convert our EventHub data, where the body contains our message and which we decode the JSON
val messages = incomingStream
// Parse our columns from what EventHub gives us (which is the data we are sending, plus metadata such as offset, enqueueTime, ...)
.withColumn("Offset", $"offset".cast(LongType))
.withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
.withColumn("Timestamp", $"enqueuedTime".cast(LongType))
.withColumn("Body", $"body".cast(StringType))
// Select them so we can play with them
.select("Offset", "Time (readable)", "Timestamp", "Body")
// Parse the "Body" column as a JSON Schema which we defined above
.select(from_json($"Body", jsonSchema) as "sensors")
// Now select the values from our JSON Structure and cast them manually to avoid problems
.select(
$"sensors.messageId".cast("string"),
$"sensors.deviceId".cast("string"),
$"sensors.temperature".cast("double") as "tempVal",
$"sensors.humidity".cast("double") as "humVal"
)
messages.printSchema()

Please note that we create a temporary view on top of the parsed messages DataFrame, using the createOrReplaceTempView function, so that we can write SQL queries to perform advanced analytics:

messages.createOrReplaceTempView("dataStreamsView")

After this, your streaming data is ready for advanced analytics:
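
As an illustration (a minimal sketch, not part of the original notebook), a streaming aggregation over the view can be started as follows; it computes the average temperature and humidity per device and writes each micro-batch to the console sink:

// Assumed example: average temperature and humidity per device, streamed to
// the console sink. Column names come from the schema parsed above.
val avgByDevice = spark.sql(
  """SELECT deviceId,
    |       AVG(tempVal) AS avgTemp,
    |       AVG(humVal)  AS avgHumidity
    |FROM dataStreamsView
    |GROUP BY deviceId""".stripMargin)

val query = avgByDevice.writeStream
  .outputMode("complete") // complete (or update) mode is required for aggregations without a watermark
  .format("console")
  .start()

// query.stop() // stop the streaming query when finished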

Conclusion

Read this article for machine learning in Azure Databricks. Also read DataBricks Part 2 – Big Data Lambda Architecture and Batch Processing and Build your Data Estate with Azure DataBricks – Part 3 – IoT, which inspired this consolidated article.




