Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Writing a small pyspark dataframe to a table is taking a very long time

philHarasz
New Contributor III

My experience with Databricks pyspark up to this point has always been to execute a SQL query against existing Databricks tables, then write the resulting pyspark dataframe into a new table. 

For the first time, I am now getting data via an API which returns a large XML blob containing a table of data. I convert the XML object to a BeautifulSoup object and extract the table data into a Pandas data frame. That takes a couple of minutes, not so bad. Then I convert the Pandas data frame to a pyspark data frame. That takes about a minute. Then I issue a command to write that pyspark dataframe to a table, and it takes... TWO HOURS.

Now, the dataframe has about 130K rows with 39 columns. Two hours to write 130K rows? What is going on?

By contrast, yesterday I developed a pyspark script saving a dataframe created from a pyspark SQL command. The dataframe had about 180K rows, and the entire job ran in six minutes. 

Why does a pyspark dataframe created from a pandas dataframe created from a beautifulsoup object take two hours to write to a table, while a similar sized pyspark dataframe created from a pyspark query takes only six minutes from running the query all the way to writing the table?

 


4 REPLIES

MariuszK
Contributor III

Can you share the code? Keep in mind that Spark uses lazy evaluation, so it can give the impression that the earlier code runs fast and the save runs slowly: the code is only executed when it hits an action.
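As a quick illustration (the table name below is only a placeholder), every line here just builds the plan until an action is called:

from pyspark.sql import functions as F

df = spark.range(1_000_000)                          # transformation only: nothing runs yet
df = df.withColumn("doubled", F.col("id") * 2)       # still lazy, just extends the plan
df = df.filter(F.col("doubled") % 4 == 0)            # still lazy

df.count()                                           # action: Spark executes the plan here
df.write.mode("overwrite").saveAsTable("dev.sandbox.lazy_demo")   # action: placeholder table name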

philHarasz
New Contributor III

Yes, I've read about the lazy evaluation. I've been experimenting with different commands, trying to get it to execute some kind of action. But so far, nothing's worked. I put display statements throughout to report how long each step is taking. Attached is the notebook cell with the code. I have a cell below it which simply executes the main function, "entrypoint". Also attached is a picture of the output from my latest test run.

import re, json, sys
import pandas as pd
import numpy as np
import lxml
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
import requests
from datetime import datetime, timedelta, date
from dateutil import tz
from dateutil.relativedelta import relativedelta
from pyspark.sql.functions import *                           # Import all Functions
from pyspark.sql.types import *                               # Import all Types
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark.storagelevel import StorageLevel
import time

def entrypoint():
    _odate = datetime.today().astimezone(tz.gettz('America/New_York')).strftime('%Y-%m-%d')

    spark = SparkSession.builder.getOrCreate()

    def count_number_rows(tabela_count):
        df_table_exist = spark.catalog.tableExists(tabela_count)
        if df_table_exist:
            return spark.table(tabela_count).count()
        else:
            return 0

    ## -----------------------------------------------------------------------------------------
    ## Capture tags
    ## -----------------------------------------------------------------------------------------
    def get_tags_from_cluster_spark_cfg() -> dict:
        all_tags = {}
        for tag in json.loads(spark.conf.get("spark.databricks.clusterUsageTags.clusterAllTags")):
            all_tags[tag['key']] = tag['value']
        return all_tags

    all_tags = get_tags_from_cluster_spark_cfg()
    env = all_tags.get('env').lower()
    if (env == ""): env="dev"

    # -----------------------------------------------------------------------------------------
    # Function to create the table 
    # -----------------------------------------------------------------------------------------
    def create_table(df_name,_odate,default_out_catalog,env,output_schema,table_name):
        # Define output table name
        target_out_schema = f"{default_out_catalog}_{env}.{output_schema}"
        output_table = f"{target_out_schema}.{table_name}"
        sep_print = "x=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-x"
        msg = f"{sep_print}\n{output_table} - {_odate}"
        print(msg)
        # Transform all columns to lower case
        for col in df_name.columns:
            df_name = df_name.withColumnRenamed(col, col.lower())
        # check if the dataframe is empty
        df_count = df_name.count()
        if (df_count == 0):
            msg = f"{_odate}: ERROR table {output_table}: dataframe is empty!"
            print(f"{msg}\n{sep_print}")
        # create schema if it does not exist
        sql_target_schema = 'CREATE SCHEMA IF NOT EXISTS ' +  target_out_schema
        spark.sql(sql_target_schema)
        df_name.write\
                .mode("overwrite")\
                .option("overwriteSchema", "true")\
                .saveAsTable(output_table)
        # check if table was created successfully and has more than zero rows
        table_created_count = count_number_rows(output_table)
        if table_created_count > 0:
            msg = f"{_odate}: Table {output_table} created successfully with {table_created_count} rows."
        else:
            msg = f"{_odate}: Error: Table {output_table} is empty"
            print(f"{msg}\n{sep_print}")
            sys.exit(msg)
        print(f"{msg}\n{sep_print}")


    #Generate user access token to connect to the API
    url = "https://gerdau-scp-dev-neo-br.apimanagement.br1.hana.ondemand.com/v2/OAuthService/GenerateToken"

    payload = {'client_id': '<client_id redacted>',
               'client_secret': '<client_secret redacted>',
               'grant_type': 'client_credentials'
    }

    header = {'Content-Type': "application/x-www-form-urlencoded"}

    token_response = requests.post(url, data = payload, headers = header)
    accessToken = token_response.json() 
    mytoken = accessToken['access_token']

    today_date = datetime.today().strftime('%Y-%m-%d')
    yesterday_date = (datetime.today() - timedelta(1)).strftime('%Y-%m-%d')

    url = "https://gerdau-scp-dev-neo-br.apimanagement.br1.hana.ondemand.com/v1/get-io-nodes"

    payload = """<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:urn=\"urn:sap-com:document:sap:rfc:functions\">
   <soapenv:Header/>
   <soapenv:Body>
      <urn:BAPI_PPDSSRVAPS_GET_IO_NODES>
        <EXCLUDE_EXPORT_FLAGS>
            <NO_IO_ELEMENTS>X</NO_IO_ELEMENTS>
            <NO_OUTPUT_NODES>X</NO_OUTPUT_NODES>
            <NO_PEG_INPUT>X</NO_PEG_INPUT>
            <NO_PEG_OUTPUT>X</NO_PEG_OUTPUT>
            <NO_CFG_DATA>X</NO_CFG_DATA>
            <NO_DESCRIPTIVE_CHARS>X</NO_DESCRIPTIVE_CHARS>
            <NO_DUPLICATES>X</NO_DUPLICATES>
         </EXCLUDE_EXPORT_FLAGS>
         <INPUT_NODES>
            <item>
               <ORDID></ORDID>
               <POSITION_NO></POSITION_NO>
               <LINE_NO></LINE_NO>
               <ACTID></ACTID>
               <PRODUCT></PRODUCT>
               <PRODUCT_BSG></PRODUCT_BSG>
               <LOCATION></LOCATION>
               <LOCTYPE></LOCTYPE>
               <LOCATION_BSG></LOCATION_BSG>
               <STORAGE_LOCATION></STORAGE_LOCATION>
               <SPEC_STOCK_IND></SPEC_STOCK_IND>
               <SPEC_STOCK_KEY></SPEC_STOCK_KEY>
               <SPEC_STOCK_EXT></SPEC_STOCK_EXT>
               <SPEC_STOCK_LOGSYS></SPEC_STOCK_LOGSYS>
               <BASE_UOM></BASE_UOM>
               <BASE_UOM_ISO></BASE_UOM_ISO>
               <ATPCAT></ATPCAT>
               <CATEGORY_TYPE></CATEGORY_TYPE>
               <CATEGORY_NAME></CATEGORY_NAME>
               <QUANTITY></QUANTITY>
               <REAL_QUANTITY></REAL_QUANTITY>
               <ORIGINAL_QUANTITY></ORIGINAL_QUANTITY>
               <CONF_QUANTITY></CONF_QUANTITY>
               <IS_MASTER_FLG></IS_MASTER_FLG>
               <IO_TIME></IO_TIME>
               <IGNORE_PEGGING></IGNORE_PEGGING>
               <EXT_FIXED></EXT_FIXED>
               <CONTI_SCHEDULING></CONTI_SCHEDULING>
               <OFFSET_VALUE></OFFSET_VALUE>
               <OFFSET_RELATION></OFFSET_RELATION>
               <OFFSET_IN_PERCENT></OFFSET_IN_PERCENT>
               <SCHEDULE_OFFSET></SCHEDULE_OFFSET>
               <IO_END_TIME></IO_END_TIME>
               <HAS_CHARACTS></HAS_CHARACTS>
               <VERSION></VERSION>
               <ATPCMPSTAT></ATPCMPSTAT>
               <POSEX></POSEX>
               <IS_RPM_FLAG></IS_RPM_FLAG>
               <IONODE_TYPE></IONODE_TYPE>
               <PARENT_POS></PARENT_POS>
            </item>
         </INPUT_NODES>
         <LOCATION_SELECTION>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1300</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1301</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1302</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1321</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1323</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1327</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1330</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1332</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1333</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>1334</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>2790</LOW>
               <HIGH>2791</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>2796</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>EQ</OPTION>
               <LOW>2812</LOW>
               <HIGH></HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>2860</LOW>
               <HIGH>2869</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>4786</LOW>
               <HIGH>4788</HIGH>
            </item>
         </LOCATION_SELECTION>
         <LOGICAL_SYSTEM>SG2CLNT056</LOGICAL_SYSTEM>
         <OBJECT_HEADERS>
            <item>
               <METHOD></METHOD>
               <PARENT_ORDID></PARENT_ORDID>
               <PARENT_ORDER_SYSTEM></PARENT_ORDER_SYSTEM>
               <PARENT_ORDER_NUMBER></PARENT_ORDER_NUMBER>
               <PARENT_ORDER_TYPE></PARENT_ORDER_TYPE>
               <ORDID></ORDID>
               <ORDER_SYSTEM></ORDER_SYSTEM>
               <ORDER_NUMBER></ORDER_NUMBER>
               <ORDER_TYPE></ORDER_TYPE>
               <INT_ORDER_TYPE></INT_ORDER_TYPE>
               <START_EFFECTIVITY></START_EFFECTIVITY>
               <END_EFFECTIVITY></END_EFFECTIVITY>
               <PLANNING_TYPE></PLANNING_TYPE>
               <PRIORITY></PRIORITY>
               <ORDER_START_DATE></ORDER_START_DATE>
               <ORDER_END_DATE></ORDER_END_DATE>
               <PROCUREMENT_TYPE></PROCUREMENT_TYPE>
               <PROD_VERSION></PROD_VERSION>
               <PRODUCABLE></PRODUCABLE>
               <INPUT_FIXED></INPUT_FIXED>
               <OUTPUT_FIXED></OUTPUT_FIXED>
               <LOCATION></LOCATION>
               <LOCTYPE></LOCTYPE>
               <LOCATION_BSG></LOCATION_BSG>
               <PLAN_NUMBER></PLAN_NUMBER>
               <PPM_NUMBER></PPM_NUMBER>
               <PPM_BSG></PPM_BSG>
               <PART_DELIVERED></PART_DELIVERED>
               <FINAL_DELIVERY></FINAL_DELIVERY>
               <STARTED></STARTED>
               <PART_CONFIRMED></PART_CONFIRMED>
               <FINAL_CONFIRMED></FINAL_CONFIRMED>
               <RELEASED></RELEASED>
               <APPLI></APPLI>
               <CREATION_TIME></CREATION_TIME>
               <EXPL_DATE></EXPL_DATE>
               <PLANNING_STATUS></PLANNING_STATUS>
               <SOURCE_TYPE></SOURCE_TYPE>
               <SOURCE_NAME></SOURCE_NAME>
            </item>
         </OBJECT_HEADERS>
         <PLANNING_VERSION>000</PLANNING_VERSION>
         <PRODUCT_SELECTION>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>000000000106000000</LOW>
               <HIGH>000000000106999999</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>000000000109000000</LOW>
               <HIGH>000000000109999999</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>000000000110000000</LOW>
               <HIGH>000000000110999999</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>000000000205000000</LOW>
               <HIGH>000000000205999999</HIGH>
            </item>
            <item>
               <SIGN>I</SIGN>
               <OPTION>BT</OPTION>
               <LOW>000000000207000000</LOW>
               <HIGH>000000000207999999</HIGH>
            </item>
         </PRODUCT_SELECTION>
         <!--Optional:-->
         <RETURN>
            <!--Zero or more repetitions:-->
            <item>
               <TYPE></TYPE>
               <ID></ID>
               <NUMBER></NUMBER>
               <MESSAGE></MESSAGE>
               <LOG_NO></LOG_NO>
               <LOG_MSG_NO></LOG_MSG_NO>
               <MESSAGE_V1></MESSAGE_V1>
               <MESSAGE_V2></MESSAGE_V2>
               <MESSAGE_V3></MESSAGE_V3>
               <MESSAGE_V4></MESSAGE_V4>
               <PARAMETER></PARAMETER>
               <ROW></ROW>
               <FIELD></FIELD>
               <SYSTEM></SYSTEM>
            </item>
         </RETURN>
      </urn:BAPI_PPDSSRVAPS_GET_IO_NODES>
   </soapenv:Body>
</soapenv:Envelope>
"""

    header = {
    'APIKey': '<api_key redacted>',
    'Content-Type': 'text/xml',
    'Authorization': 'Bearer ' + mytoken
    }

    print("Starting get-io-nodes Request...")
    start_time = time.time()

    response = requests.request("POST", url, headers=header, data=payload)

    print(f"get-io-nodes Request took {time.time() - start_time} seconds")
    print("Creating BeautifulSoup object from XML Response...")
    start_time = time.time()
    
    #convert XML response to navigable object
    soup = BeautifulSoup(response.text, 'xml')

    print(f"BeautifulSoup object created in {time.time() - start_time} seconds")
    start_time = time.time()

    #First extract INPUT_NODES table
    data = []
    input_nodes = soup.find('INPUT_NODES')

    for child in input_nodes.children:
        record = {}
        record['ORDID'] = child.find('ORDID').text
        record['POSITION_NO'] = child.find('POSITION_NO').text
        record['LINE_NO'] = child.find('LINE_NO').text
        record['ACTID'] = child.find('ACTID').text
        record['PRODUCT'] = child.find('PRODUCT').text
        record['PRODUCT_BSG'] = child.find('PRODUCT_BSG').text
        record['LOCATION'] = child.find('LOCATION').text
        record['LOCTYPE'] = child.find('LOCTYPE').text
        record['LOCATION_BSG'] = child.find('LOCATION_BSG').text
        record['STORAGE_LOCATION'] = child.find('STORAGE_LOCATION').text
        record['SPEC_STOCK_IND'] = child.find('SPEC_STOCK_IND').text
        record['SPEC_STOCK_KEY'] = child.find('SPEC_STOCK_KEY').text
        record['SPEC_STOCK_EXT'] = child.find('SPEC_STOCK_EXT').text
        record['SPEC_STOCK_LOGSYS'] = child.find('SPEC_STOCK_LOGSYS').text
        record['BASE_UOM'] = child.find('BASE_UOM').text
        record['BASE_UOM_ISO'] = child.find('BASE_UOM_ISO').text
        record['ATPCAT'] = child.find('ATPCAT').text
        record['CATEGORY_TYPE'] = child.find('CATEGORY_TYPE').text
        record['CATEGORY_NAME'] = child.find('CATEGORY_NAME').text
        record['QUANTITY'] = child.find('QUANTITY').text
        record['REAL_QUANTITY'] = child.find('REAL_QUANTITY').text
        record['ORIGINAL_QUANTITY'] = child.find('ORIGINAL_QUANTITY').text
        record['CONF_QUANTITY'] = child.find('CONF_QUANTITY').text
        record['IS_MASTER_FLG'] = child.find('IS_MASTER_FLG').text
        record['IO_TIME'] = child.find('IO_TIME').text
        record['IGNORE_PEGGING'] = child.find('IGNORE_PEGGING').text
        record['EXT_FIXED'] = child.find('EXT_FIXED').text
        record['CONTI_SCHEDULING'] = child.find('CONTI_SCHEDULING').text
        record['OFFSET_VALUE'] = child.find('OFFSET_VALUE').text
        record['OFFSET_RELATION'] = child.find('OFFSET_RELATION').text
        record['OFFSET_IN_PERCENT'] = child.find('OFFSET_IN_PERCENT').text
        record['SCHEDULE_OFFSET'] = child.find('SCHEDULE_OFFSET').text
        record['IO_END_TIME'] = child.find('IO_END_TIME').text
        record['HAS_CHARACTS'] = child.find('HAS_CHARACTS').text
        record['VERSION'] = child.find('VERSION').text
        record['ATPCMPSTAT'] = child.find('ATPCMPSTAT').text
        record['POSEX'] = child.find('POSEX').text
        record['IS_RPM_FLAG'] = child.find('IS_RPM_FLAG').text
        record['IONODE_TYPE'] = child.find('IONODE_TYPE').text
        record['PARENT_POS'] = child.find('PARENT_POS').text
        data.append(record)

    #convert array of dictionaries into a Pandas dataframe
    df = pd.DataFrame(data)
    #Convert Pandas dataframe to spark dataframe
    spark_df = spark.createDataFrame(df)

    print(f"INPUT_NODES Dataframe created in {time.time() - start_time} seconds")        
    start_time = time.time()
    
    #define the schema for the datatypes
    schema = StructType([
        StructField("ORDID",              StringType(), True),
        StructField("POSITION_NO",        StringType(), True),
        StructField("LINE_NO",            StringType(), True),
        StructField("ACTID",              StringType(), True),
        StructField("PRODUCT",            StringType(), True),
        StructField("PRODUCT_BSG",        StringType(), True),
        StructField("LOCATION",           StringType(), True),
        StructField("LOCTYPE",            StringType(), True),
        StructField("LOCATION_BSG",       StringType(), True),
        StructField("STORAGE_LOCATION",   StringType(), True),
        StructField("SPEC_STOCK_IND",     StringType(), True),
        StructField("SPEC_STOCK_KEY",     StringType(), True),
        StructField("SPEC_STOCK_EXT",     StringType(), True),
        StructField("SPEC_STOCK_LOGSYS",  StringType(), True),
        StructField("BASE_UOM",           StringType(), True),
        StructField("BASE_UOM_ISO",       StringType(), True),
        StructField("ATPCAT",             StringType(), True),
        StructField("CATEGORY_TYPE",      StringType(), True),
        StructField("CATEGORY_NAME",      StringType(), True),
        StructField("QUANTITY",           DecimalType(23, 3), True),
        StructField("REAL_QUANTITY",      DecimalType(23, 3), True),
        StructField("ORIGINAL_QUANTITY",  DecimalType(23, 3), True),
        StructField("CONF_QUANTITY",      DecimalType(23, 3), True),
        StructField("IS_MASTER_FLG",      StringType(), True),
        StructField("IO_TIME",            LongType(), True),
        StructField("IGNORE_PEGGING",     StringType(), True),
        StructField("EXT_FIXED",          StringType(), True),
        StructField("CONTI_SCHEDULING",   IntegerType(), True),
        StructField("OFFSET_VALUE",       IntegerType(), True),
        StructField("OFFSET_RELATION",    IntegerType(), True),
        StructField("OFFSET_IN_PERCENT",  StringType(), True),
        StructField("SCHEDULE_OFFSET",    StringType(), True),
        StructField("IO_END_TIME",        LongType(), True),
        StructField("HAS_CHARACTS",       IntegerType(), True),
        StructField("VERSION",            StringType(), True),
        StructField("ATPCMPSTAT",         IntegerType(), True),
        StructField("POSEX",              StringType(), True),
        StructField("IS_RPM_FLAG",        StringType(), True),
        StructField("IONODE_TYPE",        IntegerType(), True),
        StructField("PARENT_POS",         StringType(), True)
    ])

    for field in schema.fields:
        spark_df = spark_df.withColumn(field.name, col(field.name).cast(field.dataType))

    default_out_catalog = "corporate_glndatawarehouse_refined"
    default_out_schema = "robinhood_staging"
    default_out_table = 'tb_input_nodes'

    #create_table(spark_df,_odate,default_out_catalog,env,default_out_schema,default_out_table,False)
    create_table(spark_df,_odate,default_out_catalog,env,default_out_schema,default_out_table)

    print(f"INPUT_NODES table write completed in {time.time() - start_time} seconds")
    start_time = time.time()

    #Second extract OBJECT_HEADERS table
    data = []
    object_headers = soup.find('OBJECT_HEADERS')

    for child in object_headers.children:
        record = {}
        record['method'] = child.find('METHOD').text
        record['parent_ordid'] = child.find('PARENT_ORDID').text
        record['parent_order_system'] = child.find('PARENT_ORDER_SYSTEM').text
        record['parent_order_number'] = child.find('PARENT_ORDER_NUMBER').text
        record['parent_order_type'] = child.find('PARENT_ORDER_TYPE').text
        record['ordid'] = child.find('ORDID').text
        record['order_system'] = child.find('ORDER_SYSTEM').text
        record['order_number'] = child.find('ORDER_NUMBER').text
        record['order_type'] = child.find('ORDER_TYPE').text
        record['int_order_type'] = child.find('INT_ORDER_TYPE').text
        record['start_effectivity'] = child.find('START_EFFECTIVITY').text
        record['end_effectivity'] = child.find('END_EFFECTIVITY').text
        record['planning_type'] = child.find('PLANNING_TYPE').text
        record['priority'] = child.find('PRIORITY').text
        record['order_start_date'] = child.find('ORDER_START_DATE').text
        record['order_end_date'] = child.find('ORDER_END_DATE').text
        record['procurement_type'] = child.find('PROCUREMENT_TYPE').text
        record['prod_version'] = child.find('PROD_VERSION').text
        record['producable'] = child.find('PRODUCABLE').text
        record['input_fixed'] = child.find('INPUT_FIXED').text
        record['output_fixed'] = child.find('OUTPUT_FIXED').text
        record['location'] = child.find('LOCATION').text
        record['loctype'] = child.find('LOCTYPE').text
        record['location_bsg'] = child.find('LOCATION_BSG').text
        record['plan_number'] = child.find('PLAN_NUMBER').text
        record['ppm_number'] = child.find('PPM_NUMBER').text
        record['ppm_bsg'] = child.find('PPM_BSG').text
        record['part_delivered'] = child.find('PART_DELIVERED').text
        record['final_delivery'] = child.find('FINAL_DELIVERY').text
        record['started'] = child.find('STARTED').text
        record['part_confirmed'] = child.find('PART_CONFIRMED').text
        record['final_confirmed'] = child.find('FINAL_CONFIRMED').text
        record['released'] = child.find('RELEASED').text
        record['appli'] = child.find('APPLI').text
        record['creation_time'] = child.find('CREATION_TIME').text
        record['expl_date'] = child.find('EXPL_DATE').text
        record['planning_status'] = child.find('PLANNING_STATUS').text
        record['source_type'] = child.find('SOURCE_TYPE').text
        record['source_name'] = child.find('SOURCE_NAME').text
        data.append(record)

    #convert array of dictionaries into a Pandas dataframe
    df = pd.DataFrame(data)
    #Convert Pandas dataframe to spark dataframe
    spark_df = spark.createDataFrame(df)

    print(f"OBJECT_HEADERS Dataframe created in {time.time() - start_time} seconds")        
    start_time = time.time()
    
    #define the schema for the datatypes
    schema = StructType([
        StructField('method',              StringType(), True),
        StructField('parent_ordid',        StringType(), True),
        StructField('parent_order_system', StringType(), True),
        StructField('parent_order_number', StringType(), True),
        StructField('parent_order_type',   StringType(), True),
        StructField('ordid',               StringType(), True),
        StructField('order_system',        StringType(), True),
        StructField('order_number',        StringType(), True),
        StructField('order_type',          StringType(), True),
        StructField('int_order_type',      StringType(), True),
        StructField('start_effectivity',   StringType(), True),
        StructField('end_effectivity',     StringType(), True),
        StructField('planning_type',       StringType(), True),
        StructField('priority',            StringType(), True),
        StructField('order_start_date',    StringType(), True),
        StructField('order_end_date',      StringType(), True),
        StructField('procurement_type',    StringType(), True),
        StructField('prod_version',        StringType(), True),
        StructField('producable',          StringType(), True),
        StructField('input_fixed',         StringType(), True),
        StructField('output_fixed',        StringType(), True),
        StructField('location',            StringType(), True),
        StructField('loctype',             StringType(), True),
        StructField('location_bsg',        StringType(), True),
        StructField('plan_number',         StringType(), True),
        StructField('ppm_number',          StringType(), True),
        StructField('ppm_bsg',             StringType(), True),
        StructField('part_delivered',      StringType(), True),
        StructField('final_delivery',      StringType(), True),
        StructField('started',             StringType(), True),
        StructField('part_confirmed',      StringType(), True),
        StructField('final_confirmed',     StringType(), True),
        StructField('released',            StringType(), True),
        StructField('appli',               StringType(), True),
        StructField('creation_time',       StringType(), True),
        StructField('expl_date',           StringType(), True),
        StructField('planning_status',     StringType(), True),
        StructField('source_type',         StringType(), True),
        StructField('source_name',         StringType(), True)
    ])

    for field in schema.fields:
        spark_df = spark_df.withColumn(field.name, col(field.name).cast(field.dataType))

    default_out_table = 'tb_object_headers'
    create_table(spark_df,_odate,default_out_catalog,env,default_out_schema,default_out_table)

    print(f"OBJECT_HEADERS table write completed in {time.time() - start_time} seconds")
    start_time = time.time()

 

MariuszK
Contributor III

I can see a few points to improve here:

- Spark can process XML directly, for instance: https://docs.databricks.com/aws/en/query/formats/xml (see the sketch below).

- You can convert the data list to a DataFrame (skipping the conversion to Pandas) and provide the schema as the second parameter, so you don't need to iterate through the columns (see the sketch below).

- Remove the secret keys from the code.
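A minimal sketch of the second point, assuming "data" is the list of dicts the notebook already builds from the BeautifulSoup parse (every value is a string at that point), with only a few of the 40 columns shown:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, LongType

# The parsed values are all strings, so declare them as strings first...
raw_schema = StructType([StructField(name, StringType(), True) for name in data[0].keys()])
spark_df = spark.createDataFrame([tuple(rec.values()) for rec in data], schema=raw_schema)

# ...then cast in a single select instead of 40 chained withColumn calls.
spark_df = spark_df.select(
    F.col("ORDID"),
    F.col("QUANTITY").cast(DecimalType(23, 3)).alias("QUANTITY"),
    F.col("IO_TIME").cast(LongType()).alias("IO_TIME"),
    # ... remaining columns and casts ...
)

And for the first point, if the SOAP response is saved to storage first, Spark can parse the XML itself (the path and rowTag here are assumptions; rowTag has to match whatever element represents one row):

# Native XML reader (DBR 14.3+, or the spark-xml library on older runtimes); path is hypothetical.
items_df = (spark.read.format("xml")
            .option("rowTag", "item")
            .load("/Volumes/dev/sandbox/raw/get_io_nodes_response.xml"))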

philHarasz
New Contributor III

After reading the suggested documentation, I tried the "Parse nested XML (from_xml and schema_of_xml)" approach. I used this code from the doc:

 

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()

 

and it worked great. But I don't understand what the number 8 signifies in that first argument to the createDataFrame command. I replaced it with different numbers while experimenting and it didn't seem to matter what number I put in there, so I left it as a 1. Once I saw the printed schema, I was then able to code a pyspark command to pull out the columns of a table. But I ended up with a one row dataframe where each column contained an array of all 100K values. So then I added an explode in front of each column, and that appeared to give me a dataframe that looked like a proper table. But when I tried to write it to a delta table, I got "The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.". So I removed the write and instead just put in this command, to see if I had a dataframe with any sensible data in it:

display(df.show(5))
I have now been waiting 53 minutes to see the first five rows of my dataframe. Nope. Still don't have a solution.
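For reference, a minimal sketch of that explode step, assuming the parsed struct exposes the INPUT_NODES items as an array of structs (the exact path below is a guess based on the SOAP payload; adjust it to whatever printSchema reports). Exploding the repeated element once keeps one row per <item>, whereas exploding each column separately multiplies the row count with every additional explode:

from pyspark.sql.functions import from_xml, schema_of_xml, explode, col

# The first column ("number") is just an arbitrary id, as in the doc example; its value has no meaning.
df = spark.createDataFrame([(1, response.text)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))

# Assumed path into the parsed struct; the real field names come from printSchema.
items = parsed.select(
    explode(col("parsed.Body.BAPI_PPDSSRVAPS_GET_IO_NODESResponse.INPUT_NODES.item")).alias("item")
)
nodes_df = items.select("item.*")    # one row per <item>, one column per child element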