Hi,
I want to run Python code in a Databricks notebook and return its output value to my local machine.
Here is the summary:
I upload files to a Databricks volume and generate an MD5 checksum for each local file. Once the upload is finished, I create a Python script locally with that filename filled in and upload it to my Databricks workspace. I already have a job created that refers to that file in its pipeline, and I execute it with the "databricks jobs run-now" CLI command. The issue is that I am not able to get the output of the Python code that runs on Databricks back to my local computer, which is the step that would close the loop. Can anyone point me in the right direction?
Here is a snippet of the code.
---------------------------
#!/usr/bin/env python3
def execute_dbcli(my_cmd):
    """Run a shell command and report whether it succeeded.

    Args:
        my_cmd: Command line, passed to the shell as a single string.

    Returns:
        1 if the command exited with status 0, otherwise 0.
    """
    # check=True turns a non-zero exit status into CalledProcessError.
    # stdout/stderr are captured as text but are not returned to the caller.
    run_args = {
        "shell": True,
        "check": True,
        "capture_output": True,
        "text": True,
    }
    try:
        subprocess.run(my_cmd, **run_args)
    except (subprocess.CalledProcessError, OSError):
        # Only a failed or unlaunchable command maps to 0; KeyboardInterrupt
        # and SystemExit propagate to the caller.
        return 0
    return 1
#-----------------------------------------------------------
def create_md5_file(md5_ip_file, md5_op_file, ip_file):
    """Write a copy of a template script with its placeholder filled in.

    Every occurrence of the literal text ``ip_file`` in the template is
    replaced by the value of the *ip_file* argument.

    Args:
        md5_ip_file: Path of the template file to read.
        md5_op_file: Path of the file to write (overwritten if present).
        ip_file: Text substituted for the placeholder.

    Returns:
        None.
    """
    placeholder = "ip_file"
    with open(md5_ip_file, "r") as template:
        rendered = template.read().replace(placeholder, ip_file)
    # The template is read and closed before the output is opened.
    with open(md5_op_file, "w") as out:
        out.write(rendered)
#-----------------------------------------------------------
def check_md5(ip_file):
    """Return the hexadecimal MD5 digest of a file's contents.

    Args:
        ip_file: Path of the file to hash; it is opened in binary mode.

    Returns:
        The digest as a 32-character lowercase hex string.
    """
    digest = hashlib.md5()
    with open(ip_file, "rb") as fip:
        # Hash in fixed-size chunks so a large file never has to be held in
        # memory all at once; the resulting digest is the same.
        for chunk in iter(lambda: fip.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()
#-----------------------------------------------------------
import hashlib
from databricks.sdk import WorkspaceClient
import subprocess
# Driver: upload a file to a Databricks volume, publish a checksum script to
# the workspace, then trigger the job that runs it.
#
# NOTE(review): the nesting below assumes each step runs only when the
# previous one succeeded -- confirm against the intended flow.
w = WorkspaceClient()
ip_file = "Upload_Summary.csv"
# Local checksum; nothing in this script compares it with a remote value.
loc_md5 = check_md5(ip_file)
dbfs_loc = "dbfs:/Volumes/bgem_dev/wgs_live_data/test/"
my_cmd = f"databricks fs cp {ip_file} {dbfs_loc}{ip_file}"
flag_transfer = execute_dbcli(my_cmd)
if flag_transfer:
    # f-prefix added so the file name is interpolated rather than printed
    # as the literal text "{ip_file}".
    print(f"File:{ip_file} transferred to Databricks successfully\n")
    print(f"let's work on MD5 checksum\n")
    db_md5_gen = "dbfs_md5_generic.py"
    db_md5_file = "dbfs_md5.py"
    create_md5_file(db_md5_gen, db_md5_file, ip_file)
    print(f"file ready to be transferred to Databricks for MD5 checksum\n")
    # if MD5 workspace is there, delete it.
    # NOTE(review): under this nesting, a failed "workspace list" (e.g. the
    # object does not exist yet) skips the import and the job run -- confirm.
    my_cmd = f"/usr/local/bin/databricks workspace list /Workspace/Users/prs0223@baylorgenetics.com/MD5"
    flag_workspace = execute_dbcli(my_cmd)
    if flag_workspace:
        print(f"MD5 workspace exists, so delete it\n")
        my_cmd = f"/usr/local/bin/databricks workspace delete /Workspace/Users/prs0223@baylorgenetics.com/MD5"
        flag_workspace_delete = execute_dbcli(my_cmd)
        if flag_workspace_delete:
            print(f"Workspace MD5 deleted, now transfer the MD5 file and recreate workspace\n")
            my_cmd = f"/usr/local/bin/databricks workspace import /Workspace/Users/prs0223@baylorgenetics.com/MD5 --file {db_md5_file} --language PYTHON"
            flag_workspace_create = execute_dbcli(my_cmd)
            if flag_workspace_create:
                print("MD5 workspace recreated\n")
                job_ID = 887420801374114
                my_cmd = f"/usr/local/bin/databricks jobs run-now {job_ID}"
                # execute_dbcli returns only a success flag, so whatever the
                # CLI prints for this run is not available here.
                flag_job_run = execute_dbcli(my_cmd)
                if flag_job_run:
                    print(f"job successful")
                else:
                    print(f"job run not successful")
---------------------------------------------------------------