Run Same Databricks Notebook for Multiple Times In Parallel (Concurrently) Using Python

In this blog, I would like to discuss how you will be able to use Python to run a databricks notebook for multiple times in a parallel fashion. Noting that the whole purpose of a service like databricks is to execute code on multiple nodes called the workers in parallel fashion. But there are times where you need to implement your own parallelism logic to fit your needs.

To follow along, you need to have databricks workspace, create a databricks cluster and two notebooks. The parent notebook orchestrates the parallelism process and the child notebook will be executed in parallel fashion. The idea would be that the parent notebook will pass along a parameter for the child notebook and the child notebook will use that parameter and execute a given task. Without further to say, let’s get to it.

For simplicity let’s design a child notebook that takes a number as an input and then print the multiplication of this number by 10. Also, to make sure that we test our parallelism logic, we will introduce 20 seconds sleep time for our child notebook. So open up your child notebook and enter the following code in Command 1 (this code will help us pass a parameter from the parent notebook).

dbutils.widgets.text("numberToProcess","","")
dbutils.widgets.get("numberToProcess")
numberToProcess = int(getArgument("numberToProcess"))

Open up a new command in child notebook and enter the following code which will calculate the 10 multiplier for our number of interest, introduce a sleep time of 20 seconds and then print the output

import time
outputNumber = numberToProcess * 10
time.sleep(20) # sleep for 20 seconds 
print('The desired output is {}'.format(outputNumber))

So your child notebook will look something like this

Now that you have your child notebook setup, go to your parent notebook and paste the following code to import the multithreading packages

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

Then we define the function that will execute a child notebook passing in the number parameter

def processAnIntegerNumber(numberToProcess):
  dbutils.notebook.run(path = "/Shared/ChildNotebook",
                                        timeout_seconds = 300, 
                                        arguments = {"numberToProcess":numberToProcess})

Finally, the magic command, which will take a list of numbers, spin up a multithreading executor and then map the list of numbers to that executor. This command will execute the childnotebook instance for 5 times (because the list of numbers contains five numbers) in a parallel fashion.

listOfNumbers = [87,90,12,34,78]
with ThreadPoolExecutor() as executor:
  results = executor.map(processAnIntegerNumber, listOfNumbers)

Executing the parent notebook, you will notice that 5 databricks jobs will run concurrently each one of these jobs will execute the child notebook with one of the numbers in the list. This is a snapshot of the parent notebook after execution

Notice how the overall time to execute the five jobs is about 40 seconds. Now, if we open the link to any of these jobs, we will notice that the time was also about 32 seconds indicating that jobs were run in parallel

Hope this blog helps you run your jobs faster and satisfies your “Need for Speed”.

You can follow this blog for Machine learning, data engineering, devops, dynamics and power apps tips and tricks.

Mount a Blob Storage in Azure DataBricks Only if Not Mounted Already (Using Python)

As discussed in this article by Databricks that during your work in a notebook, you can mount a Blob Storage container or a folder inside a container to Databricks File System. The whole point of mounting to a blob storage container is simply to use an abbreviated link to your data using the databricks file system rather than having to refer to the whole URL to your blob container every time you need to read/write data from that blob container. More details on mounting and its usage, can be found in the articles referenced above.

The purpose of this article is to suggest a way to check if the mountpoint has been created already and only attempt to create it if it doesn’t exist using python.

This can simply be done if we knew how to list existing mountpoints using python. Luckily, databricks offers this to us using the dbutils.fs.mounts() command. To access the actual mountpoint we can do something like this:

for mount in dbutils.fs.mounts():
  print (mount.mountPoint)

Knowing how to access mountpoints enables us to write some Python syntax to only mount if the mountpoint doesn’t exist. The code should look like the following:

storageAccountName = "your storage account name"
storageAccountAccessKey = "your storage account access key"
blobContainerName = "your blob container name"
if not any(mount.mountPoint == '/mnt/FileStore/MountFolder/' for mount in dbutils.fs.mounts()):
  try:
    dbutils.fs.mount(
    source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
    mount_point = "/mnt/FileStore/MountFolder/",
    extra_configs = {'fs.azure.account.key.' + storageAccountName + '.blob.core.windows.net': storageAccountAccessKey}
  )
except Exception as e:
  print("already mounted. Try to unmount first")

Or, you can add an error handler to print an error message if the the blob is mounted already, as such:

storageAccountName = "your storage account name"
storageAccountAccessKey = "your storage account access key"
blobContainerName = "your blob container name"

try:
  dbutils.fs.mount(
  source = "wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
  mount_point = "/mnt/FileStore/MountFolder/",
  extra_configs = {'fs.azure.account.key.' + storageAccountName + '.blob.core.windows.net': storageAccountAccessKey}
  )
except Exception as e:
  print("already mounted. Try to unmount first")

Enable a Service Principal to Create Access Policies for an Azure Resource Using DevOps

If you got along well with your deployment strategy, you might got to the point where you are using a service principal in DevOps to do your deployments. One of the deployment tasks that I needed to do was to create an access policy using a service principal with Powershell. I simply wanted to do that using command Set-KeyVaultAccessPolicy. More specifically, I wanted to grant an access policy for my data factory to be able to get/list secrets in my key vault. Running my code with my windows account, everything was working as expected. Running my code using the service principal, it was failing with the following error

Exception: An error occurred: System.Exception: Insufficient privileges to complete the operation.at Set-KeyVaultAccessPolicy,

This has occurred even though my service principal was granted contributor access to the resource group where the key vault lives. That was the same level of permissions that my account had. Then why was it that my account was able to run Set-KeyVaultAccessPolicy but my service principal didn’t have enough privileges to do it?

It turns out that in order for a service principal to be able to create an access policy, it will need to validate the object ID for your resource against azure active directory. To fix, you can try one of the following two solutions:

Solution 1: Change the API permissions for your service principal to allow reading of the azure active directory graph.

This can be done by going to your service principal, and then select the API permissions blade. Click add permission as can be seen in the image below

Once there, scroll all the way to the bottom and select “Azure Active Directory Graph”. Then grant Directory.Read.All permissions to your application permissions options as can be seen below

This will enable your service principal to verify object ID and therefore create necessary access policies.

Solution 2: Add flag BypassObjectIdValidation to your command

This will enable your command to skip the object id validation step and create the access policy (even though this solution has worked for many people, it didn’t work for me). So your command will look something like

Set-AzKeyVaultAccessPolicy -ResourceGroupName $resourceGroupName `
                           -VaultName $keyVaultName `
                           -ObjectId $servicePrincipal.Id  `
                           -PermissionsToSecrets get,list `
                           -ErrorAction Stop `
                           -BypassObjectIdValidation

Hope one of these two solutions will help you figure out a solution to this problem.

Select Top n Records For Each Group In Python (Pandas)

Say that you have a dataframe in Pandas and you are interested in finding the top n records for each group. Depending on your need, top n can be defined based on a numeric column in your dataframe or it can simply be based on the count of occurrences for the rows in that group.

For example, suppose (god forbid) that I have a retailer store with four branches (A, B, C, D). Also, suppose that for each day, I would like to get the three branches with most number of items sold. So, I would like to take the day as a group, count the number of sold items in each branch for that day and then pick the three branches with highest number of sales. This is our problem, now, without further to say, let’s see the code for our example.

import pandas as pd
sales = {
'branch_name': [ 'A', 'A',  'B',   'B',   'C', 'C',  'C', 'C',  'D',
                        'A', 'A',  'B',   'B',   'C', 'D',  'D', 'D',  'D'],
    
'date': ['01/01/2020','01/01/2020','01/01/2020','01/01/2020','01/01/2020','01/01/2020',
         '01/01/2020','01/01/2020','01/01/2020', '02/01/2020','02/01/2020','02/01/2020',
         '02/01/2020','02/01/2020','02/01/2020','02/01/2020','02/01/2020','02/01/2020'],
    
'item_no': ['I1','I2','I3','I4','I5','I6','I7','I8','I9',
           'I10','I11','I12','I13','I14','I15','I16','I17','I18']
        
         }
sales = pd.DataFrame(sales)
sales['date'] = pd.to_datetime(sales['date'],format = '%d/%m/%Y')

Sweet, let’s look how our dataframe looks like

        branch_name 	date 	item_no
0 	   A       	2020-01-01 	I1
1 	   A       	2020-01-01 	I2
2 	   B       	2020-01-01 	I3
3 	   B       	2020-01-01 	I4
4 	   C       	2020-01-01 	I5
5 	   C       	2020-01-01 	I6
6 	   C       	2020-01-01 	I7
7 	   C       	2020-01-01 	I8
8 	   D       	2020-01-01 	I9
9 	   A       	2020-01-02 	I10
10 	   A       	2020-01-02 	I11
11 	   B       	2020-01-02 	I12
12 	   B       	2020-01-02 	I13
13 	   C       	2020-01-02 	I14
14 	   D       	2020-01-02 	I15
15 	   D       	2020-01-02 	I16
16 	   D       	2020-01-02 	I17
17 	   D       	2020-01-02 	I18

Great, from the dataframe above, you can see that for 2020-01-01, we have most sales coming from branches A, B and C whereas for 2020-01-02, we have most sales coming from branches A, B and D. Let’s write the expression to return what we just concluded with our eyes.

Solution

The solution here should be as the following:
1- We need to count the number of items sold for each day and each branch.

sales_agg = sales.groupby(['date', 'branch_name']).agg({'item_no':'nunique'}).reset_index()
print(sales_agg)
      date 	   branch_name 	item_no
0 	2020-01-01 	A 	         2
1 	2020-01-01 	B 	         2
2 	2020-01-01 	C 	         4
3 	2020-01-01 	D 	         1
4 	2020-01-02 	A 	         2
5 	2020-01-02 	B 	         2
6 	2020-01-02 	C 	         1
7 	2020-01-02 	D 	         4

2- For each day, we need to sort the branches by the number of items sold in descending order

sales_sorted = sales_agg.groupby(['date']).apply(lambda x: x.sort_values(['item_no'],ascending = False)).reset_index(drop = True)
print(sales_sorted)
 	date 	    branch_name 	item_no
0 	2020-01-01   	C 	         4
1 	2020-01-01   	A 	         2
2 	2020-01-01   	B 	         2
3 	2020-01-01   	D 	         1
4 	2020-01-02   	D 	         4
5 	2020-01-02   	A 	         2
6 	2020-01-02   	B 	         2
7 	2020-01-02   	C 	         1

3- Now for each date, we need to pick the top n records (3 in our case)

sales_sorted.groupby(['date']).head(3)
      date 	   branch_name 	item_no
0 	2020-01-01 	   C 	      4
1 	2020-01-01 	   A 	      2
2 	2020-01-01 	   B 	      2
4 	2020-01-02 	   D 	      4
5 	2020-01-02 	   A 	      2
6 	2020-01-02 	   B 	      2

Great, we are done. This has returned the top 3 branches based on total number of items sold for each day. Hope this helps you solve your own problem.

Read a CSV file stored in blob container using python in DataBricks

Le’ts say that you have a csv file, a blob container and access to a DataBricks workspace. The purpose of this mini blog is to show how easy is the process from having a file on your local computer to reading the data into databricks. I will go through the process of uploading the csv file manually to a an azure blob container and then read it in DataBricks using python code.

Step 1: Upload the file to your blob container

This can be done simply by navigating to your blob container. From there, you can click the upload button and select the file you are interested in. Once selected, you need to click the upload button that in the upload blade. See screenshot below.

Once uploaded, you will be able to see the file available in your blob container as shown below:

Step 2: Get credentials necessary for databricks to connect to your blob container

From your azure portal, you need to navigate to all resources then select your blob storage account and from under the settings select account keys. Once their, copy the key under Key1 to a local notepad.

Step 3: Configure DataBricks to read the file

Here, you need to navigate to your databricks work space (create one if you don’t have one already) and launch it. Once launched, go to workspace and create a new python notebook.

To start reading the data, first, you need to configure your spark session to use credentials for your blob container. This can simply be done through the spark.conf.set command. More precisely, we start with the following

storage_account_name = 'nameofyourstorageaccount' 
storage_account_access_key = 'thekeyfortheblobcontainer'
spark.conf.set('fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net', storage_account_access_key)


Once done, we need to build the file path in the blob container and read the file as a spark dataframe.

blob_container = 'yourblobcontainername'
filePath = "wasbs://" + blob_container + "@" + storage_account_name + ".blob.core.windows.net/Sales/SalesFile.csv"
salesDf = spark.read.format("csv").load(filePath, inferSchema = True, header = True)

And congrats, we are done. You can use the display command to have a sneak peak at our data as shown below.