02-19-2025 11:41 AM
My experience with Databricks pyspark up to this point has always been to execute a SQL query against existing Databricks tables, then write the resulting pyspark dataframe into a new table.
For the first time, I am now getting data via an API which returns a large XML blob containing a table of data. I convert the XML object to a BeautifulSoup object and extract the table data into a Pandas data frame. That takes a couple of minutes, not so bad. Then I convert the Pandas data frame to a pyspark data frame. That takes about a minute. Then I issue a command to write that pyspark dataframe to a table, and it takes... TWO HOURS.
Now, the dataframe has about 130K rows with 39 columns. Two hours to write 130K rows? What is going on?
By contrast, yesterday I developed a pyspark script saving a dataframe created from a pyspark SQL command. The dataframe had about 180K rows, and the entire job ran in six minutes.
Why does a pyspark dataframe created from a pandas dataframe created from a beautifulsoup object take two hours to write to a table, while a similar sized pyspark dataframe created from a pyspark query takes only six minutes from running the query all the way to writing the table?
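For reference, a stripped-down sketch of the steps I mean (the frame and table names here are just placeholders, not the real job):

import pandas as pd

# toy pandas frame standing in for the one built from the XML response
pandas_df = pd.DataFrame({"ORDID": ["1", "2"], "QUANTITY": ["10.000", "20.000"]})

spark_df = spark.createDataFrame(pandas_df)  # pandas -> pyspark conversion (about a minute on the real data)
spark_df.write.mode("overwrite").saveAsTable("some_schema.some_table")  # this write is the step taking ~2 hours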
Labels:
- Delta Lake
- Spark
Accepted Solutions
02-20-2025 03:55 AM
I can see a few points to improve here:
- Spark can process XML natively; see for instance: https://docs.databricks.com/aws/en/query/formats/xml
- You can convert the data list directly to a DataFrame (skipping the conversion to Pandas) and provide the schema as the second parameter, so you don't need to iterate through the columns afterwards.
- Remove the secret keys from the code.
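For the second point, a minimal sketch, assuming `data` is the list of dicts you already build from the SOAP response (only two of the field names are shown here; the full schema would list all of them):

from pyspark.sql.types import StructType, StructField, StringType, DecimalType
from pyspark.sql.functions import col

input_nodes_schema = StructType([
    StructField("ORDID", StringType(), True),
    StructField("QUANTITY", StringType(), True),
    # ... one StructField per extracted field ...
])

# The list of dicts goes straight to Spark, with the schema as the second argument,
# so the pandas detour is skipped entirely.
spark_df = spark.createDataFrame(data, schema=input_nodes_schema)

# Values that arrive as text from the XML can still be cast where needed.
spark_df = spark_df.withColumn("QUANTITY", col("QUANTITY").cast(DecimalType(23, 3)))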
02-19-2025 01:00 PM
Can you share the code? Keep in mind that Spark uses lazy evaluation, so it can give the impression that the code runs fast and only the save is slow: the transformations are actually executed when an action is hit.
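For example, a minimal sketch of forcing an action before the save, so the timings show where the work really happens (the dataframe and table names below are placeholders):

import time

# Cache and count first: count() executes the whole lineage that builds spark_df,
# so the write timing below mostly reflects the table write itself.
t0 = time.time()
spark_df = spark_df.cache()
print(f"{spark_df.count()} rows materialized in {time.time() - t0:.1f}s")

t0 = time.time()
spark_df.write.mode("overwrite").saveAsTable("some_schema.some_table")  # placeholder table name
print(f"write finished in {time.time() - t0:.1f}s")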
02-20-2025 03:19 AM
Yes, I've read about the lazy evaluation. I've been experimenting with different commands, trying to get it to execute some kind of action. But so far, nothing's worked. I put display statements throughout to report how long each step is taking. Attached is the notebook cell with the code. I have a cell below it which simply executes the main function, "entrypoint". Also attached is a picture of the output from my latest test run.
import re, json, sys
import pandas as pd
import numpy as np
import lxml
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
import requests
from datetime import datetime, timedelta, date
from dateutil import tz
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import * # Import all Functions
from pyspark.sql.types import * # Import all Types
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark.storagelevel import StorageLevel
import time
def entrypoint():
    _odate = datetime.today().astimezone(tz.gettz('America/New_York')).strftime('%Y-%m-%d')
    spark = SparkSession.builder.getOrCreate()

    def count_number_rows(tabela_count):
        df_table_exist = spark.catalog.tableExists(tabela_count)
        if df_table_exist:
            return spark.table(tabela_count).count()
        else:
            return 0

    ## -----------------------------------------------------------------------------------------
    ## Capture tags
    ## -----------------------------------------------------------------------------------------
    def get_tags_from_cluster_spark_cfg() -> dict:
        all_tags = {}
        for tag in json.loads(spark.conf.get("spark.databricks.clusterUsageTags.clusterAllTags")):
            all_tags[tag['key']] = tag['value']
        return all_tags

    all_tags = get_tags_from_cluster_spark_cfg()
    env = all_tags.get('env').lower()
    if (env == ""): env = "dev"

    # -----------------------------------------------------------------------------------------
    # Function to create the table
    # -----------------------------------------------------------------------------------------
    def create_table(df_name, _odate, default_out_catalog, env, output_schema, table_name):
        # Define output table name
        target_out_schema = f"{default_out_catalog}_{env}.{output_schema}"
        output_table = f"{target_out_schema}.{table_name}"
        sep_print = "x=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-x"
        msg = f"{sep_print}\n{output_table} - {_odate}"
        print(msg)
        # Transform all columns to lower case
        for col in df_name.columns:
            df_name = df_name.withColumnRenamed(col, col.lower())
        # check if the dataframe is empty
        df_count = df_name.count()
        if (df_count == 0):
            msg = f"{_odate}: ERROR table {output_table}: dataframe is empty!"
            print(f"{msg}\n{sep_print}")
        # create schema if it does not exist
        sql_target_schema = 'CREATE SCHEMA IF NOT EXISTS ' + target_out_schema
        spark.sql(sql_target_schema)
        df_name.write\
            .mode("overwrite")\
            .option("overwriteSchema", "true")\
            .saveAsTable(output_table)
        # check if table was created successfully and has more than zero rows
        table_created_count = count_number_rows(output_table)
        if table_created_count > 0:
            msg = f"{_odate}: Table {output_table} created successfully with {table_created_count} rows."
        else:
            msg = f"{_odate}: Error: Table {output_table} is empty"
            print(f"{msg}\n{sep_print}")
            sys.exit(msg)
        print(f"{msg}\n{sep_print}")

    # Generate user access token to connect to the API
    url = "https://gerdau-scp-dev-neo-br.apimanagement.br1.hana.ondemand.com/v2/OAuthService/GenerateToken"
    payload = {'client_id': 'oCp7NYd5R9BinYAEj8ujk9ISRVnFYe5e',
               'client_secret': '4Nnucf2sGxZ0d3w7',
               'grant_type': 'client_credentials'
               }
    header = {'Content-Type': "application/x-www-form-urlencoded"}
    token_response = requests.post(url, data=payload, headers=header)
    accessToken = token_response.json()
    mytoken = accessToken['access_token']

    today_date = datetime.today().strftime('%Y-%m-%d')
    yesterday_date = (datetime.today() - timedelta(1)).strftime('%Y-%m-%d')

    url = "https://gerdau-scp-dev-neo-br.apimanagement.br1.hana.ondemand.com/v1/get-io-nodes"
    payload = """<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:urn=\"urn:sap-com:document:sap:rfc:functions\">
<soapenv:Header/>
<soapenv:Body>
<urn:BAPI_PPDSSRVAPS_GET_IO_NODES>
<EXCLUDE_EXPORT_FLAGS>
<NO_IO_ELEMENTS>X</NO_IO_ELEMENTS>
<NO_OUTPUT_NODES>X</NO_OUTPUT_NODES>
<NO_PEG_INPUT>X</NO_PEG_INPUT>
<NO_PEG_OUTPUT>X</NO_PEG_OUTPUT>
<NO_CFG_DATA>X</NO_CFG_DATA>
<NO_DESCRIPTIVE_CHARS>X</NO_DESCRIPTIVE_CHARS>
<NO_DUPLICATES>X</NO_DUPLICATES>
</EXCLUDE_EXPORT_FLAGS>
<INPUT_NODES>
<item>
<ORDID></ORDID>
<POSITION_NO></POSITION_NO>
<LINE_NO></LINE_NO>
<ACTID></ACTID>
<PRODUCT></PRODUCT>
<PRODUCT_BSG></PRODUCT_BSG>
<LOCATION></LOCATION>
<LOCTYPE></LOCTYPE>
<LOCATION_BSG></LOCATION_BSG>
<STORAGE_LOCATION></STORAGE_LOCATION>
<SPEC_STOCK_IND></SPEC_STOCK_IND>
<SPEC_STOCK_KEY></SPEC_STOCK_KEY>
<SPEC_STOCK_EXT></SPEC_STOCK_EXT>
<SPEC_STOCK_LOGSYS></SPEC_STOCK_LOGSYS>
<BASE_UOM></BASE_UOM>
<BASE_UOM_ISO></BASE_UOM_ISO>
<ATPCAT></ATPCAT>
<CATEGORY_TYPE></CATEGORY_TYPE>
<CATEGORY_NAME></CATEGORY_NAME>
<QUANTITY></QUANTITY>
<REAL_QUANTITY></REAL_QUANTITY>
<ORIGINAL_QUANTITY></ORIGINAL_QUANTITY>
<CONF_QUANTITY></CONF_QUANTITY>
<IS_MASTER_FLG></IS_MASTER_FLG>
<IO_TIME></IO_TIME>
<IGNORE_PEGGING></IGNORE_PEGGING>
<EXT_FIXED></EXT_FIXED>
<CONTI_SCHEDULING></CONTI_SCHEDULING>
<OFFSET_VALUE></OFFSET_VALUE>
<OFFSET_RELATION></OFFSET_RELATION>
<OFFSET_IN_PERCENT></OFFSET_IN_PERCENT>
<SCHEDULE_OFFSET></SCHEDULE_OFFSET>
<IO_END_TIME></IO_END_TIME>
<HAS_CHARACTS></HAS_CHARACTS>
<VERSION></VERSION>
<ATPCMPSTAT></ATPCMPSTAT>
<POSEX></POSEX>
<IS_RPM_FLAG></IS_RPM_FLAG>
<IONODE_TYPE></IONODE_TYPE>
<PARENT_POS></PARENT_POS>
</item>
</INPUT_NODES>
<LOCATION_SELECTION>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1300</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1301</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1302</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1321</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1323</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1327</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1330</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1332</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1333</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>1334</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>2790</LOW>
<HIGH>2791</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>2796</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>EQ</OPTION>
<LOW>2812</LOW>
<HIGH></HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>2860</LOW>
<HIGH>2869</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>4786</LOW>
<HIGH>4788</HIGH>
</item>
</LOCATION_SELECTION>
<LOGICAL_SYSTEM>SG2CLNT056</LOGICAL_SYSTEM>
<OBJECT_HEADERS>
<item>
<METHOD></METHOD>
<PARENT_ORDID></PARENT_ORDID>
<PARENT_ORDER_SYSTEM></PARENT_ORDER_SYSTEM>
<PARENT_ORDER_NUMBER></PARENT_ORDER_NUMBER>
<PARENT_ORDER_TYPE></PARENT_ORDER_TYPE>
<ORDID></ORDID>
<ORDER_SYSTEM></ORDER_SYSTEM>
<ORDER_NUMBER></ORDER_NUMBER>
<ORDER_TYPE></ORDER_TYPE>
<INT_ORDER_TYPE></INT_ORDER_TYPE>
<START_EFFECTIVITY></START_EFFECTIVITY>
<END_EFFECTIVITY></END_EFFECTIVITY>
<PLANNING_TYPE></PLANNING_TYPE>
<PRIORITY></PRIORITY>
<ORDER_START_DATE></ORDER_START_DATE>
<ORDER_END_DATE></ORDER_END_DATE>
<PROCUREMENT_TYPE></PROCUREMENT_TYPE>
<PROD_VERSION></PROD_VERSION>
<PRODUCABLE></PRODUCABLE>
<INPUT_FIXED></INPUT_FIXED>
<OUTPUT_FIXED></OUTPUT_FIXED>
<LOCATION></LOCATION>
<LOCTYPE></LOCTYPE>
<LOCATION_BSG></LOCATION_BSG>
<PLAN_NUMBER></PLAN_NUMBER>
<PPM_NUMBER></PPM_NUMBER>
<PPM_BSG></PPM_BSG>
<PART_DELIVERED></PART_DELIVERED>
<FINAL_DELIVERY></FINAL_DELIVERY>
<STARTED></STARTED>
<PART_CONFIRMED></PART_CONFIRMED>
<FINAL_CONFIRMED></FINAL_CONFIRMED>
<RELEASED></RELEASED>
<APPLI></APPLI>
<CREATION_TIME></CREATION_TIME>
<EXPL_DATE></EXPL_DATE>
<PLANNING_STATUS></PLANNING_STATUS>
<SOURCE_TYPE></SOURCE_TYPE>
<SOURCE_NAME></SOURCE_NAME>
</item>
</OBJECT_HEADERS>
<PLANNING_VERSION>000</PLANNING_VERSION>
<PRODUCT_SELECTION>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>000000000106000000</LOW>
<HIGH>000000000106999999</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>000000000109000000</LOW>
<HIGH>000000000109999999</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>000000000110000000</LOW>
<HIGH>000000000110999999</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>000000000205000000</LOW>
<HIGH>000000000205999999</HIGH>
</item>
<item>
<SIGN>I</SIGN>
<OPTION>BT</OPTION>
<LOW>000000000207000000</LOW>
<HIGH>000000000207999999</HIGH>
</item>
</PRODUCT_SELECTION>
<!--Optional:-->
<RETURN>
<!--Zero or more repetitions:-->
<item>
<TYPE></TYPE>
<ID></ID>
<NUMBER></NUMBER>
<MESSAGE></MESSAGE>
<LOG_NO></LOG_NO>
<LOG_MSG_NO></LOG_MSG_NO>
<MESSAGE_V1></MESSAGE_V1>
<MESSAGE_V2></MESSAGE_V2>
<MESSAGE_V3></MESSAGE_V3>
<MESSAGE_V4></MESSAGE_V4>
<PARAMETER></PARAMETER>
<ROW></ROW>
<FIELD></FIELD>
<SYSTEM></SYSTEM>
</item>
</RETURN>
</urn:BAPI_PPDSSRVAPS_GET_IO_NODES>
</soapenv:Body>
</soapenv:Envelope>
"""
    header = {
        'APIKey': 'oCp7NYd5R9BinYAEj8ujk9ISRVnFYe5e',
        'Content-Type': 'text/xml',
        'Authorization': 'Bearer ' + mytoken
    }

    print("Starting get-io-nodes Request...")
    start_time = time.time()
    response = requests.request("POST", url, headers=header, data=payload)
    print(f"get-io-nodes Request took {time.time() - start_time} seconds")

    print("Creating BeautifulSoup object from XML Response...")
    start_time = time.time()
    # convert XML response to navigable object
    soup = BeautifulSoup(response.text, 'xml')
    print(f"BeautifulSoup object created in {time.time() - start_time} seconds")

    start_time = time.time()
    # First extract INPUT_NODES table
    data = []
    input_nodes = soup.find('INPUT_NODES')
    for child in input_nodes.children:
        record = {}
        record['ORDID'] = child.find('ORDID').text
        record['POSITION_NO'] = child.find('POSITION_NO').text
        record['LINE_NO'] = child.find('LINE_NO').text
        record['ACTID'] = child.find('ACTID').text
        record['PRODUCT'] = child.find('PRODUCT').text
        record['PRODUCT_BSG'] = child.find('PRODUCT_BSG').text
        record['LOCATION'] = child.find('LOCATION').text
        record['LOCTYPE'] = child.find('LOCTYPE').text
        record['LOCATION_BSG'] = child.find('LOCATION_BSG').text
        record['STORAGE_LOCATION'] = child.find('STORAGE_LOCATION').text
        record['SPEC_STOCK_IND'] = child.find('SPEC_STOCK_IND').text
        record['SPEC_STOCK_KEY'] = child.find('SPEC_STOCK_KEY').text
        record['SPEC_STOCK_EXT'] = child.find('SPEC_STOCK_EXT').text
        record['SPEC_STOCK_LOGSYS'] = child.find('SPEC_STOCK_LOGSYS').text
        record['BASE_UOM'] = child.find('BASE_UOM').text
        record['BASE_UOM_ISO'] = child.find('BASE_UOM_ISO').text
        record['ATPCAT'] = child.find('ATPCAT').text
        record['CATEGORY_TYPE'] = child.find('CATEGORY_TYPE').text
        record['CATEGORY_NAME'] = child.find('CATEGORY_NAME').text
        record['QUANTITY'] = child.find('QUANTITY').text
        record['REAL_QUANTITY'] = child.find('REAL_QUANTITY').text
        record['ORIGINAL_QUANTITY'] = child.find('ORIGINAL_QUANTITY').text
        record['CONF_QUANTITY'] = child.find('CONF_QUANTITY').text
        record['IS_MASTER_FLG'] = child.find('IS_MASTER_FLG').text
        record['IO_TIME'] = child.find('IO_TIME').text
        record['IGNORE_PEGGING'] = child.find('IGNORE_PEGGING').text
        record['EXT_FIXED'] = child.find('EXT_FIXED').text
        record['CONTI_SCHEDULING'] = child.find('CONTI_SCHEDULING').text
        record['OFFSET_VALUE'] = child.find('OFFSET_VALUE').text
        record['OFFSET_RELATION'] = child.find('OFFSET_RELATION').text
        record['OFFSET_IN_PERCENT'] = child.find('OFFSET_IN_PERCENT').text
        record['SCHEDULE_OFFSET'] = child.find('SCHEDULE_OFFSET').text
        record['IO_END_TIME'] = child.find('IO_END_TIME').text
        record['HAS_CHARACTS'] = child.find('HAS_CHARACTS').text
        record['VERSION'] = child.find('VERSION').text
        record['ATPCMPSTAT'] = child.find('ATPCMPSTAT').text
        record['POSEX'] = child.find('POSEX').text
        record['IS_RPM_FLAG'] = child.find('IS_RPM_FLAG').text
        record['IONODE_TYPE'] = child.find('IONODE_TYPE').text
        record['PARENT_POS'] = child.find('PARENT_POS').text
        data.append(record)

    # convert array of dictionaries into a Pandas dataframe
    df = pd.DataFrame(data)
    # Convert Pandas dataframe to spark dataframe
    spark_df = spark.createDataFrame(df)
    print(f"INPUT_NODES Dataframe created in {time.time() - start_time} seconds")
    start_time = time.time()
    # define the schema for the datatypes
    schema = StructType([
        StructField("ORDID", StringType(), True),
        StructField("POSITION_NO", StringType(), True),
        StructField("LINE_NO", StringType(), True),
        StructField("ACTID", StringType(), True),
        StructField("PRODUCT", StringType(), True),
        StructField("PRODUCT_BSG", StringType(), True),
        StructField("LOCATION", StringType(), True),
        StructField("LOCTYPE", StringType(), True),
        StructField("LOCATION_BSG", StringType(), True),
        StructField("STORAGE_LOCATION", StringType(), True),
        StructField("SPEC_STOCK_IND", StringType(), True),
        StructField("SPEC_STOCK_KEY", StringType(), True),
        StructField("SPEC_STOCK_EXT", StringType(), True),
        StructField("SPEC_STOCK_LOGSYS", StringType(), True),
        StructField("BASE_UOM", StringType(), True),
        StructField("BASE_UOM_ISO", StringType(), True),
        StructField("ATPCAT", StringType(), True),
        StructField("CATEGORY_TYPE", StringType(), True),
        StructField("CATEGORY_NAME", StringType(), True),
        StructField("QUANTITY", DecimalType(23, 3), True),
        StructField("REAL_QUANTITY", DecimalType(23, 3), True),
        StructField("ORIGINAL_QUANTITY", DecimalType(23, 3), True),
        StructField("CONF_QUANTITY", DecimalType(23, 3), True),
        StructField("IS_MASTER_FLG", StringType(), True),
        StructField("IO_TIME", LongType(), True),
        StructField("IGNORE_PEGGING", StringType(), True),
        StructField("EXT_FIXED", StringType(), True),
        StructField("CONTI_SCHEDULING", IntegerType(), True),
        StructField("OFFSET_VALUE", IntegerType(), True),
        StructField("OFFSET_RELATION", IntegerType(), True),
        StructField("OFFSET_IN_PERCENT", StringType(), True),
        StructField("SCHEDULE_OFFSET", StringType(), True),
        StructField("IO_END_TIME", LongType(), True),
        StructField("HAS_CHARACTS", IntegerType(), True),
        StructField("VERSION", StringType(), True),
        StructField("ATPCMPSTAT", IntegerType(), True),
        StructField("POSEX", StringType(), True),
        StructField("IS_RPM_FLAG", StringType(), True),
        StructField("IONODE_TYPE", IntegerType(), True),
        StructField("PARENT_POS", StringType(), True)
    ])
    for field in schema.fields:
        spark_df = spark_df.withColumn(field.name, col(field.name).cast(field.dataType))

    default_out_catalog = "corporate_glndatawarehouse_refined"
    default_out_schema = "robinhood_staging"
    default_out_table = 'tb_input_nodes'
    # create_table(spark_df,_odate,default_out_catalog,env,default_out_schema,default_out_table,False)
    create_table(spark_df, _odate, default_out_catalog, env, default_out_schema, default_out_table)
    print(f"INPUT_NODES table write completed in {time.time() - start_time} seconds")
    start_time = time.time()
    # Second extract OBJECT_HEADERS table
    data = []
    object_headers = soup.find('OBJECT_HEADERS')
    for child in object_headers.children:
        record = {}
        record['method'] = child.find('METHOD').text
        record['parent_ordid'] = child.find('PARENT_ORDID').text
        record['parent_order_system'] = child.find('PARENT_ORDER_SYSTEM').text
        record['parent_order_number'] = child.find('PARENT_ORDER_NUMBER').text
        record['parent_order_type'] = child.find('PARENT_ORDER_TYPE').text
        record['ordid'] = child.find('ORDID').text
        record['order_system'] = child.find('ORDER_SYSTEM').text
        record['order_number'] = child.find('ORDER_NUMBER').text
        record['order_type'] = child.find('ORDER_TYPE').text
        record['int_order_type'] = child.find('INT_ORDER_TYPE').text
        record['start_effectivity'] = child.find('START_EFFECTIVITY').text
        record['end_effectivity'] = child.find('END_EFFECTIVITY').text
        record['planning_type'] = child.find('PLANNING_TYPE').text
        record['priority'] = child.find('PRIORITY').text
        record['order_start_date'] = child.find('ORDER_START_DATE').text
        record['order_end_date'] = child.find('ORDER_END_DATE').text
        record['procurement_type'] = child.find('PROCUREMENT_TYPE').text
        record['prod_version'] = child.find('PROD_VERSION').text
        record['producable'] = child.find('PRODUCABLE').text
        record['input_fixed'] = child.find('INPUT_FIXED').text
        record['output_fixed'] = child.find('OUTPUT_FIXED').text
        record['location'] = child.find('LOCATION').text
        record['loctype'] = child.find('LOCTYPE').text
        record['location_bsg'] = child.find('LOCATION_BSG').text
        record['plan_number'] = child.find('PLAN_NUMBER').text
        record['ppm_number'] = child.find('PPM_NUMBER').text
        record['ppm_bsg'] = child.find('PPM_BSG').text
        record['part_delivered'] = child.find('PART_DELIVERED').text
        record['final_delivery'] = child.find('FINAL_DELIVERY').text
        record['started'] = child.find('STARTED').text
        record['part_confirmed'] = child.find('PART_CONFIRMED').text
        record['final_confirmed'] = child.find('FINAL_CONFIRMED').text
        record['released'] = child.find('RELEASED').text
        record['appli'] = child.find('APPLI').text
        record['creation_time'] = child.find('CREATION_TIME').text
        record['expl_date'] = child.find('EXPL_DATE').text
        record['planning_status'] = child.find('PLANNING_STATUS').text
        record['source_type'] = child.find('SOURCE_TYPE').text
        record['source_name'] = child.find('SOURCE_NAME').text
        data.append(record)

    # convert array of dictionaries into a Pandas dataframe
    df = pd.DataFrame(data)
    # Convert Pandas dataframe to spark dataframe
    spark_df = spark.createDataFrame(df)
    print(f"OBJECT_HEADERS Dataframe created in {time.time() - start_time} seconds")
    start_time = time.time()
    # define the schema for the datatypes
    schema = StructType([
        StructField('method', StringType(), True),
        StructField('parent_ordid', StringType(), True),
        StructField('parent_order_system', StringType(), True),
        StructField('parent_order_number', StringType(), True),
        StructField('parent_order_type', StringType(), True),
        StructField('ordid', StringType(), True),
        StructField('order_system', StringType(), True),
        StructField('order_number', StringType(), True),
        StructField('order_type', StringType(), True),
        StructField('int_order_type', StringType(), True),
        StructField('start_effectivity', StringType(), True),
        StructField('end_effectivity', StringType(), True),
        StructField('planning_type', StringType(), True),
        StructField('priority', StringType(), True),
        StructField('order_start_date', StringType(), True),
        StructField('order_end_date', StringType(), True),
        StructField('procurement_type', StringType(), True),
        StructField('prod_version', StringType(), True),
        StructField('producable', StringType(), True),
        StructField('input_fixed', StringType(), True),
        StructField('output_fixed', StringType(), True),
        StructField('location', StringType(), True),
        StructField('loctype', StringType(), True),
        StructField('location_bsg', StringType(), True),
        StructField('plan_number', StringType(), True),
        StructField('ppm_number', StringType(), True),
        StructField('ppm_bsg', StringType(), True),
        StructField('part_delivered', StringType(), True),
        StructField('final_delivery', StringType(), True),
        StructField('started', StringType(), True),
        StructField('part_confirmed', StringType(), True),
        StructField('final_confirmed', StringType(), True),
        StructField('released', StringType(), True),
        StructField('appli', StringType(), True),
        StructField('creation_time', StringType(), True),
        StructField('expl_date', StringType(), True),
        StructField('planning_status', StringType(), True),
        StructField('source_type', StringType(), True),
        StructField('source_name', StringType(), True)
    ])
    for field in schema.fields:
        spark_df = spark_df.withColumn(field.name, col(field.name).cast(field.dataType))

    default_out_table = 'tb_object_headers'
    create_table(spark_df, _odate, default_out_catalog, env, default_out_schema, default_out_table)
    print(f"OBJECT_HEADERS table write completed in {time.time() - start_time} seconds")
    start_time = time.time()
02-21-2025 10:18 AM
After reading the suggested documentation, I tried the "Parse nested XML (from_xml and schema_of_xml)" example. I used this code from the doc:
from pyspark.sql.functions import col, from_xml, schema_of_xml

# xml_data is the XML string to parse (in this case, the SOAP response text)
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
and it worked great. But I don't understand what the number 8 signifies in that first argument to the createDataFrame command. I replaced it with different numbers while experimenting and it didn't seem to matter what number I put in there, so I left it as a 1. Once I saw the printed schema, I was able to write a pyspark command to pull out the columns of a table. But I ended up with a one-row dataframe where each column contained an array of all 100K values. So I added an explode in front of each column, and that appeared to give me a dataframe that looked like a proper table. But when I tried to write it to a delta table, I got "The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached." So I removed the write and instead just put in this command, to see if I had a dataframe with any sensible data in it:


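Separately, a minimal sketch of exploding the repeated item structs a single time, instead of once per column; the nested path below is only illustrative, the real one has to be read off the printSchema() output:

from pyspark.sql.functions import col, explode

# Illustrative path only -- replace with the actual nesting shown by parsed.printSchema().
items = parsed.select(explode(col("parsed.Body.Response.INPUT_NODES.item")).alias("node"))

# One row per <item>; the struct fields become ordinary columns.
input_nodes_df = items.select("node.*")
input_nodes_df.show(10, truncate=False)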