cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER]

JW_99
New Contributor II

I've troubleshot this like 20+ times. I am aware that the current code is causing the spark session to be passed to the workers, where it should only be applied to the driver. Can someone please help me resolve this (the schema is defined earlier)?

---------------------------------------------------------------------

from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql import functions as F
from azure.storage.blob import ContainerClient
from pyspark.sql.types import *
from delta.tables import DeltaTable
import os

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, DoubleType, BooleanType, FloatType, LongType
)

# Define schemas
schemas = {
    "AccountingDetail.JSON"StructType([
        StructField("JobID"IntegerType(), True),
        StructField("Invoices"ArrayType(StructType([
            StructField("InvoiceID"IntegerType(), True),
            StructField("ExternalID"StringType(), True),
            StructField("InvoiceNumber"StringType(), True),
            StructField("DateAdded"TimestampType(), True),
            StructField("DateLastUpdate"TimestampType(), True),
            StructField("Memo"StringType(), True),
            StructField("DateInvoiced"TimestampType(), True),
            StructField("BillTo"StringType(), True),
            StructField("InvoiceClass"StringType(), True),
            StructField("Amount"DoubleType(), True),
            StructField("Tax"DoubleType(), True),
            StructField("Source"StringType(), True),
            StructField("Status"StringType(), True),
            StructField("InvoiceBalance"DoubleType(), True),
            StructField("InvoiceLineItems"ArrayType(StructType([
                StructField("InvoiceLineItemID"IntegerType(), True),
                StructField("InvoiceID"IntegerType(), True),
                StructField("LineDate"TimestampType(), True),
                StructField("Room"StringType(), True),
                StructField("CategoryID"StringType(), True),
                StructField("Details"StringType(), True),
                StructField("ItemDescription"StringType(), True),
                StructField("UnitQuantity"IntegerType(), True),
                StructField("UnitOfMeasure"StringType(), True),
                StructField("Quantity"DoubleType(), True),
                StructField("LaborRate"DoubleType(), True),
                StructField("MaterialRate"DoubleType(), True),
                StructField("EquipmentRate"DoubleType(), True),
                StructField("Adjustment"DoubleType(), True),
                StructField("Overhead"DoubleType(), True),
                StructField("Profit"DoubleType(), True),
                StructField("Rate"DoubleType(), True),
                StructField("ExtendedAmount"DoubleType(), True),
                StructField("AddedDate"TimestampType(), True),
                StructField("UpdatedDate"TimestampType(), True),
                StructField("TimeModified"TimestampType(), True),
                StructField("Status"StringType(), True)
            ])), True),
            StructField("AdditionalInfoAmount"DoubleType(), True),
            StructField("InvoiceType"StringType(), True)
        ])), True),
        StructField("Payments"ArrayType(StructType([
            StructField("PaymentID"IntegerType(), True),
            StructField("InvoiceID"IntegerType(), True),
            StructField("DateAdded"TimestampType(), True),
            StructField("DateLastUpdate"TimestampType(), True),
            StructField("Mode"StringType(), True),
            StructField("DatePaid"TimestampType(), True),
            StructField("ReferenceNumber"StringType(), True),
            StructField("Amount"DoubleType(), True),
            StructField("Memo"StringType(), True),
            StructField("Source"StringType(), True),
            StructField("IsPaid"BooleanType(), True),
            StructField("Status"StringType(), True),
            StructField("DiscountAmount"DoubleType(), True)
        ])), True),
        StructField("JobCostings"ArrayType(StructType([
            StructField("JobCostingID"IntegerType(), True),
            StructField("Date"TimestampType(), True),
            StructField("LaborHours"DoubleType(), True),
            StructField("JobCostTypeCategory"StringType(), True),
            StructField("Quantity"DoubleType(), True),
            StructField("UOM"StringType(), True),
            StructField("Rate"DoubleType(), True),
            StructField("Extended"DoubleType(), True),
            StructField("Billable"BooleanType(), True),
            StructField("PaymentTo"StringType(), True),
            StructField("Description"StringType(), True),
            StructField("TxnType"StringType(), True),
            StructField("TimeModified"TimestampType(), True),
            StructField("TransferFrom"StringType(), True),
            StructField("Status"StringType(), True),
            StructField("AddedBy"StringType(), True),
            StructField("PO"StringType(), True),
            StructField("Item"StringType(), True),
            StructField("Service"StringType(), True),
            StructField("Memo"StringType(), True),
            StructField("BillTxnId"StringType(), True),
            StructField("CalculatedBurden"DoubleType(), True),
            StructField("LinkedVendorPayments"ArrayType(StructType([
                StructField("VendorPaymentID"LongType(), False),
                StructField("AmountPaid"DoubleType(), True),
                StructField("DatePaid"StringType(), True),
                StructField("CheckNumber"StringType(), True)
            ])), False)
        ])), True)
    ]),
    "AccountingSummary.JSON"StructType([
    StructField("IsForcedSync"BooleanType(), False),  
    StructField("MD5HashAsHexString"StringType(), True),  
    StructField("ActualGrossProfit"DoubleType(), True),  
    StructField("ActualGrossProfitPercentage"DoubleType(), True),  
    StructField("AdjustedInvoiceSubtotal"DoubleType(), True),  
    StructField("BalanceOwing"DoubleType(), True),  
    StructField("ChangeOrderAmount"DoubleType(), True),  
    StructField("CollectedSubtotal"DoubleType(), True),  
    StructField("ConsumablesCost"DoubleType(), True),  
    StructField("EquipmentCost"DoubleType(), True),  
    StructField("EstimateGrossProfitAmountAfterWOAdjustment"DoubleType(), True),
    StructField("EstimateGrossProfitPercentageAfterWOAdjustment"DoubleType(), True),  
    StructField("EstimatedGrossProfitAmountFromEstimateImport"DoubleType(), True),  
    StructField("EstimatedGrossProfitPercentageFromEstimateImport"DoubleType(), True),  
    StructField("EstimateGrossProfit"DoubleType(), True),  
    StructField("EstimateUninvoicedAmount"DoubleType(), True),  
    StructField("EstimateUnpaid"DoubleType(), True),  
    StructField("GrossProfitPercentage"DoubleType(), True),  
    StructField("InitialEstimate"DoubleType(), True),  
    StructField("InvoiceSubtotal"DoubleType(), True),  
    StructField("TotalInvoiced"DoubleType(), True),  
    StructField("LaborCost"DoubleType(), True),  
    StructField("MaterialsCost"DoubleType(), True),  
    StructField("OriginalEstimate"DoubleType(), True),  
    StructField("OtherCost"DoubleType(), True),  
    StructField("ProfessionalFee"DoubleType(), True),  
    StructField("RecognizedRevenue"DoubleType(), True),  
    StructField("ReferralFeeCost"DoubleType(), True),  
    StructField("SubtradeCost"DoubleType(), True),  
    StructField("SupplementEstimate"DoubleType(), True),  
    StructField("TotalCollected"DoubleType(), True),  
    StructField("TotalEstimates"DoubleType(), True),  
    StructField("TotalJobCost"DoubleType(), True),
    StructField("TotalWorkOrderBudget"DoubleType(), True),
    StructField("WarrantyCost"DoubleType(), True),  
    StructField("EstimateDepreciation"DoubleType(), True),  
    StructField("JobId"LongType(), False)
]),
    "CompanyContacts.JSON"StructType([
        StructField("CompanyID"IntegerType(), True),
        StructField("Contacts"ArrayType(
        StructType([
            StructField("ContactID"IntegerType(), True),
            StructField("CorrespondenceEmail"StringType(), True),
            StructField("InquiryEmail"StringType(), True),
            StructField("Website"StringType(), True),

            # Addresses
            StructField("Address"StructType([
                StructField("AddressLine1"StringType(), True),
                StructField("AddressLine2"StringType(), True),
                StructField("City"StringType(), True),
                StructField("StateProvince"StringType(), True),
                StructField("Country"StringType(), True),
                StructField("PostalCode"StringType(), True),
                StructField("County"StringType(), True)
            ]), True),
            StructField("BillingAddress"StructType([
                StructField("AddressLine1"StringType(), True),
                StructField("AddressLine2"StringType(), True),
                StructField("City"StringType(), True),
                StructField("StateProvince"StringType(), True),
                StructField("Country"StringType(), True),
                StructField("PostalCode"StringType(), True),
                StructField("County"StringType(), True)
            ]), True),
            StructField("MailingAddress"StructType([
                StructField("AddressLine1"StringType(), True),
                StructField("AddressLine2"StringType(), True),
                StructField("City"StringType(), True),
                StructField("StateProvince"StringType(), True),
                StructField("Country"StringType(), True),
                StructField("PostalCode"StringType(), True),
                StructField("County"StringType(), True)
            ]), True),

            # Main Phone
            StructField("MainPhone"StructType([
                StructField("Number"StringType(), True),
                StructField("Extension"StringType(), True)
            ]), True),

            # Other Phones
            StructField("OtherPhones"ArrayType(
                StructType([
                    StructField("Category"StringType(), True),
                    StructField("PhoneNumbers"ArrayType(
                        StructType([
                            StructField("Number"StringType(), True),
                            StructField("Extension"StringType(), True)
                        ])
                    ), True)
                ])
            ), True)
        ])
    ), True)
]),
    "IndividualContacts.JSON"StructType([
    StructField("IndividualID"IntegerType(), True),
    StructField("Contacts"ArrayType(StructType([
        StructField("ContactID"IntegerType(), True),
        StructField("Email"StringType(), True),
        StructField("SecondaryEmail"StringType(), True),
        StructField("MainPhone"StructType([
            StructField("Number"StringType(), True),
            StructField("Extension"StringType(), True)
        ]), True),  # Nullable struct
        StructField("OtherPhones"ArrayType(StructType([
            StructField("Category"StringType(), True),
            StructField("PhoneNumbers"ArrayType(StructType([
                StructField("Number"StringType(), True),
                StructField("Extension"StringType(), True)
            ]), True), True)  
        ])), False),  
        StructField("Address"StructType([
            StructField("AddressLine1"StringType(), True),
            StructField("AddressLine2"StringType(), True),
            StructField("City"StringType(), True),
            StructField("StateProvince"StringType(), True),
            StructField("Country"StringType(), True),
            StructField("PostalCode"StringType(), True),
            StructField("County"StringType(), True)
        ]), False)  
    ])), False)
]),
     "IndividualDetail.JSON" : StructType([
    StructField("IndividualID"IntegerType(), True),
    StructField("IsActive"BooleanType(), True),
    StructField("IndividualDetailInfo"StructType([
        StructField("Anniversary"StringType(), True),
        StructField("Assistant"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("Categories"ArrayType(StringType(), True), True),
        StructField("CompanyID"IntegerType(), True),
        StructField("ContactType"StringType(), True),
        StructField("DateOfBirth"StringType(), True),
        StructField("FranchiseeId"IntegerType(), True),
        StructField("GroupAndRoutes"StringType(), True),
        StructField("JobTitle"StringType(), True),
        StructField("MarketingCampaigns"ArrayType(StringType(), True), True),
        StructField("Name"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("ParentCompany"StringType(), True),
        StructField("PotentialReferralRevenue"StringType(), True),
        StructField("PotentialReferralVolume"StringType(), True),
        StructField("Rank"StringType(), True),
        StructField("ReferralType"StringType(), True),
        StructField("ReferredBy"StringType(), True),
        StructField("ResponsibleRep"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("SageAccountNumber"StringType(), True),
        StructField("SalesStage"StringType(), True),
        StructField("SalesStatus"StringType(), True),
        StructField("Title"StringType(), True)
    ]), True)
]),
     "LocationScores.JSON" : StructType([
    StructField("LocationID"LongType(), False),  
    StructField("CloseRatio"DoubleType(), True),
    StructField("NpsScore"IntegerType(), True),  
    StructField("Vpasses"ArrayType(StructType([
        StructField("Score"IntegerType(), False),  
        StructField("IsAdminScore"BooleanType(), False)  
    ])), False)  
]),
     "ProviderScores.JSON" : StructType([
    StructField("ProviderID"IntegerType(), False),  
    StructField("Vpasses"ArrayType(StructType([
        StructField("Score"IntegerType(), True),
        StructField("IsAdminScore"BooleanType(), True)
    ])), False),  
    StructField("NpsScore"IntegerType(), True)
]),
    "JobDetail.JSON" : StructType([
    StructField("JobID"IntegerType(), True),
    StructField("FranchiseeInfo"StructType([
        StructField("FranchiseeID"IntegerType(), True),
        StructField("DashCompanyID"IntegerType(), True),
        StructField("Name"StringType(), True),
        StructField("ExternalEnterpriseID"StringType(), True)
    ]), True),
    StructField("ClaimInfo"StructType([
        StructField("ClaimID"IntegerType(), True),
        StructField("ReferenceID"StringType(), True),
        StructField("TypeOfLoss"StringType(), True),
        StructField("LossCategory"StringType(), True),
        StructField("ProviderLossCategory"StringType(), True),
        StructField("ClientName"StringType(), True),
        StructField("DateOfLoss"StringType(), True),
        StructField("DateReceived"StringType(), True),
        StructField("CatReferenceNumber"StringType(), True),
        StructField("PolicyLimits"StructType([
            StructField("DwellingAmount"DoubleType(), True),
            StructField("ContentsAmount"DoubleType(), True),
            StructField("OtherStructuresAmount"DoubleType(), True)
        ]), True),
        StructField("ClientID"IntegerType(), True),
        StructField("Caller"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("ClaimEnteredBy"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("PreferredProviderName"StringType(), True),
        StructField("PreferredProviderLocationName"StringType(), True),
        StructField("IsProviderCreatedClaim"BooleanType(), True),
        StructField("ReportedBy"StringType(), True),
        StructField("CodeRedID"StringType(), True),
        StructField("ClaimCustomer"StructType([
            StructField("ClaimCustomerType"IntegerType(), True),
            StructField("BillingAddress"StructType([
                StructField("AddressLine1"StringType(), True),
                StructField("AddressLine2"StringType(), True),
                StructField("City"StringType(), True),
                StructField("StateProvince"StringType(), True),
                StructField("PostalCode"StringType(), True),
                StructField("Country"StringType(), True)
            ]), True),
            StructField("CustomerName"StringType(), True),
            StructField("IndividualDetails"StructType([
                StructField("IndividualID"IntegerType(), True),
                StructField("ContactsDetails"ArrayType(StructType([
                    StructField("Email"StringType(), True),
                    StructField("MainPhone"StructType([
                        StructField("Number"StringType(), True),
                        StructField("Extension"StringType(), True)
                    ]), True)
                ])), True),
                StructField("PersonName"StructType([
                    StructField("FirstName"StringType(), True),
                    StructField("LastName"StringType(), True)
                ]), True)
            ]), True),
            StructField("CompanyDetails"StructType([
                StructField("CompanyName"StringType(), True),
                StructField("ContactsDetails"ArrayType(StructType([
                    StructField("Email"StringType(), True),
                    StructField("MainPhone"StructType([
                        StructField("Number"StringType(), True),
                        StructField("Extension"StringType(), True)
                    ]), True)
                ])), True)
            ]), True)
        ]), True)
    ]), True),
    StructField("JobInfo"StructType([
        StructField("PolicyHolderType"StringType(), True),
        StructField("Division"StringType(), True),
        StructField("ProviderReasonForClosing"StringType(), True),
        StructField("IsAdminJob"BooleanType(), True),
        StructField("DateAdded"StringType(), True),
        StructField("DateLastUpdate"StringType(), True),
        StructField("JobNumber"StringType(), True),
        StructField("JobName"StringType(), True),
        StructField("Status"StringType(), True),
        StructField("IsClosed"BooleanType(), True),
        StructField("IsOnHold"BooleanType(), True),
        StructField("EnvironmentalCode"StringType(), True),
        StructField("ReceivedByFullName"StringType(), True),
        StructField("Priority"StringType(), True),
        StructField("LossDescription"StringType(), True),
        StructField("SpecialInstructions"StringType(), True),
        StructField("InitialFindings"StringType(), True),
        StructField("RoomsAffected"ArrayType(StringType()), True),
        StructField("AssociatedJobID"IntegerType(), True),
        StructField("ProviderCatastropheName"StringType(), True),
        StructField("ProviderDivisionType"StringType(), True),
        StructField("EnvironmentalCodeDescription"StringType(), True),
        StructField("JobCompletionPercentage"DoubleType(), True),
        StructField("MasterBuilderNumber"IntegerType(), True),
        StructField("ReferralFeeDatePaid"StringType(), True),
        StructField("RegionalAreaManager"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("WaterJobCat"StringType(), True),
        StructField("WaterJobClass"StringType(), True),
        StructField("JobSource"StringType(), True),
        StructField("LocationID"IntegerType(), True)
    ]), True),
    StructField("AdminInfo"StructType([
        StructField("AdminJobNumber"StringType(), True),
        StructField("AdminDivision"StringType(), True),
        StructField("AdminReasonForClosing"StringType(), True),
        StructField("Program"StructType([
            StructField("ProgramID"IntegerType(), True),
            StructField("ProgramName"StringType(), True),
            StructField("NationalAccountDirector"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), True)
        ]), True),
        StructField("ReasonForClosingCategoryName"StringType(), True),
        StructField("IsEmergencyServiceRequired"BooleanType(), True),
        StructField("LicenseNumber"StringType(), True),
        StructField("AdminInsuranceCarrier"StringType(), True),
        StructField("AdminInsuranceCarrierID"IntegerType(), True),
        StructField("AdminJobStatus"StringType(), True),
        StructField("JobLeadID"StringType(), True),
        StructField("Services"ArrayType(StringType()), True)
    ]), True),
    StructField("PolicyClaimInformation"StructType([
        StructField("ClaimNumber"StringType(), True),
        StructField("PolicyNumber"StringType(), True),
        StructField("DatePolicyStart"StringType(), True),
        StructField("DatePolicyExpiration"StringType(), True),
        StructField("YearBuilt"StringType(), True),
        StructField("ExternalFileNumber"StringType(), True),
        StructField("LossType"StringType(), True),
        StructField("SecondaryLossType"StringType(), True),
        StructField("ReportedBy"StringType(), True),
        StructField("ReferredByFullName"StringType(), True),
        StructField("SourceOfLoss"StringType(), True),
        StructField("JobSize"StringType(), True),
        StructField("LossCategory"StringType(), True),
        StructField("ProviderLossCategory"StringType(), True)
    ]), True),
    StructField("PaymentServices"StructType([
        StructField("DeductibleAmount"DoubleType(), True),
        StructField("CollectWhen"StringType(), True),
        StructField("DateLienRights"StringType(), True),
        StructField("DateLienLiened"StringType(), True),
        StructField("DateLienReleased"StringType(), True),
        StructField("MbJobNumber"StringType(), True)
    ]), True),
    StructField("LossContactInfo"StructType([
        StructField("ContactPerson"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("Address"StructType([
            StructField("AddressLine1"StringType(), True),
            StructField("AddressLine2"StringType(), True),
            StructField("City"StringType(), True),
            StructField("StateProvince"StringType(), True),
            StructField("PostalCode"StringType(), True),
            StructField("Country"StringType(), True)
        ]), True),
        StructField("MainPhone"StructType([
            StructField("Number"StringType(), True),
            StructField("Extension"StringType(), True)
        ]), True)
    ]), True),
    StructField("Customer"StructType([
        StructField("JobPolicyHolderType"IntegerType(), True),
        StructField("IndividualDetails"StructType([
            StructField("IndividualID"IntegerType(), True),
            StructField("Title"StringType(), True),
            StructField("PersonName"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), True)
        ]), True),
        StructField("CompanyDetails"StructType([
            StructField("CompanyName"StringType(), True),
            StructField("ContactsDetails"ArrayType(StructType([
                StructField("Email"StringType(), True),
                StructField("MainPhone"StructType([
                    StructField("Number"StringType(), True),
                    StructField("Extension"StringType(), True)
                ]), True)
            ])), True)
        ]), True)
    ]), True),
    StructField("JobReferralDetails"StructType([
        StructField("ReferralCategory"IntegerType(), True),
        StructField("ReferralType"StringType(), True),
        StructField("SourceIndividual"StructType([
            StructField("PersonName"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), True)
        ]), True),
        StructField("SourceCompany"StructType([
            StructField("CompanyName"StringType(), True)
        ]), True),
        StructField("SourceEmployee"StructType([
            StructField("PersonName"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), True)
        ]), True),
        StructField("SourceMarketingCampaign"StructType([
            StructField("CampaignName"StringType(), True)
        ]), True),
        StructField("SalesStage"StringType(), True),
        StructField("SalesStatus"StringType(), True)
    ]), True)
]),
     "CompanyDetail.JSON"StructType([
    StructField("CompanyID"LongType(), False),  
    StructField("IsActive"BooleanType(), False),  
    StructField("CompanyInfo"StructType([
        StructField("Categories"ArrayType(StringType()), True),  
        StructField("FranchiseeID"LongType(), True),  
        StructField("GroupAndRoutes"StringType(), True),
        StructField("MarketingCampaigns"ArrayType(StringType()), True),  
        StructField("Name"StringType(), True),
        StructField("ParentCompanyID"LongType(), True),  
        StructField("Rank"StringType(), True),
        StructField("ReferralType"StringType(), True),
        StructField("ReferredBy"StringType(), True),  
        StructField("ResponsibleRep"StructType([  
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("SageAccountNumber"StringType(), True),
        StructField("SalesStage"StringType(), True),
        StructField("SalesStatus"StringType(), True),
        StructField("Type"StringType(), True)
    ]), True)  
]),
     "EmployeeDetail.JSON"StructType([
    StructField("IsActive"BooleanType(), True),
    StructField("EmployeeInfo"StructType([
        StructField("FranchiseeID"IntegerType(), True),
        StructField("Title"StringType(), True),
        StructField("FirstName"StringType(), True),
        StructField("LastName"StringType(), True),
        StructField("DateOfBirth"StringType(), True),
        StructField("LocationName"StringType(), True)
    ]), True),
    StructField("EmployeeID"IntegerType(), True)
]),
     "InternalParticipants.JSON"StructType([
    StructField("JobID"LongType(), False),
    StructField("Participants"ArrayType(
        StructType([
            StructField("Type"StringType(), True),
            StructField("PersonName"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), False),
            StructField("ID"LongType(), False)
        ])
    ), False)
]),
    "JobExternalParticipants.JSON"StructType([
    StructField("JobID"IntegerType(), False),
    StructField("CompanyRelations"ArrayType(StructType([
        StructField("RelationshipType"IntegerType(), False),
        StructField("RelationshipTypeName"StringType(), True),
        StructField("CompanyDetails"StructType([
            StructField("CompanyID"IntegerType(), True),
            StructField("CompanyName"StringType(), True),
            StructField("MarketingRank"StringType(), True),
            StructField("ContactsDetails"ArrayType(StructType([
                StructField("Email"StringType(), True),
                StructField("Website"StringType(), True),
                StructField("Address"StructType([
                    StructField("AddressLine1"StringType(), True),
                    StructField("AddressLine2"StringType(), True),
                    StructField("City"StringType(), True),
                    StructField("StateProvince"StringType(), True),
                    StructField("PostalCode"StringType(), True),
                    StructField("Country"StringType(), True),
                    StructField("County"StringType(), True)
                ]), False),
                StructField("BillingAddress"StructType([
                    StructField("AddressLine1"StringType(), True),
                    StructField("AddressLine2"StringType(), True),
                    StructField("City"StringType(), True),
                    StructField("StateProvince"StringType(), True),
                    StructField("PostalCode"StringType(), True),
                    StructField("Country"StringType(), True),
                    StructField("County"StringType(), True)
                ]), True),
                StructField("MailingAddress"StructType([
                    StructField("AddressLine1"StringType(), True),
                    StructField("AddressLine2"StringType(), True),
                    StructField("City"StringType(), True),
                    StructField("StateProvince"StringType(), True),
                    StructField("PostalCode"StringType(), True),
                    StructField("Country"StringType(), True),
                    StructField("County"StringType(), True)
                ]), True),
                StructField("MainPhone"StructType([
                    StructField("Number"StringType(), True),
                    StructField("Extension"StringType(), True)
                ]), True),
                StructField("OtherPhones"ArrayType(StructType([
                    StructField("Category"StringType(), True),
                    StructField("PhoneNumbers"ArrayType(StructType([
                        StructField("Number"StringType(), True),
                        StructField("Extension"StringType(), True)
                    ])), True)
                ])), True)
            ])), False)
        ]), False)
    ])), False),
    StructField("IndividualRelations"ArrayType(StructType([
        StructField("RelationshipType"IntegerType(), True),
        StructField("RelationshipTypeName"StringType(), True),
        StructField("IndividualDetails"StructType([
            StructField("IndividualID"IntegerType(), True),
            StructField("Title"StringType(), True),
            StructField("CompanyName"StringType(), True),
            StructField("PreferredCommunicationMethod"StringType(), True),
            StructField("MarketingRank"StringType(), True),
            StructField("ContactsDetails"ArrayType(StructType([
                StructField("Email"StringType(), True),
                StructField("Address"StructType([
                    StructField("AddressLine1"StringType(), True),
                    StructField("AddressLine2"StringType(), True),
                    StructField("City"StringType(), True),
                    StructField("StateProvince"StringType(), True),
                    StructField("PostalCode"StringType(), True),
                    StructField("Country"StringType(), True),
                    StructField("County"StringType(), True)
                ]), True),
                StructField("MainPhone"StructType([
                    StructField("Number"StringType(), True),
                    StructField("Extension"StringType(), True)
                ]), True),
                StructField("OtherPhones"ArrayType(StructType([
                    StructField("Category"StringType(), True),
                    StructField("PhoneNumbers"ArrayType(StructType([
                        StructField("Number"StringType(), True),
                        StructField("Extension"StringType(), True)
                    ])), True)
                ])), True)
            ])), False),
            StructField("PersonName"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), False)
        ]), False)
    ])), False),
    StructField("BillToType"IntegerType(), True)
]),
    "JobDates.JSON" : StructType([
    StructField("JobID"LongType(), False),
    StructField("Dates"ArrayType(
        StructType([
            StructField("DateTypeID"IntegerType(), False),
            StructField("DateName"StringType(), True),
            StructField("Value"TimestampType(), False),
            StructField("AuditDetails"StructType([
                StructField("EnteredBy"StructType([
                    StructField("FirstName"StringType(), True),
                    StructField("LastName"StringType(), True)
                ]), True),
                StructField("WhenEntered"TimestampType(), True)
            ]), True)
        ])
    ), False)
]),
    "LinkedAssignmentDetails.JSON"StructType([
    StructField("JobID"LongType(), False),
    StructField("LinkedExternalAssignments"ArrayType(
        StructType([
            StructField("ID"IntegerType(), False),
            StructField("ExternalSystemType"IntegerType(), False),
            StructField("ExternalID"StringType(), True)
        ])
    ), False)
]),
    "NotesSummary.JSON"StructType([
    StructField("JobID"LongType(), False),
    StructField("LastNoteAdded"StringType(), True),
    StructField("LastNoteAddedTime"StringType(), True)
]),
    "SurveySummary.JSON"StructType([
    StructField("JobID"LongType(), False),
    StructField("MostRecentSurveyDate"StringType(), True),
    StructField("SurveyDetails"ArrayType(
        StructType([
            StructField("SurveyScore"StringType(), True),
            StructField("SurveyStatus"IntegerType(), True),
            StructField("IsIncludedForSurvey"BooleanType(), False),
            StructField("IsSendTriggerCompleted"BooleanType(), False),
            StructField("SurveyScoreValue"IntegerType(), True),
            StructField("SurveyScoreLabel"StringType(), True),
            StructField("NoSurveyReason"StringType(), True)
        ])
    ), False)
]),
    "EmployeeContacts.JSON"StructType([
    StructField("EmployeeID"IntegerType(), True),
    StructField("Contacts"ArrayType(
        StructType([
            StructField("ContactID"IntegerType(), True),
            StructField("Address"StructType([
                StructField("AddressLine1"StringType(), True),
                StructField("AddressLine2"StringType(), True),
                StructField("City"StringType(), True),
                StructField("StateProvince"StringType(), True),
                StructField("Country"StringType(), True),
                StructField("PostalCode"StringType(), True),
                StructField("County"StringType(), True)
            ]), True),
            StructField("Phones"ArrayType(StringType()), True),
            StructField("Email"StringType(), True)
        ])
    ), True)
]),
    "LocationDetail.JSON"StructType([
    StructField("LocationID"LongType(), False),
    StructField("ProviderID"LongType(), True),  
    StructField("LocationType"StringType(), True),
    StructField("Name"StringType(), True),
    StructField("UFOC"StringType(), True),
    StructField("ExternalEnterpriseLocationID"LongType(), True),  
    StructField("MainFax"StringType(), True),
    StructField("Address"StructType([
        StructField("AddressLine1"StringType(), True),
        StructField("AddressLine2"StringType(), True),
        StructField("City"StringType(), True),
        StructField("StateProvince"StringType(), True),
        StructField("Country"StringType(), True),
        StructField("PostalCode"StringType(), True),
        StructField("County"StringType(), True)
    ]), False),
    StructField("MainPhone"StructType([
        StructField("Number"StringType(), True),
        StructField("Extension"StringType(), True)
    ]), True),
    StructField("EdiAddresses"ArrayType(StructType([
        StructField("AddressType"StringType(), True),
        StructField("AddressValue"StringType(), True)
    ])), False),
    StructField("PrimaryContact"StructType([
        StructField("Name"StructType([
            StructField("FirstName"StringType(), True),
            StructField("LastName"StringType(), True)
        ]), True),
        StructField("Email"StringType(), True),
        StructField("Phone"StructType([
            StructField("Number"StringType(), True),
            StructField("Extension"StringType(), True)
        ]), True)
    ]), False),
    StructField("BusinessDevelopmentManagerName"StructType([
        StructField("FirstName"StringType(), True),
        StructField("LastName"StringType(), True)
    ]), False)
]),
    "ProviderDetail.JSON"StructType([
    StructField("ProviderID"IntegerType(), False),  
    StructField("FranchisorID"IntegerType(), True),
    StructField("Name"StringType(), True),
    StructField("CompanyID"StringType(), True),
    StructField("EnterpriseID"StringType(), True),
    StructField("Type"StringType(), True),
    StructField("Address"StructType([
        StructField("AddressLine1"StringType(), True),
        StructField("AddressLine2"StringType(), True),
        StructField("City"StringType(), True),
        StructField("StateProvince"StringType(), True),
        StructField("Country"StringType(), True),
        StructField("PostalCode"StringType(), True),
        StructField("County"StringType(), True)
    ]), True),  
    StructField("PrimaryEmail"StringType(), True),
    StructField("Website"StringType(), True),
    StructField("IsActive"BooleanType(), True)  
]),
    "ProviderSuspensions.JSON" : StructType([
    StructField("ProviderID"IntegerType(), False),  
    StructField("Suspensions"ArrayType(StructType([
        StructField("SuspensionID"IntegerType(), False),
        StructField("LocationID"IntegerType(), True),  
        StructField("StartDate"StringType(), True),  
        StructField("LiftDate"StringType(), True),  
        StructField("Description"StringType(), True),  
        StructField("Status"StringType(), True),  
        StructField("IsValid"BooleanType(), False),  
        StructField("Level"StringType(), True),  
        StructField("Reason"StringType(), True),  
        StructField("LiftMethod"StringType(), True),  
        StructField("SuspensionType"StringType(), True),  
        StructField("Visibility"StringType(), True),  
        StructField("AffectedDivisions"ArrayType(StructType([
            StructField("Name"StringType(), True)  
        ])), True)  
    ])), True)  
])
}

--------------------------------------------------------------

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from azure.storage.blob import ContainerClient
import os

# Function to create or get Spark session with optimized configurations
def create_spark_session():
    return SparkSession.builder \
        .appName("Highly Optimized JSON Pipeline") \
        .config("spark.sql.catalogImplementation""hive") \
        .config("spark.sql.adaptive.enabled""true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled""true") \
        .config("spark.sql.files.maxPartitionBytes"128 * 1024 * 1024) \
        .config("spark.sql.files.minPartitionNum""256") \
        .config("spark.dynamicAllocation.enabled""true") \
        .config("spark.dynamicAllocation.minExecutors""4") \
        .config("spark.dynamicAllocation.maxExecutors""16") \
        .config("spark.executor.cores""32") \
        .config("spark.executor.memory""200g") \
        .config("spark.driver.memory""64g") \
        .config("spark.driver.cores""8") \
        .config("spark.sql.parquet.compression.codec""zstd") \
        .config("spark.memory.fraction""0.9") \
        .config("spark.memory.storageFraction""0.3") \
        .config("spark.sql.shuffle.partitions""256") \
        .config("spark.sql.adaptive.skewJoin.enabled""true") \
        .config("spark.serializer""org.apache.spark.serializer.KryoSerializer") \
        .config("fs.azure.account.key.amrdatalake01.blob.core.windows.net""e3dTBwzl+nu4xTxqsBj+WgWOy5fa2ve7SICaBjVOmrLOw5fYshIZk+FSoz8IoM87GttRSYyL5yJM+AStFYUMGw==") \
        .getOrCreate()

# Initialize Spark session globally (only on driver)
spark = create_spark_session()
sc = spark.sparkContext

# Define Storage Account Details
storage_account_name = "secret##"
container_name = "secret##"

# Define base paths using abfss directly
output_base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/dash.test/output"

def list_json_files():
    """List JSON files from Azure Blob Storage with robust authentication handling."""
    try:
        container_client = ContainerClient(
            account_url=f"https://{storage_account_name}.blob.core.windows.net",
            container_name=container_name,
            credential="secret##"
        )
        container_client.get_container_properties()
        print(f"Successfully authenticated with container {container_name}")

        all_blobs = list(container_client.list_blobs())
        print(f"Total blobs found in container: {len(all_blobs)}")
        print(f"Sample blob names: {''.join(blob.name for blob in all_blobs[:5]) if all_blobs else 'None'}")

        json_files = [f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{blob.name}" for blob in all_blobs if blob.name.endswith('.JSON')]
        print(f"Total JSON Files Found (.JSON): {len(json_files)}")
        print(f"Sample JSON File Paths: {json_files[:5if json_files else 'No JSON files found'}")

        return json_files
    except Exception as e:
        print(f"Error listing files: {e}")
        return []

# Helper functions for DataFrame processing
def flatten_structs(dfJW_99_0-1740614786516.png

 

    struct_fields = [field.name for field in df.schema.fields if isinstance(field.dataType, StructType)]
    while struct_fields:
        for field in struct_fields:
            empty_struct = spark.createDataFrame([], df.schema[field].dataType).first().asDict()
            empty_struct = {k: None for k in empty_struct}
            df = df.withColumn(field, when(col(field).isNotNull(), col(field)).otherwise(struct(**empty_struct)))
            subfields = df.schema[field].dataType.fields
            for subfield in subfields:
                new_col_name = f"{field}_{subfield.name}"
                df = df.withColumn(new_col_name, col(f"{field}.{subfield.name}"))
            df = df.drop(field)
        struct_fields = [field.name for field in df.schema.fields if isinstance(field.dataType, StructType)]
    return df

def process_array_columns(dfexplode_columnsJW_99_1-1740614786523.png

 

    array_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, ArrayType)]
    for col_name in explode_columns:
        if col_name in array_columns:
            df = df.withColumn(col_name, when(col(col_name).isNull(), array()).otherwise(col(col_name)))
            df = df.withColumn(col_name, explode_outer(col(col_name)))
    return df

def ensure_all_columns(dfexpected_columnsJW_99_2-1740614786524.png

 

    current_columns = df.columns
    for col_name, col_type in expected_columns.items():
        if col_name not in current_columns:
            df = df.withColumn(col_name, lit(None).cast(col_type))
    return df

def null_out_problematic_columns(dfJW_99_3-1740614786524.png

 

    problematic_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))]
    for col_name in problematic_columns:
        df = df.withColumn(col_name, lit(None).cast(StringType()))
    return df

def infer_and_merge_schema(file_pathpredefined_schema=NoneJW_99_4-1740614786525.png

 

    try:
        inferred_schema = spark.read.json(file_path, multiLine=True).schema
        if predefined_schema:
            for field in inferred_schema.fields:
                if field.name not in [f.name for f in predefined_schema.fields]:
                    predefined_schema.add(field)
            return predefined_schema
        return inferred_schema
    except Exception as e:
        print(f"Error inferring schema for {file_path}{e}")
        return predefined_schema

# Flattening functions for DataFrame processing
def flatten_accounting_detail(dffile_pathJW_99_5-1740614786525.png

 

    df = df.withColumn("JobID"col("JobID").cast(IntegerType()))
    df = df.withColumn("Invoice"explode_outer(col("Invoices")))
    df = df.withColumn("InvoiceID"col("Invoice.InvoiceID").cast(IntegerType())) \
           .withColumn("ExternalID"col("Invoice.ExternalID")) \
           .withColumn("InvoiceNumber"col("Invoice.InvoiceNumber")) \
           .withColumn("InvoiceDateAdded"col("Invoice.DateAdded").cast(TimestampType())) \
           .withColumn("InvoiceDateLastUpdate"col("Invoice.DateLastUpdate").cast(TimestampType())) \
           .withColumn("InvoiceMemo"col("Invoice.Memo")) \
           .withColumn("InvoiceDateInvoiced"col("Invoice.DateInvoiced").cast(TimestampType())) \
           .withColumn("BillTo"col("Invoice.BillTo")) \
           .withColumn("InvoiceClass"col("Invoice.InvoiceClass")) \
           .withColumn("InvoiceAmount"col("Invoice.Amount").cast(DoubleType())) \
           .withColumn("Tax"col("Invoice.Tax").cast(DoubleType())) \
           .withColumn("Source"col("Invoice.Source")) \
           .withColumn("InvoiceStatus"col("Invoice.Status")) \
           .withColumn("InvoiceBalance"col("Invoice.InvoiceBalance").cast(DoubleType())) \
           .withColumn("AdditionalInfoAmount"col("Invoice.AdditionalInfoAmount").cast(DoubleType())) \
           .withColumn("InvoiceType"col("Invoice.InvoiceType"))
    df = df.withColumn("InvoiceLineItem"explode_outer(col("Invoice.InvoiceLineItems")))
    df = df.withColumn("InvoiceLineItemID"col("InvoiceLineItem.InvoiceLineItemID").cast(IntegerType())) \
           .withColumn("InvoiceLineItemDate"col("InvoiceLineItem.LineDate").cast(TimestampType())) \
           .withColumn("Room"col("InvoiceLineItem.Room")) \
           .withColumn("CategoryID"col("InvoiceLineItem.CategoryID").cast(StringType())) \
           .withColumn("Details"col("InvoiceLineItem.Details")) \
           .withColumn("ItemDescription"col("InvoiceLineItem.ItemDescription")) \
           .withColumn("UnitQuantity"col("InvoiceLineItem.UnitQuantity").cast(IntegerType())) \
           .withColumn("UnitOfMeasure"col("InvoiceLineItem.UnitOfMeasure")) \
           .withColumn("Quantity"col("InvoiceLineItem.Quantity").cast(DoubleType())) \
           .withColumn("LaborRate"col("InvoiceLineItem.LaborRate").cast(DoubleType())) \
           .withColumn("MaterialRate"col("InvoiceLineItem.MaterialRate").cast(DoubleType())) \
           .withColumn("EquipmentRate"col("InvoiceLineItem.EquipmentRate").cast(DoubleType())) \
           .withColumn("Adjustment"col("InvoiceLineItem.Adjustment").cast(DoubleType())) \
           .withColumn("Overhead"col("InvoiceLineItem.Overhead").cast(DoubleType())) \
           .withColumn("Profit"col("InvoiceLineItem.Profit").cast(DoubleType())) \
           .withColumn("InvoiceLineItemRate"col("InvoiceLineItem.Rate").cast(DoubleType())) \
           .withColumn("InvoiceLineItemExtendedAmount"col("InvoiceLineItem.ExtendedAmount").cast(DoubleType())) \
           .withColumn("InvoiceLineItemAddedDate"col("InvoiceLineItem.AddedDate").cast(TimestampType())) \
           .withColumn("InvoiceLineItemUpdatedDate"col("InvoiceLineItem.UpdatedDate").cast(TimestampType())) \
           .withColumn("InvoiceLineItemTimeModified"col("InvoiceLineItem.TimeModified").cast(TimestampType())) \
           .withColumn("InvoiceLineItemStatus"col("InvoiceLineItem.Status"))
    df = df.withColumn("Payment"explode_outer(col("Payments")))
    df = df.withColumn("PaymentID"col("Payment.PaymentID").cast(IntegerType())) \
           .withColumn("PaymentInvoiceID"col("Payment.InvoiceID").cast(IntegerType())) \
           .withColumn("PaymentDateAdded"col("Payment.DateAdded").cast(TimestampType())) \
           .withColumn("PaymentDateLastUpdate"col("Payment.DateLastUpdate").cast(TimestampType())) \
           .withColumn("Mode"col("Payment.Mode")) \
           .withColumn("DatePaid"col("Payment.DatePaid").cast(TimestampType())) \
           .withColumn("ReferenceNumber"col("Payment.ReferenceNumber")) \
           .withColumn("PaymentAmount"col("Payment.Amount").cast(DoubleType())) \
           .withColumn("PaymentMemo"col("Payment.Memo")) \
           .withColumn("PaymentSource"col("Payment.Source")) \
           .withColumn("IsPaid"col("Payment.IsPaid").cast(BooleanType())) \
           .withColumn("PaymentStatus"col("Payment.Status")) \
           .withColumn("PaymentDiscountAmount"col("Payment.DiscountAmount").cast(DoubleType()))
    df = df.withColumn("JobCosting"explode_outer(col("JobCostings")))
    df = df.withColumn("JobCostingID"col("JobCosting.JobCostingID").cast(IntegerType())) \
           .withColumn("JobCostingDate"col("JobCosting.Date").cast(TimestampType())) \
           .withColumn("LaborHours"col("JobCosting.LaborHours").cast(DoubleType())) \
           .withColumn("JobCostTypeCategory"col("JobCosting.JobCostTypeCategory")) \
           .withColumn("JobCostingQuantity"col("JobCosting.Quantity").cast(DoubleType())) \
           .withColumn("UOM"col("JobCosting.UOM")) \
           .withColumn("JobCostingRate"col("JobCosting.Rate").cast(DoubleType())) \
           .withColumn("Extended"col("JobCosting.Extended").cast(DoubleType())) \
           .withColumn("Billable"col("JobCosting.Billable").cast(BooleanType())) \
           .withColumn("PaymentTo"col("JobCosting.PaymentTo")) \
           .withColumn("Description"col("JobCosting.Description")) \
           .withColumn("TxnType"col("JobCosting.TxnType")) \
           .withColumn("JobCostingStatus"col("JobCosting.Status")) \
           .withColumn("TransferFrom"col("JobCosting.TransferFrom")) \
           .withColumn("AddedBy"col("JobCosting.AddedBy")) \
           .withColumn("PO"col("JobCosting.PO")) \
           .withColumn("Item"col("JobCosting.Item")) \
           .withColumn("Service"col("JobCosting.Service")) \
           .withColumn("Memo"col("JobCosting.Memo")) \
           .withColumn("BillTxnId"col("JobCosting.BillTxnId")) \
           .withColumn("CalculatedBurden"col("JobCosting.CalculatedBurden").cast(DoubleType()))
    df = df.withColumn("LinkedVendorPayment"explode_outer(col("JobCosting.LinkedVendorPayments")))
    df = df.withColumn("VendorPaymentID"col("LinkedVendorPayment.VendorPaymentID").cast(LongType())) \
           .withColumn("VendorPaymentAmountPaid"col("LinkedVendorPayment.AmountPaid").cast(DoubleType())) \
           .withColumn("VendorPaymentDatePaid"col("LinkedVendorPayment.DatePaid")) \
           .withColumn("CheckNumber"col("LinkedVendorPayment.CheckNumber"))
    df = df.drop("Invoices""Payments""JobCostings""Invoice""InvoiceLineItem""Payment""JobCosting""LinkedVendorPayment")
    return df

def flatten_accounting_summary(dffile_pathJW_99_6-1740614786525.png

 

    if "JobId" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(LongType()))
    return df.select(
        col("JobId").alias("JobID"),
        col("IsForcedSync").alias("IsForcedSync"),
        col("MD5HashAsHexString").alias("MD5HashAsHexString"),
        col("ActualGrossProfit").alias("ActualGrossProfit"),
        col("ActualGrossProfitPercentage").alias("ActualGrossProfitPercentage"),
        col("AdjustedInvoiceSubtotal").alias("AdjustedInvoiceSubtotal"),
        col("BalanceOwing").alias("BalanceOwing"),
        col("ChangeOrderAmount").alias("ChangeOrderAmount"),
        col("CollectedSubtotal").alias("CollectedSubtotal"),
        col("ConsumablesCost").alias("ConsumablesCost"),
        col("EquipmentCost").alias("EquipmentCost"),
        col("EstimateGrossProfitAmountAfterWOAdjustment").alias("EstimateGrossProfitAmountAfterWOAdjustment"),
        col("EstimateGrossProfitPercentageAfterWOAdjustment").alias("EstimateGrossProfitPercentageAfterWOAdjustment"),
        col("EstimatedGrossProfitAmountFromEstimateImport").alias("EstimatedGrossProfitAmountFromEstimateImport"),
        col("EstimatedGrossProfitPercentageFromEstimateImport").alias("EstimatedGrossProfitPercentageFromEstimateImport"),
        col("EstimateGrossProfit").alias("EstimateGrossProfit"),
        col("EstimateUninvoicedAmount").alias("EstimateUninvoicedAmount"),
        col("EstimateUnpaid").alias("EstimateUnpaid"),
        col("GrossProfitPercentage").alias("GrossProfitPercentage"),
        col("InitialEstimate").alias("InitialEstimate"),
        col("InvoiceSubtotal").alias("InvoiceSubtotal"),
        col("TotalInvoiced").alias("TotalInvoiced"),
        col("LaborCost").alias("LaborCost"),
        col("MaterialsCost").alias("MaterialsCost"),
        col("OriginalEstimate").alias("OriginalEstimate"),
        col("OtherCost").alias("OtherCost"),
        col("ProfessionalFee").alias("ProfessionalFee"),
        col("RecognizedRevenue").alias("RecognizedRevenue"),
        col("ReferralFeeCost").alias("ReferralFeeCost"),
        col("SubtradeCost").alias("SubtradeCost"),
        col("SupplementEstimate").alias("SupplementEstimate"),
        col("TotalCollected").alias("TotalCollected"),
        col("TotalEstimates").alias("TotalEstimates"),
        col("TotalJobCost").alias("TotalJobCost"),
        col("TotalWorkOrderBudget").alias("TotalWorkOrderBudget"),
        col("WarrantyCost").alias("WarrantyCost"),
        col("EstimateDepreciation").alias("EstimateDepreciation")
    )

def flatten_internal_participants(dffile_pathJW_99_7-1740614786525.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = df.withColumn("Participant"explode_outer(col("Participants")))
    return df.select(
        col("JobID").alias("JobID"),
        col("Participant.ID").cast(LongType()).alias("ParticipantID"),
        col("Participant.Type").alias("ParticipantType"),
        col("Participant.PersonName.FirstName").alias("FirstName"),
        col("Participant.PersonName.LastName").alias("LastName")
    )

def flatten_job_dates(dffile_pathJW_99_8-1740614786526.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = df.withColumn("Dates"when(col("Dates").isNotNull(), col("Dates")).otherwise(array()))
    df = df.withColumn("Date"explode_outer(col("Dates")))
    df = df.withColumn("Date_AuditDetails",
                       when(col("Date.AuditDetails").isNotNull(), col("Date.AuditDetails"))
                       .otherwise(struct(lit(None).alias("EnteredBy"), lit(None).alias("WhenEntered"))))
    df = df.withColumn("Date_AuditDetails_EnteredBy",
                       when(col("Date_AuditDetails.EnteredBy").isNotNull(), col("Date_AuditDetails.EnteredBy"))
                       .otherwise(struct(lit(None).alias("FirstName"), lit(None).alias("LastName"))))
    return df.select(
        col("JobID").alias("JobID"),
        col("Date.DateTypeID").cast(IntegerType()).alias("DateTypeID"),
        col("Date.DateName").alias("DateName"),
        col("Date.Value").cast(TimestampType()).alias("DateValue"),
        col("Date_AuditDetails.WhenEntered").cast(TimestampType()).alias("WhenEntered"),
        col("Date_AuditDetails_EnteredBy.FirstName").alias("EnteredByFirstName"),
        col("Date_AuditDetails_EnteredBy.LastName").alias("EnteredByLastName")
    )

def flatten_job_detail(dffile_pathJW_99_9-1740614786526.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = flatten_structs(df)
    df = process_array_columns(df, [
        "Customer_IndividualDetails_ContactsDetails_Email",
        "Customer_IndividualDetails_ContactsDetails_Address_AddressLine1",
        "Customer_IndividualDetails_ContactsDetails_Address_AddressLine2",
        "Customer_IndividualDetails_ContactsDetails_Address_City",
        "Customer_IndividualDetails_ContactsDetails_Address_StateProvince",
        "Customer_IndividualDetails_ContactsDetails_Address_PostalCode",
        "Customer_IndividualDetails_ContactsDetails_Address_Country",
        "Customer_IndividualDetails_ContactsDetails_MainPhone_Number",
        "Customer_IndividualDetails_ContactsDetails_MainPhone_Extension",
        "Customer_IndividualDetails_ContactsDetails_OtherPhones_Category",
        "Customer_IndividualDetails_ContactsDetails_OtherPhones_PhoneNumbers_Number",
        "Customer_IndividualDetails_ContactsDetails_OtherPhones_PhoneNumbers_Extension",
        "Customer_CompanyDetails_ContactsDetails_Email",
        "Customer_CompanyDetails_ContactsDetails_Address_AddressLine1",
        "Customer_CompanyDetails_ContactsDetails_Address_AddressLine2",
        "Customer_CompanyDetails_ContactsDetails_Address_City",
        "Customer_CompanyDetails_ContactsDetails_Address_StateProvince",
        "Customer_CompanyDetails_ContactsDetails_Address_PostalCode",
        "Customer_CompanyDetails_ContactsDetails_Address_Country",
        "Customer_CompanyDetails_ContactsDetails_MainPhone_Number",
        "Customer_CompanyDetails_ContactsDetails_MainPhone_Extension",
        "Customer_CompanyDetails_ContactsDetails_OtherPhones_Category",
        "Customer_CompanyDetails_ContactsDetails_OtherPhones_PhoneNumbers_Number",
        "Customer_CompanyDetails_ContactsDetails_OtherPhones_PhoneNumbers_Extension",
        "JobInfo_RoomsAffected",
        "AdminInfo_Services",
        "JobReferralDetails_SourceCompany",
        "JobReferralDetails_SourceIndividual",
        "JobReferralDetails_SourceEmployee",
        "JobReferralDetails_SourceMarketingCampaign",
    ])
    df = ensure_all_columns(df, df.schema.fields)
    df = null_out_problematic_columns(df)
    return df

def flatten_job_external_participants(dffile_pathJW_99_10-1740614786526.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = process_array_columns(df, [
        "CompanyRelations",
        "CompanyRelations.CompanyDetails.ContactsDetails",
        "IndividualRelations",
        "IndividualRelations.IndividualDetails.ContactsDetails"
    ])
    df = df.withColumn("CompanyRelations"explode_outer(col("CompanyRelations")))
    df = df.withColumn("IndividualRelations"explode_outer(col("IndividualRelations")))
    df = df.withColumn("CompanyContacts"explode_outer(col("CompanyRelations.CompanyDetails.ContactsDetails")))
    df = df.withColumn("IndividualContacts"explode_outer(col("IndividualRelations.IndividualDetails.ContactsDetails")))
    df = df.select(
        col("JobID").alias("JobID"),
        col("BillToType").alias("BillToType"),
        col("CompanyRelations.RelationshipType").alias("CompanyRelationshipType"),
        col("CompanyRelations.RelationshipTypeName").alias("CompanyRelationshipTypeName"),
        col("CompanyRelations.CompanyDetails.CompanyID").alias("CompanyID"),
        col("CompanyRelations.CompanyDetails.CompanyName").alias("CompanyName"),
        col("CompanyRelations.CompanyDetails.MarketingRank").alias("CompanyMarketingRank"),
        col("CompanyContacts.Email").alias("CompanyContactEmail"),
        col("CompanyContacts.Website").alias("CompanyContactWebsite"),
        col("CompanyContacts.Address.AddressLine1").alias("CompanyAddressLine1"),
        col("CompanyContacts.Address.AddressLine2").alias("CompanyAddressLine2"),
        col("CompanyContacts.Address.City").alias("CompanyAddressCity"),
        col("CompanyContacts.Address.StateProvince").alias("CompanyAddressStateProvince"),
        col("CompanyContacts.Address.Country").alias("CompanyAddressCountry"),
        col("CompanyContacts.Address.PostalCode").alias("CompanyAddressPostalCode"),
        col("CompanyContacts.Address.County").alias("CompanyAddressCounty"),
        col("CompanyContacts.MainPhone.Number").alias("CompanyMainPhoneNumber"),
        col("CompanyContacts.MainPhone.Extension").alias("CompanyMainPhoneExtension"),
        col("IndividualRelations.RelationshipType").alias("IndividualRelationshipType"),
        col("IndividualRelations.RelationshipTypeName").alias("IndividualRelationshipTypeName"),
        col("IndividualRelations.IndividualDetails.IndividualID").alias("IndividualID"),
        col("IndividualRelations.IndividualDetails.Title").alias("IndividualTitle"),
        col("IndividualRelations.IndividualDetails.CompanyName").alias("IndividualCompanyName"),
        col("IndividualRelations.IndividualDetails.PreferredCommunicationMethod").alias("IndividualPreferredCommunicationMethod"),
        col("IndividualRelations.IndividualDetails.MarketingRank").alias("IndividualMarketingRank"),
        col("IndividualContacts.Email").alias("IndividualContactEmail"),
        col("IndividualContacts.Address.AddressLine1").alias("IndividualAddressLine1"),
        col("IndividualContacts.Address.AddressLine2").alias("IndividualAddressLine2"),
        col("IndividualContacts.Address.City").alias("IndividualAddressCity"),
        col("IndividualContacts.Address.StateProvince").alias("IndividualAddressStateProvince"),
        col("IndividualContacts.Address.Country").alias("IndividualAddressCountry"),
        col("IndividualContacts.Address.PostalCode").alias("IndividualAddressPostalCode"),
        col("IndividualContacts.Address.County").alias("IndividualAddressCounty"),
        col("IndividualContacts.MainPhone.Number").alias("IndividualMainPhoneNumber"),
        col("IndividualContacts.MainPhone.Extension").alias("IndividualMainPhoneExtension"),
        col("IndividualRelations.IndividualDetails.PersonName.FirstName").alias("IndividualFirstName"),
        col("IndividualRelations.IndividualDetails.PersonName.LastName").alias("IndividualLastName")
    )
    df = ensure_all_columns(df, {
        "JobID"IntegerType(),
        "BillToType"IntegerType(),
        "CompanyRelationshipType"IntegerType(),
        "CompanyRelationshipTypeName"StringType(),
        "CompanyID"IntegerType(),
        "CompanyName"StringType(),
        "CompanyMarketingRank"StringType(),
        "CompanyContactEmail"StringType(),
        "CompanyContactWebsite"StringType(),
        "CompanyAddressLine1"StringType(),
        "CompanyAddressLine2"StringType(),
        "CompanyAddressCity"StringType(),
        "CompanyAddressStateProvince"StringType(),
        "CompanyAddressCountry"StringType(),
        "CompanyAddressPostalCode"StringType(),
        "CompanyAddressCounty"StringType(),
        "CompanyMainPhoneNumber"StringType(),
        "CompanyMainPhoneExtension"StringType(),
        "IndividualRelationshipType"IntegerType(),
        "IndividualRelationshipTypeName"StringType(),
        "IndividualID"IntegerType(),
        "IndividualTitle"StringType(),
        "IndividualCompanyName"StringType(),
        "IndividualPreferredCommunicationMethod"StringType(),
        "IndividualMarketingRank"StringType(),
        "IndividualContactEmail"StringType(),
        "IndividualAddressLine1"StringType(),
        "IndividualAddressLine2"StringType(),
        "IndividualAddressCity"StringType(),
        "IndividualAddressStateProvince"StringType(),
        "IndividualAddressCountry"StringType(),
        "IndividualAddressPostalCode"StringType(),
        "IndividualAddressCounty"StringType(),
        "IndividualMainPhoneNumber"StringType(),
        "IndividualMainPhoneExtension"StringType(),
        "IndividualFirstName"StringType(),
        "IndividualLastName"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_job_tags(dffile_pathJW_99_11-1740614786527.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = process_array_columns(df, ["JobTags"])
    df = df.withColumn("JobTag"explode_outer(col("JobTags")))
    df = df.withColumn("JobTags_TagName"col("JobTag.TagName").cast(StringType())).drop("JobTag")
    df = ensure_all_columns(df, {"JobID"IntegerType(), "JobTags_TagName"StringType()})
    df = null_out_problematic_columns(df)
    return df

def flatten_survey_summary(dffile_pathJW_99_12-1740614786527.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = process_array_columns(df, ["SurveyDetails"])
    df = df.withColumn("SurveyDetail"explode_outer(col("SurveyDetails")))
    df = df.select(
        col("JobID").alias("JobID"),
        col("MostRecentSurveyDate").alias("MostRecentSurveyDate"),
        col("SurveyDetail.SurveyScore").alias("SurveyScore"),
        col("SurveyDetail.SurveyStatus").alias("SurveyStatus"),
        col("SurveyDetail.IsIncludedForSurvey").alias("IsIncludedForSurvey"),
        col("SurveyDetail.IsSendTriggerCompleted").alias("IsSendTriggerCompleted"),
        col("SurveyDetail.SurveyScoreValue").alias("SurveyScoreValue"),
        col("SurveyDetail.SurveyScoreLabel").alias("SurveyScoreLabel"),
        col("SurveyDetail.NoSurveyReason").alias("NoSurveyReason")
    )
    df = df.withColumn("SurveyStatus",
                       when(col("SurveyStatus"== 0"Yes")
                       .when(col("SurveyStatus"== 1"No")
                       .when(col("SurveyStatus"== 2"Sent")
                       .when(col("SurveyStatus"== 3"Received")
                       .otherwise(lit(None)))
    return df

def flatten_linked_assignments(dffile_pathJW_99_13-1740614786527.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(LongType()))
    df = df.withColumn("Assignment"explode_outer(col("LinkedExternalAssignments")))
    return df.select(
        col("JobID").alias("JobID"),
        col("Assignment.ID").cast(IntegerType()).alias("AssignmentID"),
        col("Assignment.ExternalSystemType").cast(IntegerType()).alias("ExternalSystemType"),
        col("Assignment.ExternalID").alias("ExternalID")
    )

def flatten_notes_summary(dffile_pathJW_99_14-1740614786527.png

 

    if "JobID" not in df.columns:
        df = df.withColumn("JobID"lit(file_path.split("/")[-2]).cast(LongType()))
    return df.select(
        col("JobID").alias("JobID"),
        col("LastNoteAdded").alias("LastNoteAdded"),
        col("LastNoteAddedTime").alias("LastNoteAddedTime")
    )

def flatten_company_contacts(dffile_pathJW_99_15-1740614786527.png

 

    if "CompanyID" not in df.columns:
        df = df.withColumn("CompanyID"lit(file_path.split("/")[-2]).cast(LongType()))
    df = df.withColumn("Contacts"when(col("Contacts").isNull(), array()).otherwise(col("Contacts")))
    df = df.withColumn("Contact"explode_outer(col("Contacts")))
    df = df.withColumn("ContactID"col("Contact.ContactID").cast(LongType())) \
           .withColumn("CorrespondenceEmail"col("Contact.CorrespondenceEmail").cast(StringType())) \
           .withColumn("InquiryEmail"col("Contact.InquiryEmail").cast(StringType())) \
           .withColumn("Website"col("Contact.Website").cast(StringType())) \
           .withColumn("MainPhoneNumber"col("Contact.MainPhone.Number").cast(StringType())) \
           .withColumn("MainPhoneExtension"col("Contact.MainPhone.Extension").cast(StringType())) \
           .withColumn("AddressLine1"col("Contact.Address.AddressLine1").cast(StringType())) \
           .withColumn("AddressLine2"col("Contact.Address.AddressLine2").cast(StringType())) \
           .withColumn("City"col("Contact.Address.City").cast(StringType())) \
           .withColumn("StateProvince"col("Contact.Address.StateProvince").cast(StringType())) \
           .withColumn("Country"col("Contact.Address.Country").cast(StringType())) \
           .withColumn("PostalCode"col("Contact.Address.PostalCode").cast(StringType())) \
           .withColumn("County"col("Contact.Address.County").cast(StringType())) \
           .withColumn("BillingAddressLine1"col("Contact.BillingAddress.AddressLine1").cast(StringType())) \
           .withColumn("BillingAddressLine2"col("Contact.BillingAddress.AddressLine2").cast(StringType())) \
           .withColumn("BillingCity"col("Contact.BillingAddress.City").cast(StringType())) \
           .withColumn("BillingStateProvince"col("Contact.BillingAddress.StateProvince").cast(StringType())) \
           .withColumn("BillingCountry"col("Contact.BillingAddress.Country").cast(StringType())) \
           .withColumn("BillingPostalCode"col("Contact.BillingAddress.PostalCode").cast(StringType())) \
           .withColumn("BillingCounty"col("Contact.BillingAddress.County").cast(StringType())) \
           .withColumn("MailingAddressLine1"col("Contact.MailingAddress.AddressLine1").cast(StringType())) \
           .withColumn("MailingAddressLine2"col("Contact.MailingAddress.AddressLine2").cast(StringType())) \
           .withColumn("MailingCity"col("Contact.MailingAddress.City").cast(StringType())) \
           .withColumn("MailingStateProvince"col("Contact.MailingAddress.StateProvince").cast(StringType())) \
           .withColumn("MailingCountry"col("Contact.MailingAddress.Country").cast(StringType())) \
           .withColumn("MailingPostalCode"col("Contact.MailingAddress.PostalCode").cast(StringType())) \
           .withColumn("MailingCounty"col("Contact.MailingAddress.County").cast(StringType())) \
           .drop("Contacts""Contact")
    df = ensure_all_columns(df, {
        "CompanyID"LongType(),
        "ContactID"LongType(),
        "CorrespondenceEmail"StringType(),
        "InquiryEmail"StringType(),
        "Website"StringType(),
        "MainPhoneNumber"StringType(),
        "MainPhoneExtension"StringType(),
        "AddressLine1"StringType(),
        "AddressLine2"StringType(),
        "City"StringType(),
        "StateProvince"StringType(),
        "Country"StringType(),
        "PostalCode"StringType(),
        "County"StringType(),
        "BillingAddressLine1"StringType(),
        "BillingAddressLine2"StringType(),
        "BillingCity"StringType(),
        "BillingStateProvince"StringType(),
        "BillingCountry"StringType(),
        "BillingPostalCode"StringType(),
        "BillingCounty"StringType(),
        "MailingAddressLine1"StringType(),
        "MailingAddressLine2"StringType(),
        "MailingCity"StringType(),
        "MailingStateProvince"StringType(),
        "MailingCountry"StringType(),
        "MailingPostalCode"StringType(),
        "MailingCounty"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_company_detail(dffile_pathJW_99_16-1740614786528.png

 

    if "CompanyID" not in df.columns:
        df = df.withColumn("CompanyID"lit(file_path.split("/")[-2]).cast(LongType()))
    df = df.withColumn("IsActive"col("IsActive").cast(BooleanType()))
    if isinstance([f.dataType for f in df.schema.fields if f.name == "CompanyInfo"][0], StringType):
        df = df.withColumn("CompanyInfo"from_json(col("CompanyInfo"), StructType([
            StructField("Categories"ArrayType(StringType()), True),
            StructField("FranchiseeID"LongType(), True),
            StructField("GroupAndRoutes"StringType(), True),
            StructField("MarketingCampaigns"ArrayType(StringType()), True),
            StructField("Name"StringType(), True),
            StructField("ParentCompanyID"LongType(), True),
            StructField("Rank"StringType(), True),
            StructField("ReferralType"StringType(), True),
            StructField("ReferredBy"StringType(), True),
            StructField("ResponsibleRep"StructType([
                StructField("FirstName"StringType(), True),
                StructField("LastName"StringType(), True)
            ]), True),
            StructField("SageAccountNumber"StringType(), True),
            StructField("SalesStage"StringType(), True),
            StructField("SalesStatus"StringType(), True),
            StructField("Type"StringType(), True)
        ])))
    df = df.withColumn("CompanyInfo_ResponsibleRep_FirstName"col("CompanyInfo.ResponsibleRep.FirstName").cast(StringType())) \
           .withColumn("CompanyInfo_ResponsibleRep_LastName"col("CompanyInfo.ResponsibleRep.LastName").cast(StringType())) \
           .drop("CompanyInfo.ResponsibleRep")
    df = df.withColumn("CompanyInfo_Categories"concat_ws(""col("CompanyInfo.Categories")).cast(StringType())) \
           .withColumn("CompanyInfo_MarketingCampaigns"concat_ws(""col("CompanyInfo.MarketingCampaigns")).cast(StringType())) \
           .drop("CompanyInfo.Categories""CompanyInfo.MarketingCampaigns")
    df = df.select(
        col("CompanyID").alias("CompanyID"),
        col("IsActive").alias("IsActive"),
        col("CompanyInfo_FranchiseeID").alias("CompanyInfo_FranchiseeID"),
        col("CompanyInfo_GroupAndRoutes").alias("CompanyInfo_GroupAndRoutes"),
        col("CompanyInfo_Name").alias("CompanyInfo_Name"),
        col("CompanyInfo_ParentCompanyID").alias("CompanyInfo_ParentCompanyID"),
        col("CompanyInfo_Rank").alias("CompanyInfo_Rank"),
        col("CompanyInfo_ReferralType").alias("CompanyInfo_ReferralType"),
        col("CompanyInfo_ReferredBy").alias("CompanyInfo_ReferredBy"),
        col("CompanyInfo_ResponsibleRep_FirstName").alias("CompanyInfo_ResponsibleRep_FirstName"),
        col("CompanyInfo_ResponsibleRep_LastName").alias("CompanyInfo_ResponsibleRep_LastName"),
        col("CompanyInfo_SageAccountNumber").alias("CompanyInfo_SageAccountNumber"),
        col("CompanyInfo_SalesStage").alias("CompanyInfo_SalesStage"),
        col("CompanyInfo_SalesStatus").alias("CompanyInfo_SalesStatus"),
        col("CompanyInfo_Type").alias("CompanyInfo_Type"),
        col("CompanyInfo_Categories").alias("CompanyInfo_Categories"),
        col("CompanyInfo_MarketingCampaigns").alias("CompanyInfo_MarketingCampaigns")
    ).drop("CompanyInfo")
    df = ensure_all_columns(df, {
        "CompanyID"LongType(),
        "IsActive"BooleanType(),
        "CompanyInfo_FranchiseeID"LongType(),
        "CompanyInfo_GroupAndRoutes"StringType(),
        "CompanyInfo_Name"StringType(),
        "CompanyInfo_ParentCompanyID"LongType(),
        "CompanyInfo_Rank"StringType(),
        "CompanyInfo_ReferralType"StringType(),
        "CompanyInfo_ReferredBy"StringType(),
        "CompanyInfo_ResponsibleRep_FirstName"StringType(),
        "CompanyInfo_ResponsibleRep_LastName"StringType(),
        "CompanyInfo_SageAccountNumber"StringType(),
        "CompanyInfo_SalesStage"StringType(),
        "CompanyInfo_SalesStatus"StringType(),
        "CompanyInfo_Type"StringType(),
        "CompanyInfo_Categories"StringType(),
        "CompanyInfo_MarketingCampaigns"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_individual_contacts(dffile_pathJW_99_17-1740614786528.png

 

    if "IndividualID" not in df.columns:
        df = df.withColumn("IndividualID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = df.withColumn("Contacts"when(col("Contacts").isNull(), array()).otherwise(col("Contacts")))
    df = df.withColumn("Contact"explode_outer(col("Contacts")))
    df = df.withColumn("ContactID"col("Contact.ContactID").cast(IntegerType())) \
           .withColumn("Email"col("Contact.Email").cast(StringType())) \
           .withColumn("SecondaryEmail"col("Contact.SecondaryEmail").cast(StringType())) \
           .withColumn("MainPhoneNumber"col("Contact.MainPhone.Number").cast(StringType())) \
           .withColumn("MainPhoneExtension"col("Contact.MainPhone.Extension").cast(StringType())) \
           .withColumn("AddressLine1"col("Contact.Address.AddressLine1").cast(StringType())) \
           .withColumn("AddressLine2"col("Contact.Address.AddressLine2").cast(StringType())) \
           .withColumn("City"col("Contact.Address.City").cast(StringType())) \
           .withColumn("StateProvince"col("Contact.Address.StateProvince").cast(StringType())) \
           .withColumn("Country"col("Contact.Address.Country").cast(StringType())) \
           .withColumn("PostalCode"col("Contact.Address.PostalCode").cast(StringType())) \
           .withColumn("County"col("Contact.Address.County").cast(StringType())) \
           .drop("Contacts""Contact""OtherPhones""OtherPhone""PhoneNumbers""PhoneNumber")
    df = ensure_all_columns(df, {
        "IndividualID"IntegerType(),
        "ContactID"IntegerType(),
        "Email"StringType(),
        "SecondaryEmail"StringType(),
        "MainPhoneNumber"StringType(),
        "MainPhoneExtension"StringType(),
        "AddressLine1"StringType(),
        "AddressLine2"StringType(),
        "City"StringType(),
        "StateProvince"StringType(),
        "Country"StringType(),
        "PostalCode"StringType(),
        "County"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_individual_detail(dffile_pathJW_99_18-1740614786528.png

 

    if "IndividualID" not in df.columns:
        df = df.withColumn("IndividualID"lit(file_path.split("/")[-2]).cast(LongType()))
    df = df.withColumn("IsActive"col("IsActive").cast(BooleanType()))
    if "IndividualDetailInfo.Assistant" in df.columns:
        df = df.withColumn("AssistantFirstName"col("IndividualDetailInfo.Assistant.FirstName").cast(StringType())) \
               .withColumn("AssistantLastName"col("IndividualDetailInfo.Assistant.LastName").cast(StringType())) \
               .drop("IndividualDetailInfo.Assistant")
    else:
        df = df.withColumn("AssistantFirstName"lit(None).cast(StringType())) \
               .withColumn("AssistantLastName"lit(None).cast(StringType()))
    if "IndividualDetailInfo.ResponsibleRep" in df.columns:
        df = df.withColumn("ResponsibleRepFirstName"col("IndividualDetailInfo.ResponsibleRep.FirstName").cast(StringType())) \
               .withColumn("ResponsibleRepLastName"col("IndividualDetailInfo.ResponsibleRep.LastName").cast(StringType())) \
               .drop("IndividualDetailInfo.ResponsibleRep")
    else:
        df = df.withColumn("ResponsibleRepFirstName"lit(None).cast(StringType())) \
               .withColumn("ResponsibleRepLastName"lit(None).cast(StringType()))
    df = df.withColumn("Categories"concat_ws(""col("IndividualDetailInfo.Categories")).cast(StringType())) \
           .withColumn("MarketingCampaigns"concat_ws(""col("IndividualDetailInfo.MarketingCampaigns")).cast(StringType())) \
           .drop("IndividualDetailInfo.Categories""IndividualDetailInfo.MarketingCampaigns")
    df = df.select(
        col("IndividualID").alias("IndividualID"),
        col("IsActive").alias("IsActive"),
        col("IndividualDetailInfo.Anniversary").alias("Anniversary"),
        col("AssistantFirstName").alias("AssistantFirstName"),
        col("AssistantLastName").alias("AssistantLastName"),
        col("Categories").alias("Categories"),
        col("IndividualDetailInfo.CompanyID").cast(LongType()).alias("CompanyID"),
        col("IndividualDetailInfo.ContactType").alias("ContactType"),
        col("IndividualDetailInfo.DateOfBirth").alias("DateOfBirth"),
        col("IndividualDetailInfo.FranchiseeId").cast(LongType()).alias("FranchiseeId"),
        col("IndividualDetailInfo.GroupAndRoutes").alias("GroupAndRoutes"),
        col("IndividualDetailInfo.JobTitle").alias("JobTitle"),
        col("MarketingCampaigns").alias("MarketingCampaigns"),
        col("IndividualDetailInfo.Name.FirstName").alias("FirstName"),
        col("IndividualDetailInfo.Name.LastName").alias("LastName"),
        col("IndividualDetailInfo.ParentCompany").alias("ParentCompany"),
        col("IndividualDetailInfo.PotentialReferralRevenue").alias("PotentialReferralRevenue"),
        col("IndividualDetailInfo.PotentialReferralVolume").alias("PotentialReferralVolume"),
        col("IndividualDetailInfo.Rank").alias("Rank"),
        col("IndividualDetailInfo.ReferralType").alias("ReferralType"),
        col("IndividualDetailInfo.ReferredBy").alias("ReferredBy"),
        col("ResponsibleRepFirstName").alias("ResponsibleRepFirstName"),
        col("ResponsibleRepLastName").alias("ResponsibleRepLastName"),
        col("IndividualDetailInfo.SageAccountNumber").alias("SageAccountNumber"),
        col("IndividualDetailInfo.SalesStage").alias("SalesStage"),
        col("IndividualDetailInfo.SalesStatus").alias("SalesStatus"),
        col("IndividualDetailInfo.Title").alias("Title")
    )
    df = ensure_all_columns(df, {
        "IndividualID"LongType(),
        "IsActive"BooleanType(),
        "Anniversary"StringType(),
        "AssistantFirstName"StringType(),
        "AssistantLastName"StringType(),
        "Categories"StringType(),
        "CompanyID"LongType(),
        "ContactType"StringType(),
        "DateOfBirth"StringType(),
        "FranchiseeId"LongType(),
        "GroupAndRoutes"StringType(),
        "JobTitle"StringType(),
        "MarketingCampaigns"StringType(),
        "FirstName"StringType(),
        "LastName"StringType(),
        "ParentCompany"StringType(),
        "PotentialReferralRevenue"StringType(),
        "PotentialReferralVolume"StringType(),
        "Rank"StringType(),
        "ReferralType"StringType(),
        "ReferredBy"StringType(),
        "ResponsibleRepFirstName"StringType(),
        "ResponsibleRepLastName"StringType(),
        "SageAccountNumber"StringType(),
        "SalesStage"StringType(),
        "SalesStatus"StringType(),
        "Title"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_provider_detail(dffile_pathJW_99_19-1740614786528.png

 

    if "ProviderID" not in df.columns:
        df = df.withColumn("ProviderID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    return df.select(
        col("ProviderID").alias("ProviderID"),
        col("FranchisorID").alias("FranchisorID"),
        col("Name").alias("ProviderName"),
        col("CompanyID").alias("CompanyID"),
        col("EnterpriseID").alias("EnterpriseID"),
        col("Type").alias("ProviderType"),
        col("PrimaryEmail").alias("PrimaryEmail"),
        col("Website").alias("Website"),
        col("IsActive").alias("IsActive"),
        col("Address.AddressLine1").alias("AddressLine1"),
        col("Address.AddressLine2").alias("AddressLine2"),
        col("Address.City").alias("City"),
        col("Address.StateProvince").alias("StateProvince"),
        col("Address.Country").alias("Country"),
        col("Address.PostalCode").alias("PostalCode"),
        col("Address.County").alias("County")
    )

def flatten_provider_scores(dffile_pathJW_99_20-1740614786528.png

 

    if "ProviderID" not in df.columns:
        df = df.withColumn("ProviderID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    if "Vpasses" in df.columns:
        df = df.withColumn("Vpass"explode_outer(col("Vpasses")))
    return df.select(
        col("ProviderID").alias("ProviderID"),
        col("Vpass.Score").alias("VpassScore"),
        col("Vpass.IsAdminScore").alias("IsAdminScore"),
        col("NpsScore").alias("NpsScore")
    )

def flatten_provider_suspensions(dffile_pathJW_99_21-1740614786529.png

 

    if "ProviderID" not in df.columns:
        df = df.withColumn("ProviderID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = df.withColumn("Suspensions"when(col("Suspensions").isNull() | (size(col("Suspensions")) == 0), lit(None)).otherwise(col("Suspensions"))).drop("Suspensions")
    df = ensure_all_columns(df, {
        "ProviderID"IntegerType(),
        "SuspensionID"IntegerType(),
        "LocationID"IntegerType(),
        "StartDate"StringType(),
        "LiftDate"StringType(),
        "Description"StringType(),
        "Status"StringType(),
        "IsValid"BooleanType(),
        "Level"StringType(),
        "Reason"StringType(),
        "LiftMethod"StringType(),
        "SuspensionType"StringType(),
        "Visibility"StringType(),
        "AffectedDivisionName"StringType()
    })
    df = null_out_problematic_columns(df)
    return df

def flatten_location_detail(dffile_pathJW_99_22-1740614786529.png

 

    if "LocationID" not in df.columns:
        df = df.withColumn("LocationID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    df = df.withColumn("EdiAddressLine1"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.AddressLine1")).otherwise(lit(None))) \
           .withColumn("EdiAddressLine2"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.AddressLine2")).otherwise(lit(None))) \
           .withColumn("EdiCity"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.City")).otherwise(lit(None))) \
           .withColumn("EdiStateProvince"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.StateProvince")).otherwise(lit(None))) \
           .withColumn("EdiPostalCode"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.PostalCode")).otherwise(lit(None))) \
           .withColumn("EdiCountry"when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.Country")).otherwise(lit(None))) \
           .drop("EdiAddress")
    df = df.withColumn("AddressLine1"when(expr("typeof(Address) = 'struct'"), col("Address.AddressLine1")).otherwise(lit(None))) \
           .withColumn("AddressLine2"when(expr("typeof(Address) = 'struct'"), col("Address.AddressLine2")).otherwise(lit(None))) \
           .withColumn("City"when(expr("typeof(Address) = 'struct'"), col("Address.City")).otherwise(lit(None))) \
           .withColumn("StateProvince"when(expr("typeof(Address) = 'struct'"), col("Address.StateProvince")).otherwise(lit(None))) \
           .withColumn("PostalCode"when(expr("typeof(Address) = 'struct'"), col("Address.PostalCode")).otherwise(lit(None))) \
           .withColumn("Country"when(expr("typeof(Address) = 'struct'"), col("Address.Country")).otherwise(lit(None))) \
           .drop("Address")
    return df.select(
        col("LocationID").alias("LocationID"),
        col("EdiAddressLine1").alias("EdiAddressLine1"),
        col("EdiAddressLine2").alias("EdiAddressLine2"),
        col("EdiCity").alias("EdiCity"),
        col("EdiStateProvince").alias("EdiStateProvince"),
        col("EdiPostalCode").alias("EdiPostalCode"),
        col("EdiCountry").alias("EdiCountry"),
        col("AddressLine1").alias("AddressLine1"),
        col("AddressLine2").alias("AddressLine2"),
        col("City").alias("City"),
        col("StateProvince").alias("StateProvince"),
        col("PostalCode").alias("PostalCode"),
        col("Country").alias("Country")
    )

def flatten_location_scores(dffile_pathJW_99_23-1740614786529.png

 

    if "LocationID" not in df.columns:
        df = df.withColumn("LocationID"lit(file_path.split("/")[-2]).cast(IntegerType()))
    if "Vpasses" in df.columns:
        df = df.withColumn("Vpass"explode_outer(col("Vpasses")))
    return df.select(
        col("LocationID").alias("LocationID"),
        col("CloseRatio").alias("CloseRatio"),
        col("NpsScore").alias("NpsScore"),
        col("Vpass.Score").alias("VpassScore"),
        col("Vpass.IsAdminScore").alias("IsAdminScore")
    )

# Define transformation mapping
transformation_mapping = {
    "AccountingDetail.JSON": flatten_accounting_detail,
    "AccountingSummary.JSON": flatten_accounting_summary,
    "InternalParticipants.JSON": flatten_internal_participants,
    "JobDates.JSON": flatten_job_dates,
    "JobDetail.JSON": flatten_job_detail,
    "JobExternalParticipants.JSON": flatten_job_external_participants,
    "JobTags.JSON": flatten_job_tags,
    "SurveySummary.JSON": flatten_survey_summary,
    "LinkedAssignmentDetails.JSON": flatten_linked_assignments,
    "NotesSummary.JSON": flatten_notes_summary,
    "CompanyContacts.JSON": flatten_company_contacts,
    "CompanyDetail.JSON": flatten_company_detail,
    "IndividualContacts.JSON": flatten_individual_contacts,
    "IndividualDetail.JSON": flatten_individual_detail,
    "ProviderDetail.JSON": flatten_provider_detail,
    "ProviderScores.JSON": flatten_provider_scores,
    "ProviderSuspensions.JSON": flatten_provider_suspensions,
    "LocationDetail.JSON": flatten_location_detail,
    "LocationScores.JSON": flatten_location_scores
}
transformation_mapping_bc = sc.broadcast(transformation_mapping)

# Broadcast schemas (assuming schemas dict is defined elsewhere)
schemas_json = {k: v.json() for k, v in schemas.items()} if 'schemas' in globals() else {}
schemas_bc = sc.broadcast(schemas_json)

def process_partition(partitionJW_99_24-1740614786529.png

 

    """Process a partition of JSON files inside worker nodes using the existing Spark session."""
    # Use the global spark session (implicitly available via SparkContext on workers)
    for row in partition:
        path, file_name = row["path"], row["file_name"]
        folder_name = file_name.replace(".JSON""").replace(".json""")
        output_path = f"{output_base_path}/{folder_name}"

        # Retrieve schema JSON from broadcast and convert to StructType
        schema_json = schemas_bc.value.get(file_name)
        schema = StructType.fromJson(schema_json) if schema_json else None

        try:
            # Infer schema only if no predefined schema exists
            if schema is None:
                inferred_df = spark.read.option("inferSchema""true").json(path, multiLine=True)
                schema = inferred_df.schema

            raw_df = spark.read.schema(schema).json(path, multiLine=True)

            # Apply the appropriate transformation function
            transform_func = transformation_mapping_bc.value.get(file_name)
            if transform_func:
                raw_df = transform_func(raw_df, path)

            # Write to Parquet with optimization
            raw_df.coalesce(32).write.mode("append").partitionBy("JobID").parquet(output_path)
            print(f"JW_99_25-1740614786526.png

 

 Processed {file_name}")
        except Exception as e:
            print(f"JW_99_26-1740614786526.png

 

 Error processing {file_name}{e}")

def process_json_files_parallel():
    """Parallel processing of JSON files."""
    json_files = list_json_files()
    if not json_files:
        print("No files to process.")
        return

    df_files = spark.createDataFrame([(path, os.path.basename(path)) for path in json_files], ["path""file_name"])

    # Repartition to match cluster cores (128) for maximum parallelism
    df_files.repartition(128).rdd.foreachPartition(process_partition)

if __name__ == "__main__":
    process_json_files_parallel()
    spark.stop()
2 REPLIES 2

haijian
New Contributor II

Heard of that SparkSession already reside in Spark Cluster which is Databricks compute resource.  Instea of initializing that object, you can utilize that variable "spark" which is already in Cluster out there.  It can be re-configured with othere settings. 

narasimha_reddy
New Contributor II

You cannot use Spark session explicitly inside Executor logic. Here you are trying mapPartitions which makes the customlogic to get executed inside the executor thread. Either you need to change whole problem approach to segregate spark variable usage to outside of executor logic Or you can use python native functions /modules inside the Executor. 

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now