In my last article, I briefly explained what Delta Live Tables is. Once you have your pipeline all set, you might want to limit the time it’s running.
In this article, I want to explain first why you might want to schedule your (continuous) pipeline, and then how to do it.
Starting with the why?
If you've been searching for this, you likely have a significant reason already. If not, here are some advantages of setting up a pipeline trigger.
For many businesses, especially here in Europe, Sunday is a complete off-day, so the pipeline does not need to be up and running 24/7.
If you have a triggered DLT pipeline (not a continuous one), you can simply schedule it through the Databricks UI. For a continuous pipeline, we need a small workaround: a notebook that starts and stops the pipeline via the Databricks API.
First, when the notebook runs, we need to check what the current time is, so we start off with some imports for the API call and for working with dates and times.
import requests
import json
from datetime import datetime
from dateutil import tz
desired_timezone = 'Europe/Berlin'
current_time = datetime.now(tz.gettz(desired_timezone))
current_hour = current_time.hour
In the next step, we need an authorization_header and a pipeline_id, which holds the ID of the Databricks pipeline you want to interact with. You can get the authorization from a personal access token, which you can generate under User Settings in Databricks.
The authorization token is needed to authenticate requests to the Databricks API. It is structured as a dictionary with the key "Authorization" and a value that consists of the word "Bearer" followed by the actual access token.
authorization_header = {"Authorization": "Bearer your_auth"}
pipeline_id = "your_pipeline_id"
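A quick note on the token: hardcoding it in a notebook works, but it is safer to pull it from a Databricks secret scope. A minimal sketch, assuming you have already created a scope and key (the names "my-scope" and "databricks-pat" below are placeholders):
# Hypothetical scope and key names; replace them with your own secret scope setup
token = dbutils.secrets.get(scope="my-scope", key="databricks-pat")
authorization_header = {"Authorization": f"Bearer {token}"}
With the header and pipeline ID in place, we can write two small helper functions that talk to the Pipelines API.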
def generic_post_same_workspace(headers, body, relative_path):
    # Build the full URL from the current workspace URL and send the POST request
    url = f"https://{spark.conf.get('spark.databricks.workspaceUrl')}{relative_path}"
    response = requests.post(url, json=body, headers=headers)
    return response.status_code, response.json()
def start_pipeline(pipeline_id, authorization_header):
    relative_path = f"/api/2.0/pipelines/{pipeline_id}/updates"
    body = {
        "full_refresh": False,
        "cause": "API_CALL"
    }
    return generic_post_same_workspace(authorization_header, body, relative_path)
def stop_pipeline(pipeline_id, authorization_header):
    relative_path = f"/api/2.0/pipelines/{pipeline_id}/stop"
    body = {}
    return generic_post_same_workspace(authorization_header, body, relative_path)
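Optionally, you could also check what state the pipeline is currently in before acting, so you don't call start on a pipeline that is already running. This is only a sketch; the GET endpoint does return a state field (for example IDLE or RUNNING), but double-check the response shape in the Pipelines API docs for your workspace:
def get_pipeline_state(pipeline_id, authorization_header):
    # Fetch the pipeline details and return its current state, e.g. "IDLE" or "RUNNING"
    url = f"https://{spark.conf.get('spark.databricks.workspaceUrl')}/api/2.0/pipelines/{pipeline_id}"
    response = requests.get(url, headers=authorization_header)
    return response.json().get("state")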
Now we want to call these functions based on the time. I simply want my pipeline to start around 9 am and stop around 9 pm.
if current_hour >= 21:
    stop_pipeline(pipeline_id, authorization_header)
    print(f"Current hour is {current_hour}: stopping the pipeline...")
elif current_hour >= 9:
    start_pipeline(pipeline_id, authorization_header)
    print(f"Current hour is {current_hour}: starting the pipeline...")
else:
    print("Skipping pipeline call...")
Now all you need to do is schedule this notebook for 9 am and 9 pm every day (except Sundays in this case), and you're done. At 9 am, the notebook runs, falls into the "elif" branch, and starts the pipeline; at 9 pm it falls into the "if" branch and stops it. It does not matter if the notebook gets rerun in between; it simply checks the time and makes a decision accordingly.
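If you prefer to define that schedule in code rather than in the UI, Databricks jobs accept a Quartz cron expression. A hedged sketch of just the schedule fragment of a Jobs API payload (the job itself still needs a task pointing at this notebook):
# 09:00 and 21:00, Monday through Saturday (no Sunday runs), Berlin time
schedule = {
    "quartz_cron_expression": "0 0 9,21 ? * MON-SAT",
    "timezone_id": "Europe/Berlin",
    "pause_status": "UNPAUSED"
}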
There are obviously more ways to do this. You could create separate jobs and notebooks for stopping and starting the pipeline and trigger them based on specific times.
I hope I could help you save some $$ for your business by reducing the time the pipeline is online.