Run the Same Databricks Notebook Multiple Times in Parallel (Concurrently) Using Python

In this blog, I would like to discuss how you can use Python to run a Databricks notebook multiple times in parallel. The whole purpose of a service like Databricks is, of course, to execute code in parallel across multiple nodes (the workers), but there are times when you need to implement your own parallelism logic to fit your needs.

To follow along, you need a Databricks workspace, a Databricks cluster and two notebooks. The parent notebook orchestrates the parallelism process, and the child notebook is the one executed in parallel. The idea is that the parent notebook passes a parameter to the child notebook, and the child notebook uses that parameter to execute a given task. Without further ado, let's get to it.

For simplicity, let's design a child notebook that takes a number as input and prints that number multiplied by 10. Also, to make sure we can test our parallelism logic, we will introduce a 20-second sleep in the child notebook. So open up your child notebook and enter the following code in Command 1 (this code lets us pass a parameter in from the parent notebook).

# Create a text widget so the parent notebook can pass a parameter in
dbutils.widgets.text("numberToProcess", "", "")
# Widget values arrive as strings, so cast to an integer
numberToProcess = int(dbutils.widgets.get("numberToProcess"))

Open up a new command in the child notebook and enter the following code, which calculates the 10x multiple of our number of interest, sleeps for 20 seconds and then prints the output.

import time
outputNumber = numberToProcess * 10
time.sleep(20) # sleep for 20 seconds 
print('The desired output is {}'.format(outputNumber))

So your child notebook will look something like this

Now that you have your child notebook set up, go to your parent notebook and paste the following code to import the thread pool executor:

from concurrent.futures import ThreadPoolExecutor

Then we define the function that runs the child notebook, passing in the number parameter:

def processAnIntegerNumber(numberToProcess):
  # Run the child notebook, passing the number as a (string) argument,
  # and return the child's exit value, if it provides one
  return dbutils.notebook.run(path = "/Shared/ChildNotebook",
                              timeout_seconds = 300,
                              arguments = {"numberToProcess": str(numberToProcess)})

Finally, the magic command, which takes a list of numbers, spins up a multithreaded executor and then maps the list of numbers onto that executor. This command executes the child notebook five times (because the list contains five numbers) in parallel.

listOfNumbers = [87,90,12,34,78]
with ThreadPoolExecutor() as executor:
  results = executor.map(processAnIntegerNumber, listOfNumbers)

Executing the parent notebook, you will notice that five Databricks jobs run concurrently, each executing the child notebook with one of the numbers in the list. This is a snapshot of the parent notebook after execution.

Notice how the overall time to execute the five jobs is about 40 seconds. Now, if we open the link to any one of these jobs, we will notice that it also took about 32 seconds on its own, indicating that the jobs ran in parallel; had they run sequentially, the total would have been roughly five times that.
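
If you would like to capture what each child run returns and verify the timing yourself, here is a minimal sketch (assuming the same processAnIntegerNumber function as above; max_workers = 5 is an explicit choice so that all five jobs start at once):

import time
from concurrent.futures import ThreadPoolExecutor

listOfNumbers = [87, 90, 12, 34, 78]

start = time.monotonic()
with ThreadPoolExecutor(max_workers = 5) as executor:
  # map() preserves input order, so each result lines up with its number
  results = list(executor.map(processAnIntegerNumber, listOfNumbers))
elapsed = time.monotonic() - start

# Each entry holds the child's exit value only if the child calls
# dbutils.notebook.exit(...); otherwise it is an empty string
print('Finished {} jobs in {:.1f} seconds'.format(len(results), elapsed))

With true parallelism, the printed elapsed time should be close to a single job's duration rather than five times that.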

Hope this blog helps you run your jobs faster and satisfies your “Need for Speed”.

You can follow this blog for machine learning, data engineering, DevOps, Dynamics and Power Apps tips and tricks.

Select Top n Records For Each Group In Python (Pandas)

Say that you have a dataframe in Pandas and you are interested in finding the top n records for each group. Depending on your need, the top n can be defined based on a numeric column in your dataframe, or simply on the count of rows in each group.

For example, suppose (god forbid) that I have a retail store with four branches (A, B, C, D). Suppose also that for each day, I would like to get the three branches with the highest number of items sold. So, I would like to take the day as a group, count the number of items sold in each branch on that day and then pick the three branches with the highest counts. That is our problem; now, without further ado, let's see the code for our example.

import pandas as pd

sales = {
    'branch_name': ['A', 'A', 'B', 'B', 'C', 'C', 'C', 'C', 'D',
                    'A', 'A', 'B', 'B', 'C', 'D', 'D', 'D', 'D'],

    'date': ['01/01/2020', '01/01/2020', '01/01/2020', '01/01/2020',
             '01/01/2020', '01/01/2020', '01/01/2020', '01/01/2020',
             '01/01/2020', '02/01/2020', '02/01/2020', '02/01/2020',
             '02/01/2020', '02/01/2020', '02/01/2020', '02/01/2020',
             '02/01/2020', '02/01/2020'],

    'item_no': ['I1', 'I2', 'I3', 'I4', 'I5', 'I6', 'I7', 'I8', 'I9',
                'I10', 'I11', 'I12', 'I13', 'I14', 'I15', 'I16', 'I17', 'I18']
}

sales = pd.DataFrame(sales)
# Dates are written day-first (dd/mm/yyyy)
sales['date'] = pd.to_datetime(sales['date'], format = '%d/%m/%Y')

Sweet, let's see what our dataframe looks like:

   branch_name       date item_no
0            A 2020-01-01      I1
1            A 2020-01-01      I2
2            B 2020-01-01      I3
3            B 2020-01-01      I4
4            C 2020-01-01      I5
5            C 2020-01-01      I6
6            C 2020-01-01      I7
7            C 2020-01-01      I8
8            D 2020-01-01      I9
9            A 2020-01-02     I10
10           A 2020-01-02     I11
11           B 2020-01-02     I12
12           B 2020-01-02     I13
13           C 2020-01-02     I14
14           D 2020-01-02     I15
15           D 2020-01-02     I16
16           D 2020-01-02     I17
17           D 2020-01-02     I18

Great, from the dataframe above you can see that for 2020-01-01 most sales come from branches A, B and C, whereas for 2020-01-02 most sales come from branches A, B and D. Let's write the code that returns what we just concluded by eye.

Solution

The solution goes as follows:
1- We need to count the number of items sold for each day and each branch.

sales_agg = sales.groupby(['date', 'branch_name']).agg({'item_no':'nunique'}).reset_index()
print(sales_agg)
        date branch_name  item_no
0 2020-01-01           A        2
1 2020-01-01           B        2
2 2020-01-01           C        4
3 2020-01-01           D        1
4 2020-01-02           A        2
5 2020-01-02           B        2
6 2020-01-02           C        1
7 2020-01-02           D        4

2- For each day, we need to sort the branches by the number of items sold in descending order

sales_sorted = sales_agg.groupby(['date']).apply(lambda x: x.sort_values(['item_no'],ascending = False)).reset_index(drop = True)
print(sales_sorted)
        date branch_name  item_no
0 2020-01-01           C        4
1 2020-01-01           A        2
2 2020-01-01           B        2
3 2020-01-01           D        1
4 2020-01-02           D        4
5 2020-01-02           A        2
6 2020-01-02           B        2
7 2020-01-02           C        1
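
As a side note, the same ordering can be obtained without groupby().apply(), which newer pandas versions may warn about; here is a sketch of the equivalent plain sort:

# Equivalent: sort by date ascending, then by item count descending
sales_sorted = sales_agg.sort_values(['date', 'item_no'],
                                     ascending = [True, False]).reset_index(drop = True)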

3- Now for each date, we need to pick the top n records (3 in our case)

sales_sorted.groupby(['date']).head(3)
        date branch_name  item_no
0 2020-01-01           C        4
1 2020-01-01           A        2
2 2020-01-01           B        2
4 2020-01-02           D        4
5 2020-01-02           A        2
6 2020-01-02           B        2

Great, we are done. This returned the top 3 branches for each day, based on the total number of items sold. Hope this helps you solve your own problem.
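
For completeness, the three steps can also be chained into a single expression; here is a minimal sketch over the same sales dataframe:

# One chained expression: count distinct items per (date, branch),
# then keep the 3 largest counts within each date
top3 = (sales.groupby(['date', 'branch_name'])['item_no']
             .nunique()
             .reset_index()
             .sort_values(['date', 'item_no'], ascending = [True, False])
             .groupby('date')
             .head(3))
print(top3)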