I've troubleshot this like 20+ times. I am aware that the current code is causing the spark session to be passed to the workers, where it should only be applied to the driver. Can someone please help me resolve this (the schema is defined earlier)?
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.sql import functions as F
from azure.storage.blob import ContainerClient
from pyspark.sql.types import *
from delta.tables import DeltaTable
import os
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, DoubleType, BooleanType, FloatType, LongType
)
# Define schemas
schemas = {
"AccountingDetail.JSON": StructType([
StructField("JobID", IntegerType(), True),
StructField("Invoices", ArrayType(StructType([
StructField("InvoiceID", IntegerType(), True),
StructField("ExternalID", StringType(), True),
StructField("InvoiceNumber", StringType(), True),
StructField("DateAdded", TimestampType(), True),
StructField("DateLastUpdate", TimestampType(), True),
StructField("Memo", StringType(), True),
StructField("DateInvoiced", TimestampType(), True),
StructField("BillTo", StringType(), True),
StructField("InvoiceClass", StringType(), True),
StructField("Amount", DoubleType(), True),
StructField("Tax", DoubleType(), True),
StructField("Source", StringType(), True),
StructField("Status", StringType(), True),
StructField("InvoiceBalance", DoubleType(), True),
StructField("InvoiceLineItems", ArrayType(StructType([
StructField("InvoiceLineItemID", IntegerType(), True),
StructField("InvoiceID", IntegerType(), True),
StructField("LineDate", TimestampType(), True),
StructField("Room", StringType(), True),
StructField("CategoryID", StringType(), True),
StructField("Details", StringType(), True),
StructField("ItemDescription", StringType(), True),
StructField("UnitQuantity", IntegerType(), True),
StructField("UnitOfMeasure", StringType(), True),
StructField("Quantity", DoubleType(), True),
StructField("LaborRate", DoubleType(), True),
StructField("MaterialRate", DoubleType(), True),
StructField("EquipmentRate", DoubleType(), True),
StructField("Adjustment", DoubleType(), True),
StructField("Overhead", DoubleType(), True),
StructField("Profit", DoubleType(), True),
StructField("Rate", DoubleType(), True),
StructField("ExtendedAmount", DoubleType(), True),
StructField("AddedDate", TimestampType(), True),
StructField("UpdatedDate", TimestampType(), True),
StructField("TimeModified", TimestampType(), True),
StructField("Status", StringType(), True)
])), True),
StructField("AdditionalInfoAmount", DoubleType(), True),
StructField("InvoiceType", StringType(), True)
])), True),
StructField("Payments", ArrayType(StructType([
StructField("PaymentID", IntegerType(), True),
StructField("InvoiceID", IntegerType(), True),
StructField("DateAdded", TimestampType(), True),
StructField("DateLastUpdate", TimestampType(), True),
StructField("Mode", StringType(), True),
StructField("DatePaid", TimestampType(), True),
StructField("ReferenceNumber", StringType(), True),
StructField("Amount", DoubleType(), True),
StructField("Memo", StringType(), True),
StructField("Source", StringType(), True),
StructField("IsPaid", BooleanType(), True),
StructField("Status", StringType(), True),
StructField("DiscountAmount", DoubleType(), True)
])), True),
StructField("JobCostings", ArrayType(StructType([
StructField("JobCostingID", IntegerType(), True),
StructField("Date", TimestampType(), True),
StructField("LaborHours", DoubleType(), True),
StructField("JobCostTypeCategory", StringType(), True),
StructField("Quantity", DoubleType(), True),
StructField("UOM", StringType(), True),
StructField("Rate", DoubleType(), True),
StructField("Extended", DoubleType(), True),
StructField("Billable", BooleanType(), True),
StructField("PaymentTo", StringType(), True),
StructField("Description", StringType(), True),
StructField("TxnType", StringType(), True),
StructField("TimeModified", TimestampType(), True),
StructField("TransferFrom", StringType(), True),
StructField("Status", StringType(), True),
StructField("AddedBy", StringType(), True),
StructField("PO", StringType(), True),
StructField("Item", StringType(), True),
StructField("Service", StringType(), True),
StructField("Memo", StringType(), True),
StructField("BillTxnId", StringType(), True),
StructField("CalculatedBurden", DoubleType(), True),
StructField("LinkedVendorPayments", ArrayType(StructType([
StructField("VendorPaymentID", LongType(), False),
StructField("AmountPaid", DoubleType(), True),
StructField("DatePaid", StringType(), True),
StructField("CheckNumber", StringType(), True)
])), False)
])), True)
]),
"AccountingSummary.JSON": StructType([
StructField("IsForcedSync", BooleanType(), False),
StructField("MD5HashAsHexString", StringType(), True),
StructField("ActualGrossProfit", DoubleType(), True),
StructField("ActualGrossProfitPercentage", DoubleType(), True),
StructField("AdjustedInvoiceSubtotal", DoubleType(), True),
StructField("BalanceOwing", DoubleType(), True),
StructField("ChangeOrderAmount", DoubleType(), True),
StructField("CollectedSubtotal", DoubleType(), True),
StructField("ConsumablesCost", DoubleType(), True),
StructField("EquipmentCost", DoubleType(), True),
StructField("EstimateGrossProfitAmountAfterWOAdjustment", DoubleType(), True),
StructField("EstimateGrossProfitPercentageAfterWOAdjustment", DoubleType(), True),
StructField("EstimatedGrossProfitAmountFromEstimateImport", DoubleType(), True),
StructField("EstimatedGrossProfitPercentageFromEstimateImport", DoubleType(), True),
StructField("EstimateGrossProfit", DoubleType(), True),
StructField("EstimateUninvoicedAmount", DoubleType(), True),
StructField("EstimateUnpaid", DoubleType(), True),
StructField("GrossProfitPercentage", DoubleType(), True),
StructField("InitialEstimate", DoubleType(), True),
StructField("InvoiceSubtotal", DoubleType(), True),
StructField("TotalInvoiced", DoubleType(), True),
StructField("LaborCost", DoubleType(), True),
StructField("MaterialsCost", DoubleType(), True),
StructField("OriginalEstimate", DoubleType(), True),
StructField("OtherCost", DoubleType(), True),
StructField("ProfessionalFee", DoubleType(), True),
StructField("RecognizedRevenue", DoubleType(), True),
StructField("ReferralFeeCost", DoubleType(), True),
StructField("SubtradeCost", DoubleType(), True),
StructField("SupplementEstimate", DoubleType(), True),
StructField("TotalCollected", DoubleType(), True),
StructField("TotalEstimates", DoubleType(), True),
StructField("TotalJobCost", DoubleType(), True),
StructField("TotalWorkOrderBudget", DoubleType(), True),
StructField("WarrantyCost", DoubleType(), True),
StructField("EstimateDepreciation", DoubleType(), True),
StructField("JobId", LongType(), False)
]),
"CompanyContacts.JSON": StructType([
StructField("CompanyID", IntegerType(), True),
StructField("Contacts", ArrayType(
StructType([
StructField("ContactID", IntegerType(), True),
StructField("CorrespondenceEmail", StringType(), True),
StructField("InquiryEmail", StringType(), True),
StructField("Website", StringType(), True),
# Addresses
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("BillingAddress", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("MailingAddress", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), True),
# Main Phone
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True),
# Other Phones
StructField("OtherPhones", ArrayType(
StructType([
StructField("Category", StringType(), True),
StructField("PhoneNumbers", ArrayType(
StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
])
), True)
])
), True)
])
), True)
]),
"IndividualContacts.JSON": StructType([
StructField("IndividualID", IntegerType(), True),
StructField("Contacts", ArrayType(StructType([
StructField("ContactID", IntegerType(), True),
StructField("Email", StringType(), True),
StructField("SecondaryEmail", StringType(), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True), # Nullable struct
StructField("OtherPhones", ArrayType(StructType([
StructField("Category", StringType(), True),
StructField("PhoneNumbers", ArrayType(StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True), True)
])), False),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), False)
])), False)
]),
"IndividualDetail.JSON" : StructType([
StructField("IndividualID", IntegerType(), True),
StructField("IsActive", BooleanType(), True),
StructField("IndividualDetailInfo", StructType([
StructField("Anniversary", StringType(), True),
StructField("Assistant", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("Categories", ArrayType(StringType(), True), True),
StructField("CompanyID", IntegerType(), True),
StructField("ContactType", StringType(), True),
StructField("DateOfBirth", StringType(), True),
StructField("FranchiseeId", IntegerType(), True),
StructField("GroupAndRoutes", StringType(), True),
StructField("JobTitle", StringType(), True),
StructField("MarketingCampaigns", ArrayType(StringType(), True), True),
StructField("Name", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("ParentCompany", StringType(), True),
StructField("PotentialReferralRevenue", StringType(), True),
StructField("PotentialReferralVolume", StringType(), True),
StructField("Rank", StringType(), True),
StructField("ReferralType", StringType(), True),
StructField("ReferredBy", StringType(), True),
StructField("ResponsibleRep", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("SageAccountNumber", StringType(), True),
StructField("SalesStage", StringType(), True),
StructField("SalesStatus", StringType(), True),
StructField("Title", StringType(), True)
]), True)
]),
"LocationScores.JSON" : StructType([
StructField("LocationID", LongType(), False),
StructField("CloseRatio", DoubleType(), True),
StructField("NpsScore", IntegerType(), True),
StructField("Vpasses", ArrayType(StructType([
StructField("Score", IntegerType(), False),
StructField("IsAdminScore", BooleanType(), False)
])), False)
]),
"ProviderScores.JSON" : StructType([
StructField("ProviderID", IntegerType(), False),
StructField("Vpasses", ArrayType(StructType([
StructField("Score", IntegerType(), True),
StructField("IsAdminScore", BooleanType(), True)
])), False),
StructField("NpsScore", IntegerType(), True)
]),
"JobDetail.JSON" : StructType([
StructField("JobID", IntegerType(), True),
StructField("FranchiseeInfo", StructType([
StructField("FranchiseeID", IntegerType(), True),
StructField("DashCompanyID", IntegerType(), True),
StructField("Name", StringType(), True),
StructField("ExternalEnterpriseID", StringType(), True)
]), True),
StructField("ClaimInfo", StructType([
StructField("ClaimID", IntegerType(), True),
StructField("ReferenceID", StringType(), True),
StructField("TypeOfLoss", StringType(), True),
StructField("LossCategory", StringType(), True),
StructField("ProviderLossCategory", StringType(), True),
StructField("ClientName", StringType(), True),
StructField("DateOfLoss", StringType(), True),
StructField("DateReceived", StringType(), True),
StructField("CatReferenceNumber", StringType(), True),
StructField("PolicyLimits", StructType([
StructField("DwellingAmount", DoubleType(), True),
StructField("ContentsAmount", DoubleType(), True),
StructField("OtherStructuresAmount", DoubleType(), True)
]), True),
StructField("ClientID", IntegerType(), True),
StructField("Caller", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("ClaimEnteredBy", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("PreferredProviderName", StringType(), True),
StructField("PreferredProviderLocationName", StringType(), True),
StructField("IsProviderCreatedClaim", BooleanType(), True),
StructField("ReportedBy", StringType(), True),
StructField("CodeRedID", StringType(), True),
StructField("ClaimCustomer", StructType([
StructField("ClaimCustomerType", IntegerType(), True),
StructField("BillingAddress", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True)
]), True),
StructField("CustomerName", StringType(), True),
StructField("IndividualDetails", StructType([
StructField("IndividualID", IntegerType(), True),
StructField("ContactsDetails", ArrayType(StructType([
StructField("Email", StringType(), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True)
])), True),
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True)
]), True),
StructField("CompanyDetails", StructType([
StructField("CompanyName", StringType(), True),
StructField("ContactsDetails", ArrayType(StructType([
StructField("Email", StringType(), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True)
])), True)
]), True)
]), True)
]), True),
StructField("JobInfo", StructType([
StructField("PolicyHolderType", StringType(), True),
StructField("Division", StringType(), True),
StructField("ProviderReasonForClosing", StringType(), True),
StructField("IsAdminJob", BooleanType(), True),
StructField("DateAdded", StringType(), True),
StructField("DateLastUpdate", StringType(), True),
StructField("JobNumber", StringType(), True),
StructField("JobName", StringType(), True),
StructField("Status", StringType(), True),
StructField("IsClosed", BooleanType(), True),
StructField("IsOnHold", BooleanType(), True),
StructField("EnvironmentalCode", StringType(), True),
StructField("ReceivedByFullName", StringType(), True),
StructField("Priority", StringType(), True),
StructField("LossDescription", StringType(), True),
StructField("SpecialInstructions", StringType(), True),
StructField("InitialFindings", StringType(), True),
StructField("RoomsAffected", ArrayType(StringType()), True),
StructField("AssociatedJobID", IntegerType(), True),
StructField("ProviderCatastropheName", StringType(), True),
StructField("ProviderDivisionType", StringType(), True),
StructField("EnvironmentalCodeDescription", StringType(), True),
StructField("JobCompletionPercentage", DoubleType(), True),
StructField("MasterBuilderNumber", IntegerType(), True),
StructField("ReferralFeeDatePaid", StringType(), True),
StructField("RegionalAreaManager", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("WaterJobCat", StringType(), True),
StructField("WaterJobClass", StringType(), True),
StructField("JobSource", StringType(), True),
StructField("LocationID", IntegerType(), True)
]), True),
StructField("AdminInfo", StructType([
StructField("AdminJobNumber", StringType(), True),
StructField("AdminDivision", StringType(), True),
StructField("AdminReasonForClosing", StringType(), True),
StructField("Program", StructType([
StructField("ProgramID", IntegerType(), True),
StructField("ProgramName", StringType(), True),
StructField("NationalAccountDirector", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True)
]), True),
StructField("ReasonForClosingCategoryName", StringType(), True),
StructField("IsEmergencyServiceRequired", BooleanType(), True),
StructField("LicenseNumber", StringType(), True),
StructField("AdminInsuranceCarrier", StringType(), True),
StructField("AdminInsuranceCarrierID", IntegerType(), True),
StructField("AdminJobStatus", StringType(), True),
StructField("JobLeadID", StringType(), True),
StructField("Services", ArrayType(StringType()), True)
]), True),
StructField("PolicyClaimInformation", StructType([
StructField("ClaimNumber", StringType(), True),
StructField("PolicyNumber", StringType(), True),
StructField("DatePolicyStart", StringType(), True),
StructField("DatePolicyExpiration", StringType(), True),
StructField("YearBuilt", StringType(), True),
StructField("ExternalFileNumber", StringType(), True),
StructField("LossType", StringType(), True),
StructField("SecondaryLossType", StringType(), True),
StructField("ReportedBy", StringType(), True),
StructField("ReferredByFullName", StringType(), True),
StructField("SourceOfLoss", StringType(), True),
StructField("JobSize", StringType(), True),
StructField("LossCategory", StringType(), True),
StructField("ProviderLossCategory", StringType(), True)
]), True),
StructField("PaymentServices", StructType([
StructField("DeductibleAmount", DoubleType(), True),
StructField("CollectWhen", StringType(), True),
StructField("DateLienRights", StringType(), True),
StructField("DateLienLiened", StringType(), True),
StructField("DateLienReleased", StringType(), True),
StructField("MbJobNumber", StringType(), True)
]), True),
StructField("LossContactInfo", StructType([
StructField("ContactPerson", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True)
]), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True)
]), True),
StructField("Customer", StructType([
StructField("JobPolicyHolderType", IntegerType(), True),
StructField("IndividualDetails", StructType([
StructField("IndividualID", IntegerType(), True),
StructField("Title", StringType(), True),
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True)
]), True),
StructField("CompanyDetails", StructType([
StructField("CompanyName", StringType(), True),
StructField("ContactsDetails", ArrayType(StructType([
StructField("Email", StringType(), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True)
])), True)
]), True)
]), True),
StructField("JobReferralDetails", StructType([
StructField("ReferralCategory", IntegerType(), True),
StructField("ReferralType", StringType(), True),
StructField("SourceIndividual", StructType([
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True)
]), True),
StructField("SourceCompany", StructType([
StructField("CompanyName", StringType(), True)
]), True),
StructField("SourceEmployee", StructType([
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True)
]), True),
StructField("SourceMarketingCampaign", StructType([
StructField("CampaignName", StringType(), True)
]), True),
StructField("SalesStage", StringType(), True),
StructField("SalesStatus", StringType(), True)
]), True)
]),
"CompanyDetail.JSON": StructType([
StructField("CompanyID", LongType(), False),
StructField("IsActive", BooleanType(), False),
StructField("CompanyInfo", StructType([
StructField("Categories", ArrayType(StringType()), True),
StructField("FranchiseeID", LongType(), True),
StructField("GroupAndRoutes", StringType(), True),
StructField("MarketingCampaigns", ArrayType(StringType()), True),
StructField("Name", StringType(), True),
StructField("ParentCompanyID", LongType(), True),
StructField("Rank", StringType(), True),
StructField("ReferralType", StringType(), True),
StructField("ReferredBy", StringType(), True),
StructField("ResponsibleRep", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("SageAccountNumber", StringType(), True),
StructField("SalesStage", StringType(), True),
StructField("SalesStatus", StringType(), True),
StructField("Type", StringType(), True)
]), True)
]),
"EmployeeDetail.JSON": StructType([
StructField("IsActive", BooleanType(), True),
StructField("EmployeeInfo", StructType([
StructField("FranchiseeID", IntegerType(), True),
StructField("Title", StringType(), True),
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True),
StructField("DateOfBirth", StringType(), True),
StructField("LocationName", StringType(), True)
]), True),
StructField("EmployeeID", IntegerType(), True)
]),
"InternalParticipants.JSON": StructType([
StructField("JobID", LongType(), False),
StructField("Participants", ArrayType(
StructType([
StructField("Type", StringType(), True),
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), False),
StructField("ID", LongType(), False)
])
), False)
]),
"JobExternalParticipants.JSON": StructType([
StructField("JobID", IntegerType(), False),
StructField("CompanyRelations", ArrayType(StructType([
StructField("RelationshipType", IntegerType(), False),
StructField("RelationshipTypeName", StringType(), True),
StructField("CompanyDetails", StructType([
StructField("CompanyID", IntegerType(), True),
StructField("CompanyName", StringType(), True),
StructField("MarketingRank", StringType(), True),
StructField("ContactsDetails", ArrayType(StructType([
StructField("Email", StringType(), True),
StructField("Website", StringType(), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True),
StructField("County", StringType(), True)
]), False),
StructField("BillingAddress", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("MailingAddress", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True),
StructField("OtherPhones", ArrayType(StructType([
StructField("Category", StringType(), True),
StructField("PhoneNumbers", ArrayType(StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
])), True)
])), True)
])), False)
]), False)
])), False),
StructField("IndividualRelations", ArrayType(StructType([
StructField("RelationshipType", IntegerType(), True),
StructField("RelationshipTypeName", StringType(), True),
StructField("IndividualDetails", StructType([
StructField("IndividualID", IntegerType(), True),
StructField("Title", StringType(), True),
StructField("CompanyName", StringType(), True),
StructField("PreferredCommunicationMethod", StringType(), True),
StructField("MarketingRank", StringType(), True),
StructField("ContactsDetails", ArrayType(StructType([
StructField("Email", StringType(), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("Country", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True),
StructField("OtherPhones", ArrayType(StructType([
StructField("Category", StringType(), True),
StructField("PhoneNumbers", ArrayType(StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
])), True)
])), True)
])), False),
StructField("PersonName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), False)
]), False)
])), False),
StructField("BillToType", IntegerType(), True)
]),
"JobDates.JSON" : StructType([
StructField("JobID", LongType(), False),
StructField("Dates", ArrayType(
StructType([
StructField("DateTypeID", IntegerType(), False),
StructField("DateName", StringType(), True),
StructField("Value", TimestampType(), False),
StructField("AuditDetails", StructType([
StructField("EnteredBy", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("WhenEntered", TimestampType(), True)
]), True)
])
), False)
]),
"LinkedAssignmentDetails.JSON": StructType([
StructField("JobID", LongType(), False),
StructField("LinkedExternalAssignments", ArrayType(
StructType([
StructField("ID", IntegerType(), False),
StructField("ExternalSystemType", IntegerType(), False),
StructField("ExternalID", StringType(), True)
])
), False)
]),
"NotesSummary.JSON": StructType([
StructField("JobID", LongType(), False),
StructField("LastNoteAdded", StringType(), True),
StructField("LastNoteAddedTime", StringType(), True)
]),
"SurveySummary.JSON": StructType([
StructField("JobID", LongType(), False),
StructField("MostRecentSurveyDate", StringType(), True),
StructField("SurveyDetails", ArrayType(
StructType([
StructField("SurveyScore", StringType(), True),
StructField("SurveyStatus", IntegerType(), True),
StructField("IsIncludedForSurvey", BooleanType(), False),
StructField("IsSendTriggerCompleted", BooleanType(), False),
StructField("SurveyScoreValue", IntegerType(), True),
StructField("SurveyScoreLabel", StringType(), True),
StructField("NoSurveyReason", StringType(), True)
])
), False)
]),
"EmployeeContacts.JSON": StructType([
StructField("EmployeeID", IntegerType(), True),
StructField("Contacts", ArrayType(
StructType([
StructField("ContactID", IntegerType(), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("Phones", ArrayType(StringType()), True),
StructField("Email", StringType(), True)
])
), True)
]),
"LocationDetail.JSON": StructType([
StructField("LocationID", LongType(), False),
StructField("ProviderID", LongType(), True),
StructField("LocationType", StringType(), True),
StructField("Name", StringType(), True),
StructField("UFOC", StringType(), True),
StructField("ExternalEnterpriseLocationID", LongType(), True),
StructField("MainFax", StringType(), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), False),
StructField("MainPhone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True),
StructField("EdiAddresses", ArrayType(StructType([
StructField("AddressType", StringType(), True),
StructField("AddressValue", StringType(), True)
])), False),
StructField("PrimaryContact", StructType([
StructField("Name", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("Email", StringType(), True),
StructField("Phone", StructType([
StructField("Number", StringType(), True),
StructField("Extension", StringType(), True)
]), True)
]), False),
StructField("BusinessDevelopmentManagerName", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), False)
]),
"ProviderDetail.JSON": StructType([
StructField("ProviderID", IntegerType(), False),
StructField("FranchisorID", IntegerType(), True),
StructField("Name", StringType(), True),
StructField("CompanyID", StringType(), True),
StructField("EnterpriseID", StringType(), True),
StructField("Type", StringType(), True),
StructField("Address", StructType([
StructField("AddressLine1", StringType(), True),
StructField("AddressLine2", StringType(), True),
StructField("City", StringType(), True),
StructField("StateProvince", StringType(), True),
StructField("Country", StringType(), True),
StructField("PostalCode", StringType(), True),
StructField("County", StringType(), True)
]), True),
StructField("PrimaryEmail", StringType(), True),
StructField("Website", StringType(), True),
StructField("IsActive", BooleanType(), True)
]),
"ProviderSuspensions.JSON" : StructType([
StructField("ProviderID", IntegerType(), False),
StructField("Suspensions", ArrayType(StructType([
StructField("SuspensionID", IntegerType(), False),
StructField("LocationID", IntegerType(), True),
StructField("StartDate", StringType(), True),
StructField("LiftDate", StringType(), True),
StructField("Description", StringType(), True),
StructField("Status", StringType(), True),
StructField("IsValid", BooleanType(), False),
StructField("Level", StringType(), True),
StructField("Reason", StringType(), True),
StructField("LiftMethod", StringType(), True),
StructField("SuspensionType", StringType(), True),
StructField("Visibility", StringType(), True),
StructField("AffectedDivisions", ArrayType(StructType([
StructField("Name", StringType(), True)
])), True)
])), True)
])
}
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from azure.storage.blob import ContainerClient
import os
# Function to create or get Spark session with optimized configurations
def create_spark_session():
return SparkSession.builder \
.appName("Highly Optimized JSON Pipeline") \
.config("spark.sql.catalogImplementation", "hive") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024) \
.config("spark.sql.files.minPartitionNum", "256") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "4") \
.config("spark.dynamicAllocation.maxExecutors", "16") \
.config("spark.executor.cores", "32") \
.config("spark.executor.memory", "200g") \
.config("spark.driver.memory", "64g") \
.config("spark.driver.cores", "8") \
.config("spark.sql.parquet.compression.codec", "zstd") \
.config("spark.memory.fraction", "0.9") \
.config("spark.memory.storageFraction", "0.3") \
.config("spark.sql.shuffle.partitions", "256") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("fs.azure.account.key.amrdatalake01.blob.core.windows.net", "e3dTBwzl+nu4xTxqsBj+WgWOy5fa2ve7SICaBjVOmrLOw5fYshIZk+FSoz8IoM87GttRSYyL5yJM+AStFYUMGw==") \
.getOrCreate()
# Initialize Spark session globally (only on driver)
spark = create_spark_session()
sc = spark.sparkContext
# Define Storage Account Details
storage_account_name = "secret##"
container_name = "secret##"
# Define base paths using abfss directly
output_base_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/dash.test/output"
def list_json_files():
"""List JSON files from Azure Blob Storage with robust authentication handling."""
try:
container_client = ContainerClient(
account_url=f"https://{storage_account_name}.blob.core.windows.net",
container_name=container_name,
credential="secret##"
)
container_client.get_container_properties()
print(f"Successfully authenticated with container {container_name}")
all_blobs = list(container_client.list_blobs())
print(f"Total blobs found in container: {len(all_blobs)}")
print(f"Sample blob names: {', '.join(blob.name for blob in all_blobs[:5]) if all_blobs else 'None'}")
json_files = [f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{blob.name}" for blob in all_blobs if blob.name.endswith('.JSON')]
print(f"Total JSON Files Found (.JSON): {len(json_files)}")
print(f"Sample JSON File Paths: {json_files[:5] if json_files else 'No JSON files found'}")
return json_files
except Exception as e:
print(f"Error listing files: {e}")
return []
# Helper functions for DataFrame processing
def flatten_structs(df
struct_fields = [field.name for field in df.schema.fields if isinstance(field.dataType, StructType)]
while struct_fields:
for field in struct_fields:
empty_struct = spark.createDataFrame([], df.schema[field].dataType).first().asDict()
empty_struct = {k: None for k in empty_struct}
df = df.withColumn(field, when(col(field).isNotNull(), col(field)).otherwise(struct(**empty_struct)))
subfields = df.schema[field].dataType.fields
for subfield in subfields:
new_col_name = f"{field}_{subfield.name}"
df = df.withColumn(new_col_name, col(f"{field}.{subfield.name}"))
df = df.drop(field)
struct_fields = [field.name for field in df.schema.fields if isinstance(field.dataType, StructType)]
return df
def process_array_columns(df, explode_columns
array_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, ArrayType)]
for col_name in explode_columns:
if col_name in array_columns:
df = df.withColumn(col_name, when(col(col_name).isNull(), array()).otherwise(col(col_name)))
df = df.withColumn(col_name, explode_outer(col(col_name)))
return df
def ensure_all_columns(df, expected_columns
current_columns = df.columns
for col_name, col_type in expected_columns.items():
if col_name not in current_columns:
df = df.withColumn(col_name, lit(None).cast(col_type))
return df
def null_out_problematic_columns(df
problematic_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, (StructType, ArrayType))]
for col_name in problematic_columns:
df = df.withColumn(col_name, lit(None).cast(StringType()))
return df
def infer_and_merge_schema(file_path, predefined_schema=None
try:
inferred_schema = spark.read.json(file_path, multiLine=True).schema
if predefined_schema:
for field in inferred_schema.fields:
if field.name not in [f.name for f in predefined_schema.fields]:
predefined_schema.add(field)
return predefined_schema
return inferred_schema
except Exception as e:
print(f"Error inferring schema for {file_path}: {e}")
return predefined_schema
# Flattening functions for DataFrame processing
def flatten_accounting_detail(df, file_path
df = df.withColumn("JobID", col("JobID").cast(IntegerType()))
df = df.withColumn("Invoice", explode_outer(col("Invoices")))
df = df.withColumn("InvoiceID", col("Invoice.InvoiceID").cast(IntegerType())) \
.withColumn("ExternalID", col("Invoice.ExternalID")) \
.withColumn("InvoiceNumber", col("Invoice.InvoiceNumber")) \
.withColumn("InvoiceDateAdded", col("Invoice.DateAdded").cast(TimestampType())) \
.withColumn("InvoiceDateLastUpdate", col("Invoice.DateLastUpdate").cast(TimestampType())) \
.withColumn("InvoiceMemo", col("Invoice.Memo")) \
.withColumn("InvoiceDateInvoiced", col("Invoice.DateInvoiced").cast(TimestampType())) \
.withColumn("BillTo", col("Invoice.BillTo")) \
.withColumn("InvoiceClass", col("Invoice.InvoiceClass")) \
.withColumn("InvoiceAmount", col("Invoice.Amount").cast(DoubleType())) \
.withColumn("Tax", col("Invoice.Tax").cast(DoubleType())) \
.withColumn("Source", col("Invoice.Source")) \
.withColumn("InvoiceStatus", col("Invoice.Status")) \
.withColumn("InvoiceBalance", col("Invoice.InvoiceBalance").cast(DoubleType())) \
.withColumn("AdditionalInfoAmount", col("Invoice.AdditionalInfoAmount").cast(DoubleType())) \
.withColumn("InvoiceType", col("Invoice.InvoiceType"))
df = df.withColumn("InvoiceLineItem", explode_outer(col("Invoice.InvoiceLineItems")))
df = df.withColumn("InvoiceLineItemID", col("InvoiceLineItem.InvoiceLineItemID").cast(IntegerType())) \
.withColumn("InvoiceLineItemDate", col("InvoiceLineItem.LineDate").cast(TimestampType())) \
.withColumn("Room", col("InvoiceLineItem.Room")) \
.withColumn("CategoryID", col("InvoiceLineItem.CategoryID").cast(StringType())) \
.withColumn("Details", col("InvoiceLineItem.Details")) \
.withColumn("ItemDescription", col("InvoiceLineItem.ItemDescription")) \
.withColumn("UnitQuantity", col("InvoiceLineItem.UnitQuantity").cast(IntegerType())) \
.withColumn("UnitOfMeasure", col("InvoiceLineItem.UnitOfMeasure")) \
.withColumn("Quantity", col("InvoiceLineItem.Quantity").cast(DoubleType())) \
.withColumn("LaborRate", col("InvoiceLineItem.LaborRate").cast(DoubleType())) \
.withColumn("MaterialRate", col("InvoiceLineItem.MaterialRate").cast(DoubleType())) \
.withColumn("EquipmentRate", col("InvoiceLineItem.EquipmentRate").cast(DoubleType())) \
.withColumn("Adjustment", col("InvoiceLineItem.Adjustment").cast(DoubleType())) \
.withColumn("Overhead", col("InvoiceLineItem.Overhead").cast(DoubleType())) \
.withColumn("Profit", col("InvoiceLineItem.Profit").cast(DoubleType())) \
.withColumn("InvoiceLineItemRate", col("InvoiceLineItem.Rate").cast(DoubleType())) \
.withColumn("InvoiceLineItemExtendedAmount", col("InvoiceLineItem.ExtendedAmount").cast(DoubleType())) \
.withColumn("InvoiceLineItemAddedDate", col("InvoiceLineItem.AddedDate").cast(TimestampType())) \
.withColumn("InvoiceLineItemUpdatedDate", col("InvoiceLineItem.UpdatedDate").cast(TimestampType())) \
.withColumn("InvoiceLineItemTimeModified", col("InvoiceLineItem.TimeModified").cast(TimestampType())) \
.withColumn("InvoiceLineItemStatus", col("InvoiceLineItem.Status"))
df = df.withColumn("Payment", explode_outer(col("Payments")))
df = df.withColumn("PaymentID", col("Payment.PaymentID").cast(IntegerType())) \
.withColumn("PaymentInvoiceID", col("Payment.InvoiceID").cast(IntegerType())) \
.withColumn("PaymentDateAdded", col("Payment.DateAdded").cast(TimestampType())) \
.withColumn("PaymentDateLastUpdate", col("Payment.DateLastUpdate").cast(TimestampType())) \
.withColumn("Mode", col("Payment.Mode")) \
.withColumn("DatePaid", col("Payment.DatePaid").cast(TimestampType())) \
.withColumn("ReferenceNumber", col("Payment.ReferenceNumber")) \
.withColumn("PaymentAmount", col("Payment.Amount").cast(DoubleType())) \
.withColumn("PaymentMemo", col("Payment.Memo")) \
.withColumn("PaymentSource", col("Payment.Source")) \
.withColumn("IsPaid", col("Payment.IsPaid").cast(BooleanType())) \
.withColumn("PaymentStatus", col("Payment.Status")) \
.withColumn("PaymentDiscountAmount", col("Payment.DiscountAmount").cast(DoubleType()))
df = df.withColumn("JobCosting", explode_outer(col("JobCostings")))
df = df.withColumn("JobCostingID", col("JobCosting.JobCostingID").cast(IntegerType())) \
.withColumn("JobCostingDate", col("JobCosting.Date").cast(TimestampType())) \
.withColumn("LaborHours", col("JobCosting.LaborHours").cast(DoubleType())) \
.withColumn("JobCostTypeCategory", col("JobCosting.JobCostTypeCategory")) \
.withColumn("JobCostingQuantity", col("JobCosting.Quantity").cast(DoubleType())) \
.withColumn("UOM", col("JobCosting.UOM")) \
.withColumn("JobCostingRate", col("JobCosting.Rate").cast(DoubleType())) \
.withColumn("Extended", col("JobCosting.Extended").cast(DoubleType())) \
.withColumn("Billable", col("JobCosting.Billable").cast(BooleanType())) \
.withColumn("PaymentTo", col("JobCosting.PaymentTo")) \
.withColumn("Description", col("JobCosting.Description")) \
.withColumn("TxnType", col("JobCosting.TxnType")) \
.withColumn("JobCostingStatus", col("JobCosting.Status")) \
.withColumn("TransferFrom", col("JobCosting.TransferFrom")) \
.withColumn("AddedBy", col("JobCosting.AddedBy")) \
.withColumn("PO", col("JobCosting.PO")) \
.withColumn("Item", col("JobCosting.Item")) \
.withColumn("Service", col("JobCosting.Service")) \
.withColumn("Memo", col("JobCosting.Memo")) \
.withColumn("BillTxnId", col("JobCosting.BillTxnId")) \
.withColumn("CalculatedBurden", col("JobCosting.CalculatedBurden").cast(DoubleType()))
df = df.withColumn("LinkedVendorPayment", explode_outer(col("JobCosting.LinkedVendorPayments")))
df = df.withColumn("VendorPaymentID", col("LinkedVendorPayment.VendorPaymentID").cast(LongType())) \
.withColumn("VendorPaymentAmountPaid", col("LinkedVendorPayment.AmountPaid").cast(DoubleType())) \
.withColumn("VendorPaymentDatePaid", col("LinkedVendorPayment.DatePaid")) \
.withColumn("CheckNumber", col("LinkedVendorPayment.CheckNumber"))
df = df.drop("Invoices", "Payments", "JobCostings", "Invoice", "InvoiceLineItem", "Payment", "JobCosting", "LinkedVendorPayment")
return df
def flatten_accounting_summary(df, file_path
if "JobId" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(LongType()))
return df.select(
col("JobId").alias("JobID"),
col("IsForcedSync").alias("IsForcedSync"),
col("MD5HashAsHexString").alias("MD5HashAsHexString"),
col("ActualGrossProfit").alias("ActualGrossProfit"),
col("ActualGrossProfitPercentage").alias("ActualGrossProfitPercentage"),
col("AdjustedInvoiceSubtotal").alias("AdjustedInvoiceSubtotal"),
col("BalanceOwing").alias("BalanceOwing"),
col("ChangeOrderAmount").alias("ChangeOrderAmount"),
col("CollectedSubtotal").alias("CollectedSubtotal"),
col("ConsumablesCost").alias("ConsumablesCost"),
col("EquipmentCost").alias("EquipmentCost"),
col("EstimateGrossProfitAmountAfterWOAdjustment").alias("EstimateGrossProfitAmountAfterWOAdjustment"),
col("EstimateGrossProfitPercentageAfterWOAdjustment").alias("EstimateGrossProfitPercentageAfterWOAdjustment"),
col("EstimatedGrossProfitAmountFromEstimateImport").alias("EstimatedGrossProfitAmountFromEstimateImport"),
col("EstimatedGrossProfitPercentageFromEstimateImport").alias("EstimatedGrossProfitPercentageFromEstimateImport"),
col("EstimateGrossProfit").alias("EstimateGrossProfit"),
col("EstimateUninvoicedAmount").alias("EstimateUninvoicedAmount"),
col("EstimateUnpaid").alias("EstimateUnpaid"),
col("GrossProfitPercentage").alias("GrossProfitPercentage"),
col("InitialEstimate").alias("InitialEstimate"),
col("InvoiceSubtotal").alias("InvoiceSubtotal"),
col("TotalInvoiced").alias("TotalInvoiced"),
col("LaborCost").alias("LaborCost"),
col("MaterialsCost").alias("MaterialsCost"),
col("OriginalEstimate").alias("OriginalEstimate"),
col("OtherCost").alias("OtherCost"),
col("ProfessionalFee").alias("ProfessionalFee"),
col("RecognizedRevenue").alias("RecognizedRevenue"),
col("ReferralFeeCost").alias("ReferralFeeCost"),
col("SubtradeCost").alias("SubtradeCost"),
col("SupplementEstimate").alias("SupplementEstimate"),
col("TotalCollected").alias("TotalCollected"),
col("TotalEstimates").alias("TotalEstimates"),
col("TotalJobCost").alias("TotalJobCost"),
col("TotalWorkOrderBudget").alias("TotalWorkOrderBudget"),
col("WarrantyCost").alias("WarrantyCost"),
col("EstimateDepreciation").alias("EstimateDepreciation")
)
def flatten_internal_participants(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = df.withColumn("Participant", explode_outer(col("Participants")))
return df.select(
col("JobID").alias("JobID"),
col("Participant.ID").cast(LongType()).alias("ParticipantID"),
col("Participant.Type").alias("ParticipantType"),
col("Participant.PersonName.FirstName").alias("FirstName"),
col("Participant.PersonName.LastName").alias("LastName")
)
def flatten_job_dates(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = df.withColumn("Dates", when(col("Dates").isNotNull(), col("Dates")).otherwise(array()))
df = df.withColumn("Date", explode_outer(col("Dates")))
df = df.withColumn("Date_AuditDetails",
when(col("Date.AuditDetails").isNotNull(), col("Date.AuditDetails"))
.otherwise(struct(lit(None).alias("EnteredBy"), lit(None).alias("WhenEntered"))))
df = df.withColumn("Date_AuditDetails_EnteredBy",
when(col("Date_AuditDetails.EnteredBy").isNotNull(), col("Date_AuditDetails.EnteredBy"))
.otherwise(struct(lit(None).alias("FirstName"), lit(None).alias("LastName"))))
return df.select(
col("JobID").alias("JobID"),
col("Date.DateTypeID").cast(IntegerType()).alias("DateTypeID"),
col("Date.DateName").alias("DateName"),
col("Date.Value").cast(TimestampType()).alias("DateValue"),
col("Date_AuditDetails.WhenEntered").cast(TimestampType()).alias("WhenEntered"),
col("Date_AuditDetails_EnteredBy.FirstName").alias("EnteredByFirstName"),
col("Date_AuditDetails_EnteredBy.LastName").alias("EnteredByLastName")
)
def flatten_job_detail(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = flatten_structs(df)
df = process_array_columns(df, [
"Customer_IndividualDetails_ContactsDetails_Email",
"Customer_IndividualDetails_ContactsDetails_Address_AddressLine1",
"Customer_IndividualDetails_ContactsDetails_Address_AddressLine2",
"Customer_IndividualDetails_ContactsDetails_Address_City",
"Customer_IndividualDetails_ContactsDetails_Address_StateProvince",
"Customer_IndividualDetails_ContactsDetails_Address_PostalCode",
"Customer_IndividualDetails_ContactsDetails_Address_Country",
"Customer_IndividualDetails_ContactsDetails_MainPhone_Number",
"Customer_IndividualDetails_ContactsDetails_MainPhone_Extension",
"Customer_IndividualDetails_ContactsDetails_OtherPhones_Category",
"Customer_IndividualDetails_ContactsDetails_OtherPhones_PhoneNumbers_Number",
"Customer_IndividualDetails_ContactsDetails_OtherPhones_PhoneNumbers_Extension",
"Customer_CompanyDetails_ContactsDetails_Email",
"Customer_CompanyDetails_ContactsDetails_Address_AddressLine1",
"Customer_CompanyDetails_ContactsDetails_Address_AddressLine2",
"Customer_CompanyDetails_ContactsDetails_Address_City",
"Customer_CompanyDetails_ContactsDetails_Address_StateProvince",
"Customer_CompanyDetails_ContactsDetails_Address_PostalCode",
"Customer_CompanyDetails_ContactsDetails_Address_Country",
"Customer_CompanyDetails_ContactsDetails_MainPhone_Number",
"Customer_CompanyDetails_ContactsDetails_MainPhone_Extension",
"Customer_CompanyDetails_ContactsDetails_OtherPhones_Category",
"Customer_CompanyDetails_ContactsDetails_OtherPhones_PhoneNumbers_Number",
"Customer_CompanyDetails_ContactsDetails_OtherPhones_PhoneNumbers_Extension",
"JobInfo_RoomsAffected",
"AdminInfo_Services",
"JobReferralDetails_SourceCompany",
"JobReferralDetails_SourceIndividual",
"JobReferralDetails_SourceEmployee",
"JobReferralDetails_SourceMarketingCampaign",
])
df = ensure_all_columns(df, df.schema.fields)
df = null_out_problematic_columns(df)
return df
def flatten_job_external_participants(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = process_array_columns(df, [
"CompanyRelations",
"CompanyRelations.CompanyDetails.ContactsDetails",
"IndividualRelations",
"IndividualRelations.IndividualDetails.ContactsDetails"
])
df = df.withColumn("CompanyRelations", explode_outer(col("CompanyRelations")))
df = df.withColumn("IndividualRelations", explode_outer(col("IndividualRelations")))
df = df.withColumn("CompanyContacts", explode_outer(col("CompanyRelations.CompanyDetails.ContactsDetails")))
df = df.withColumn("IndividualContacts", explode_outer(col("IndividualRelations.IndividualDetails.ContactsDetails")))
df = df.select(
col("JobID").alias("JobID"),
col("BillToType").alias("BillToType"),
col("CompanyRelations.RelationshipType").alias("CompanyRelationshipType"),
col("CompanyRelations.RelationshipTypeName").alias("CompanyRelationshipTypeName"),
col("CompanyRelations.CompanyDetails.CompanyID").alias("CompanyID"),
col("CompanyRelations.CompanyDetails.CompanyName").alias("CompanyName"),
col("CompanyRelations.CompanyDetails.MarketingRank").alias("CompanyMarketingRank"),
col("CompanyContacts.Email").alias("CompanyContactEmail"),
col("CompanyContacts.Website").alias("CompanyContactWebsite"),
col("CompanyContacts.Address.AddressLine1").alias("CompanyAddressLine1"),
col("CompanyContacts.Address.AddressLine2").alias("CompanyAddressLine2"),
col("CompanyContacts.Address.City").alias("CompanyAddressCity"),
col("CompanyContacts.Address.StateProvince").alias("CompanyAddressStateProvince"),
col("CompanyContacts.Address.Country").alias("CompanyAddressCountry"),
col("CompanyContacts.Address.PostalCode").alias("CompanyAddressPostalCode"),
col("CompanyContacts.Address.County").alias("CompanyAddressCounty"),
col("CompanyContacts.MainPhone.Number").alias("CompanyMainPhoneNumber"),
col("CompanyContacts.MainPhone.Extension").alias("CompanyMainPhoneExtension"),
col("IndividualRelations.RelationshipType").alias("IndividualRelationshipType"),
col("IndividualRelations.RelationshipTypeName").alias("IndividualRelationshipTypeName"),
col("IndividualRelations.IndividualDetails.IndividualID").alias("IndividualID"),
col("IndividualRelations.IndividualDetails.Title").alias("IndividualTitle"),
col("IndividualRelations.IndividualDetails.CompanyName").alias("IndividualCompanyName"),
col("IndividualRelations.IndividualDetails.PreferredCommunicationMethod").alias("IndividualPreferredCommunicationMethod"),
col("IndividualRelations.IndividualDetails.MarketingRank").alias("IndividualMarketingRank"),
col("IndividualContacts.Email").alias("IndividualContactEmail"),
col("IndividualContacts.Address.AddressLine1").alias("IndividualAddressLine1"),
col("IndividualContacts.Address.AddressLine2").alias("IndividualAddressLine2"),
col("IndividualContacts.Address.City").alias("IndividualAddressCity"),
col("IndividualContacts.Address.StateProvince").alias("IndividualAddressStateProvince"),
col("IndividualContacts.Address.Country").alias("IndividualAddressCountry"),
col("IndividualContacts.Address.PostalCode").alias("IndividualAddressPostalCode"),
col("IndividualContacts.Address.County").alias("IndividualAddressCounty"),
col("IndividualContacts.MainPhone.Number").alias("IndividualMainPhoneNumber"),
col("IndividualContacts.MainPhone.Extension").alias("IndividualMainPhoneExtension"),
col("IndividualRelations.IndividualDetails.PersonName.FirstName").alias("IndividualFirstName"),
col("IndividualRelations.IndividualDetails.PersonName.LastName").alias("IndividualLastName")
)
df = ensure_all_columns(df, {
"JobID": IntegerType(),
"BillToType": IntegerType(),
"CompanyRelationshipType": IntegerType(),
"CompanyRelationshipTypeName": StringType(),
"CompanyID": IntegerType(),
"CompanyName": StringType(),
"CompanyMarketingRank": StringType(),
"CompanyContactEmail": StringType(),
"CompanyContactWebsite": StringType(),
"CompanyAddressLine1": StringType(),
"CompanyAddressLine2": StringType(),
"CompanyAddressCity": StringType(),
"CompanyAddressStateProvince": StringType(),
"CompanyAddressCountry": StringType(),
"CompanyAddressPostalCode": StringType(),
"CompanyAddressCounty": StringType(),
"CompanyMainPhoneNumber": StringType(),
"CompanyMainPhoneExtension": StringType(),
"IndividualRelationshipType": IntegerType(),
"IndividualRelationshipTypeName": StringType(),
"IndividualID": IntegerType(),
"IndividualTitle": StringType(),
"IndividualCompanyName": StringType(),
"IndividualPreferredCommunicationMethod": StringType(),
"IndividualMarketingRank": StringType(),
"IndividualContactEmail": StringType(),
"IndividualAddressLine1": StringType(),
"IndividualAddressLine2": StringType(),
"IndividualAddressCity": StringType(),
"IndividualAddressStateProvince": StringType(),
"IndividualAddressCountry": StringType(),
"IndividualAddressPostalCode": StringType(),
"IndividualAddressCounty": StringType(),
"IndividualMainPhoneNumber": StringType(),
"IndividualMainPhoneExtension": StringType(),
"IndividualFirstName": StringType(),
"IndividualLastName": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_job_tags(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = process_array_columns(df, ["JobTags"])
df = df.withColumn("JobTag", explode_outer(col("JobTags")))
df = df.withColumn("JobTags_TagName", col("JobTag.TagName").cast(StringType())).drop("JobTag")
df = ensure_all_columns(df, {"JobID": IntegerType(), "JobTags_TagName": StringType()})
df = null_out_problematic_columns(df)
return df
def flatten_survey_summary(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = process_array_columns(df, ["SurveyDetails"])
df = df.withColumn("SurveyDetail", explode_outer(col("SurveyDetails")))
df = df.select(
col("JobID").alias("JobID"),
col("MostRecentSurveyDate").alias("MostRecentSurveyDate"),
col("SurveyDetail.SurveyScore").alias("SurveyScore"),
col("SurveyDetail.SurveyStatus").alias("SurveyStatus"),
col("SurveyDetail.IsIncludedForSurvey").alias("IsIncludedForSurvey"),
col("SurveyDetail.IsSendTriggerCompleted").alias("IsSendTriggerCompleted"),
col("SurveyDetail.SurveyScoreValue").alias("SurveyScoreValue"),
col("SurveyDetail.SurveyScoreLabel").alias("SurveyScoreLabel"),
col("SurveyDetail.NoSurveyReason").alias("NoSurveyReason")
)
df = df.withColumn("SurveyStatus",
when(col("SurveyStatus") == 0, "Yes")
.when(col("SurveyStatus") == 1, "No")
.when(col("SurveyStatus") == 2, "Sent")
.when(col("SurveyStatus") == 3, "Received")
.otherwise(lit(None)))
return df
def flatten_linked_assignments(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(LongType()))
df = df.withColumn("Assignment", explode_outer(col("LinkedExternalAssignments")))
return df.select(
col("JobID").alias("JobID"),
col("Assignment.ID").cast(IntegerType()).alias("AssignmentID"),
col("Assignment.ExternalSystemType").cast(IntegerType()).alias("ExternalSystemType"),
col("Assignment.ExternalID").alias("ExternalID")
)
def flatten_notes_summary(df, file_path
if "JobID" not in df.columns:
df = df.withColumn("JobID", lit(file_path.split("/")[-2]).cast(LongType()))
return df.select(
col("JobID").alias("JobID"),
col("LastNoteAdded").alias("LastNoteAdded"),
col("LastNoteAddedTime").alias("LastNoteAddedTime")
)
def flatten_company_contacts(df, file_path
if "CompanyID" not in df.columns:
df = df.withColumn("CompanyID", lit(file_path.split("/")[-2]).cast(LongType()))
df = df.withColumn("Contacts", when(col("Contacts").isNull(), array()).otherwise(col("Contacts")))
df = df.withColumn("Contact", explode_outer(col("Contacts")))
df = df.withColumn("ContactID", col("Contact.ContactID").cast(LongType())) \
.withColumn("CorrespondenceEmail", col("Contact.CorrespondenceEmail").cast(StringType())) \
.withColumn("InquiryEmail", col("Contact.InquiryEmail").cast(StringType())) \
.withColumn("Website", col("Contact.Website").cast(StringType())) \
.withColumn("MainPhoneNumber", col("Contact.MainPhone.Number").cast(StringType())) \
.withColumn("MainPhoneExtension", col("Contact.MainPhone.Extension").cast(StringType())) \
.withColumn("AddressLine1", col("Contact.Address.AddressLine1").cast(StringType())) \
.withColumn("AddressLine2", col("Contact.Address.AddressLine2").cast(StringType())) \
.withColumn("City", col("Contact.Address.City").cast(StringType())) \
.withColumn("StateProvince", col("Contact.Address.StateProvince").cast(StringType())) \
.withColumn("Country", col("Contact.Address.Country").cast(StringType())) \
.withColumn("PostalCode", col("Contact.Address.PostalCode").cast(StringType())) \
.withColumn("County", col("Contact.Address.County").cast(StringType())) \
.withColumn("BillingAddressLine1", col("Contact.BillingAddress.AddressLine1").cast(StringType())) \
.withColumn("BillingAddressLine2", col("Contact.BillingAddress.AddressLine2").cast(StringType())) \
.withColumn("BillingCity", col("Contact.BillingAddress.City").cast(StringType())) \
.withColumn("BillingStateProvince", col("Contact.BillingAddress.StateProvince").cast(StringType())) \
.withColumn("BillingCountry", col("Contact.BillingAddress.Country").cast(StringType())) \
.withColumn("BillingPostalCode", col("Contact.BillingAddress.PostalCode").cast(StringType())) \
.withColumn("BillingCounty", col("Contact.BillingAddress.County").cast(StringType())) \
.withColumn("MailingAddressLine1", col("Contact.MailingAddress.AddressLine1").cast(StringType())) \
.withColumn("MailingAddressLine2", col("Contact.MailingAddress.AddressLine2").cast(StringType())) \
.withColumn("MailingCity", col("Contact.MailingAddress.City").cast(StringType())) \
.withColumn("MailingStateProvince", col("Contact.MailingAddress.StateProvince").cast(StringType())) \
.withColumn("MailingCountry", col("Contact.MailingAddress.Country").cast(StringType())) \
.withColumn("MailingPostalCode", col("Contact.MailingAddress.PostalCode").cast(StringType())) \
.withColumn("MailingCounty", col("Contact.MailingAddress.County").cast(StringType())) \
.drop("Contacts", "Contact")
df = ensure_all_columns(df, {
"CompanyID": LongType(),
"ContactID": LongType(),
"CorrespondenceEmail": StringType(),
"InquiryEmail": StringType(),
"Website": StringType(),
"MainPhoneNumber": StringType(),
"MainPhoneExtension": StringType(),
"AddressLine1": StringType(),
"AddressLine2": StringType(),
"City": StringType(),
"StateProvince": StringType(),
"Country": StringType(),
"PostalCode": StringType(),
"County": StringType(),
"BillingAddressLine1": StringType(),
"BillingAddressLine2": StringType(),
"BillingCity": StringType(),
"BillingStateProvince": StringType(),
"BillingCountry": StringType(),
"BillingPostalCode": StringType(),
"BillingCounty": StringType(),
"MailingAddressLine1": StringType(),
"MailingAddressLine2": StringType(),
"MailingCity": StringType(),
"MailingStateProvince": StringType(),
"MailingCountry": StringType(),
"MailingPostalCode": StringType(),
"MailingCounty": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_company_detail(df, file_path
if "CompanyID" not in df.columns:
df = df.withColumn("CompanyID", lit(file_path.split("/")[-2]).cast(LongType()))
df = df.withColumn("IsActive", col("IsActive").cast(BooleanType()))
if isinstance([f.dataType for f in df.schema.fields if f.name == "CompanyInfo"][0], StringType):
df = df.withColumn("CompanyInfo", from_json(col("CompanyInfo"), StructType([
StructField("Categories", ArrayType(StringType()), True),
StructField("FranchiseeID", LongType(), True),
StructField("GroupAndRoutes", StringType(), True),
StructField("MarketingCampaigns", ArrayType(StringType()), True),
StructField("Name", StringType(), True),
StructField("ParentCompanyID", LongType(), True),
StructField("Rank", StringType(), True),
StructField("ReferralType", StringType(), True),
StructField("ReferredBy", StringType(), True),
StructField("ResponsibleRep", StructType([
StructField("FirstName", StringType(), True),
StructField("LastName", StringType(), True)
]), True),
StructField("SageAccountNumber", StringType(), True),
StructField("SalesStage", StringType(), True),
StructField("SalesStatus", StringType(), True),
StructField("Type", StringType(), True)
])))
df = df.withColumn("CompanyInfo_ResponsibleRep_FirstName", col("CompanyInfo.ResponsibleRep.FirstName").cast(StringType())) \
.withColumn("CompanyInfo_ResponsibleRep_LastName", col("CompanyInfo.ResponsibleRep.LastName").cast(StringType())) \
.drop("CompanyInfo.ResponsibleRep")
df = df.withColumn("CompanyInfo_Categories", concat_ws(", ", col("CompanyInfo.Categories")).cast(StringType())) \
.withColumn("CompanyInfo_MarketingCampaigns", concat_ws(", ", col("CompanyInfo.MarketingCampaigns")).cast(StringType())) \
.drop("CompanyInfo.Categories", "CompanyInfo.MarketingCampaigns")
df = df.select(
col("CompanyID").alias("CompanyID"),
col("IsActive").alias("IsActive"),
col("CompanyInfo_FranchiseeID").alias("CompanyInfo_FranchiseeID"),
col("CompanyInfo_GroupAndRoutes").alias("CompanyInfo_GroupAndRoutes"),
col("CompanyInfo_Name").alias("CompanyInfo_Name"),
col("CompanyInfo_ParentCompanyID").alias("CompanyInfo_ParentCompanyID"),
col("CompanyInfo_Rank").alias("CompanyInfo_Rank"),
col("CompanyInfo_ReferralType").alias("CompanyInfo_ReferralType"),
col("CompanyInfo_ReferredBy").alias("CompanyInfo_ReferredBy"),
col("CompanyInfo_ResponsibleRep_FirstName").alias("CompanyInfo_ResponsibleRep_FirstName"),
col("CompanyInfo_ResponsibleRep_LastName").alias("CompanyInfo_ResponsibleRep_LastName"),
col("CompanyInfo_SageAccountNumber").alias("CompanyInfo_SageAccountNumber"),
col("CompanyInfo_SalesStage").alias("CompanyInfo_SalesStage"),
col("CompanyInfo_SalesStatus").alias("CompanyInfo_SalesStatus"),
col("CompanyInfo_Type").alias("CompanyInfo_Type"),
col("CompanyInfo_Categories").alias("CompanyInfo_Categories"),
col("CompanyInfo_MarketingCampaigns").alias("CompanyInfo_MarketingCampaigns")
).drop("CompanyInfo")
df = ensure_all_columns(df, {
"CompanyID": LongType(),
"IsActive": BooleanType(),
"CompanyInfo_FranchiseeID": LongType(),
"CompanyInfo_GroupAndRoutes": StringType(),
"CompanyInfo_Name": StringType(),
"CompanyInfo_ParentCompanyID": LongType(),
"CompanyInfo_Rank": StringType(),
"CompanyInfo_ReferralType": StringType(),
"CompanyInfo_ReferredBy": StringType(),
"CompanyInfo_ResponsibleRep_FirstName": StringType(),
"CompanyInfo_ResponsibleRep_LastName": StringType(),
"CompanyInfo_SageAccountNumber": StringType(),
"CompanyInfo_SalesStage": StringType(),
"CompanyInfo_SalesStatus": StringType(),
"CompanyInfo_Type": StringType(),
"CompanyInfo_Categories": StringType(),
"CompanyInfo_MarketingCampaigns": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_individual_contacts(df, file_path
if "IndividualID" not in df.columns:
df = df.withColumn("IndividualID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = df.withColumn("Contacts", when(col("Contacts").isNull(), array()).otherwise(col("Contacts")))
df = df.withColumn("Contact", explode_outer(col("Contacts")))
df = df.withColumn("ContactID", col("Contact.ContactID").cast(IntegerType())) \
.withColumn("Email", col("Contact.Email").cast(StringType())) \
.withColumn("SecondaryEmail", col("Contact.SecondaryEmail").cast(StringType())) \
.withColumn("MainPhoneNumber", col("Contact.MainPhone.Number").cast(StringType())) \
.withColumn("MainPhoneExtension", col("Contact.MainPhone.Extension").cast(StringType())) \
.withColumn("AddressLine1", col("Contact.Address.AddressLine1").cast(StringType())) \
.withColumn("AddressLine2", col("Contact.Address.AddressLine2").cast(StringType())) \
.withColumn("City", col("Contact.Address.City").cast(StringType())) \
.withColumn("StateProvince", col("Contact.Address.StateProvince").cast(StringType())) \
.withColumn("Country", col("Contact.Address.Country").cast(StringType())) \
.withColumn("PostalCode", col("Contact.Address.PostalCode").cast(StringType())) \
.withColumn("County", col("Contact.Address.County").cast(StringType())) \
.drop("Contacts", "Contact", "OtherPhones", "OtherPhone", "PhoneNumbers", "PhoneNumber")
df = ensure_all_columns(df, {
"IndividualID": IntegerType(),
"ContactID": IntegerType(),
"Email": StringType(),
"SecondaryEmail": StringType(),
"MainPhoneNumber": StringType(),
"MainPhoneExtension": StringType(),
"AddressLine1": StringType(),
"AddressLine2": StringType(),
"City": StringType(),
"StateProvince": StringType(),
"Country": StringType(),
"PostalCode": StringType(),
"County": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_individual_detail(df, file_path
if "IndividualID" not in df.columns:
df = df.withColumn("IndividualID", lit(file_path.split("/")[-2]).cast(LongType()))
df = df.withColumn("IsActive", col("IsActive").cast(BooleanType()))
if "IndividualDetailInfo.Assistant" in df.columns:
df = df.withColumn("AssistantFirstName", col("IndividualDetailInfo.Assistant.FirstName").cast(StringType())) \
.withColumn("AssistantLastName", col("IndividualDetailInfo.Assistant.LastName").cast(StringType())) \
.drop("IndividualDetailInfo.Assistant")
else:
df = df.withColumn("AssistantFirstName", lit(None).cast(StringType())) \
.withColumn("AssistantLastName", lit(None).cast(StringType()))
if "IndividualDetailInfo.ResponsibleRep" in df.columns:
df = df.withColumn("ResponsibleRepFirstName", col("IndividualDetailInfo.ResponsibleRep.FirstName").cast(StringType())) \
.withColumn("ResponsibleRepLastName", col("IndividualDetailInfo.ResponsibleRep.LastName").cast(StringType())) \
.drop("IndividualDetailInfo.ResponsibleRep")
else:
df = df.withColumn("ResponsibleRepFirstName", lit(None).cast(StringType())) \
.withColumn("ResponsibleRepLastName", lit(None).cast(StringType()))
df = df.withColumn("Categories", concat_ws(", ", col("IndividualDetailInfo.Categories")).cast(StringType())) \
.withColumn("MarketingCampaigns", concat_ws(", ", col("IndividualDetailInfo.MarketingCampaigns")).cast(StringType())) \
.drop("IndividualDetailInfo.Categories", "IndividualDetailInfo.MarketingCampaigns")
df = df.select(
col("IndividualID").alias("IndividualID"),
col("IsActive").alias("IsActive"),
col("IndividualDetailInfo.Anniversary").alias("Anniversary"),
col("AssistantFirstName").alias("AssistantFirstName"),
col("AssistantLastName").alias("AssistantLastName"),
col("Categories").alias("Categories"),
col("IndividualDetailInfo.CompanyID").cast(LongType()).alias("CompanyID"),
col("IndividualDetailInfo.ContactType").alias("ContactType"),
col("IndividualDetailInfo.DateOfBirth").alias("DateOfBirth"),
col("IndividualDetailInfo.FranchiseeId").cast(LongType()).alias("FranchiseeId"),
col("IndividualDetailInfo.GroupAndRoutes").alias("GroupAndRoutes"),
col("IndividualDetailInfo.JobTitle").alias("JobTitle"),
col("MarketingCampaigns").alias("MarketingCampaigns"),
col("IndividualDetailInfo.Name.FirstName").alias("FirstName"),
col("IndividualDetailInfo.Name.LastName").alias("LastName"),
col("IndividualDetailInfo.ParentCompany").alias("ParentCompany"),
col("IndividualDetailInfo.PotentialReferralRevenue").alias("PotentialReferralRevenue"),
col("IndividualDetailInfo.PotentialReferralVolume").alias("PotentialReferralVolume"),
col("IndividualDetailInfo.Rank").alias("Rank"),
col("IndividualDetailInfo.ReferralType").alias("ReferralType"),
col("IndividualDetailInfo.ReferredBy").alias("ReferredBy"),
col("ResponsibleRepFirstName").alias("ResponsibleRepFirstName"),
col("ResponsibleRepLastName").alias("ResponsibleRepLastName"),
col("IndividualDetailInfo.SageAccountNumber").alias("SageAccountNumber"),
col("IndividualDetailInfo.SalesStage").alias("SalesStage"),
col("IndividualDetailInfo.SalesStatus").alias("SalesStatus"),
col("IndividualDetailInfo.Title").alias("Title")
)
df = ensure_all_columns(df, {
"IndividualID": LongType(),
"IsActive": BooleanType(),
"Anniversary": StringType(),
"AssistantFirstName": StringType(),
"AssistantLastName": StringType(),
"Categories": StringType(),
"CompanyID": LongType(),
"ContactType": StringType(),
"DateOfBirth": StringType(),
"FranchiseeId": LongType(),
"GroupAndRoutes": StringType(),
"JobTitle": StringType(),
"MarketingCampaigns": StringType(),
"FirstName": StringType(),
"LastName": StringType(),
"ParentCompany": StringType(),
"PotentialReferralRevenue": StringType(),
"PotentialReferralVolume": StringType(),
"Rank": StringType(),
"ReferralType": StringType(),
"ReferredBy": StringType(),
"ResponsibleRepFirstName": StringType(),
"ResponsibleRepLastName": StringType(),
"SageAccountNumber": StringType(),
"SalesStage": StringType(),
"SalesStatus": StringType(),
"Title": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_provider_detail(df, file_path
if "ProviderID" not in df.columns:
df = df.withColumn("ProviderID", lit(file_path.split("/")[-2]).cast(IntegerType()))
return df.select(
col("ProviderID").alias("ProviderID"),
col("FranchisorID").alias("FranchisorID"),
col("Name").alias("ProviderName"),
col("CompanyID").alias("CompanyID"),
col("EnterpriseID").alias("EnterpriseID"),
col("Type").alias("ProviderType"),
col("PrimaryEmail").alias("PrimaryEmail"),
col("Website").alias("Website"),
col("IsActive").alias("IsActive"),
col("Address.AddressLine1").alias("AddressLine1"),
col("Address.AddressLine2").alias("AddressLine2"),
col("Address.City").alias("City"),
col("Address.StateProvince").alias("StateProvince"),
col("Address.Country").alias("Country"),
col("Address.PostalCode").alias("PostalCode"),
col("Address.County").alias("County")
)
def flatten_provider_scores(df, file_path
if "ProviderID" not in df.columns:
df = df.withColumn("ProviderID", lit(file_path.split("/")[-2]).cast(IntegerType()))
if "Vpasses" in df.columns:
df = df.withColumn("Vpass", explode_outer(col("Vpasses")))
return df.select(
col("ProviderID").alias("ProviderID"),
col("Vpass.Score").alias("VpassScore"),
col("Vpass.IsAdminScore").alias("IsAdminScore"),
col("NpsScore").alias("NpsScore")
)
def flatten_provider_suspensions(df, file_path
if "ProviderID" not in df.columns:
df = df.withColumn("ProviderID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = df.withColumn("Suspensions", when(col("Suspensions").isNull() | (size(col("Suspensions")) == 0), lit(None)).otherwise(col("Suspensions"))).drop("Suspensions")
df = ensure_all_columns(df, {
"ProviderID": IntegerType(),
"SuspensionID": IntegerType(),
"LocationID": IntegerType(),
"StartDate": StringType(),
"LiftDate": StringType(),
"Description": StringType(),
"Status": StringType(),
"IsValid": BooleanType(),
"Level": StringType(),
"Reason": StringType(),
"LiftMethod": StringType(),
"SuspensionType": StringType(),
"Visibility": StringType(),
"AffectedDivisionName": StringType()
})
df = null_out_problematic_columns(df)
return df
def flatten_location_detail(df, file_path
if "LocationID" not in df.columns:
df = df.withColumn("LocationID", lit(file_path.split("/")[-2]).cast(IntegerType()))
df = df.withColumn("EdiAddressLine1", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.AddressLine1")).otherwise(lit(None))) \
.withColumn("EdiAddressLine2", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.AddressLine2")).otherwise(lit(None))) \
.withColumn("EdiCity", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.City")).otherwise(lit(None))) \
.withColumn("EdiStateProvince", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.StateProvince")).otherwise(lit(None))) \
.withColumn("EdiPostalCode", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.PostalCode")).otherwise(lit(None))) \
.withColumn("EdiCountry", when(expr("typeof(EdiAddress) = 'struct'"), col("EdiAddress.Country")).otherwise(lit(None))) \
.drop("EdiAddress")
df = df.withColumn("AddressLine1", when(expr("typeof(Address) = 'struct'"), col("Address.AddressLine1")).otherwise(lit(None))) \
.withColumn("AddressLine2", when(expr("typeof(Address) = 'struct'"), col("Address.AddressLine2")).otherwise(lit(None))) \
.withColumn("City", when(expr("typeof(Address) = 'struct'"), col("Address.City")).otherwise(lit(None))) \
.withColumn("StateProvince", when(expr("typeof(Address) = 'struct'"), col("Address.StateProvince")).otherwise(lit(None))) \
.withColumn("PostalCode", when(expr("typeof(Address) = 'struct'"), col("Address.PostalCode")).otherwise(lit(None))) \
.withColumn("Country", when(expr("typeof(Address) = 'struct'"), col("Address.Country")).otherwise(lit(None))) \
.drop("Address")
return df.select(
col("LocationID").alias("LocationID"),
col("EdiAddressLine1").alias("EdiAddressLine1"),
col("EdiAddressLine2").alias("EdiAddressLine2"),
col("EdiCity").alias("EdiCity"),
col("EdiStateProvince").alias("EdiStateProvince"),
col("EdiPostalCode").alias("EdiPostalCode"),
col("EdiCountry").alias("EdiCountry"),
col("AddressLine1").alias("AddressLine1"),
col("AddressLine2").alias("AddressLine2"),
col("City").alias("City"),
col("StateProvince").alias("StateProvince"),
col("PostalCode").alias("PostalCode"),
col("Country").alias("Country")
)
def flatten_location_scores(df, file_path
if "LocationID" not in df.columns:
df = df.withColumn("LocationID", lit(file_path.split("/")[-2]).cast(IntegerType()))
if "Vpasses" in df.columns:
df = df.withColumn("Vpass", explode_outer(col("Vpasses")))
return df.select(
col("LocationID").alias("LocationID"),
col("CloseRatio").alias("CloseRatio"),
col("NpsScore").alias("NpsScore"),
col("Vpass.Score").alias("VpassScore"),
col("Vpass.IsAdminScore").alias("IsAdminScore")
)
# Define transformation mapping
transformation_mapping = {
"AccountingDetail.JSON": flatten_accounting_detail,
"AccountingSummary.JSON": flatten_accounting_summary,
"InternalParticipants.JSON": flatten_internal_participants,
"JobDates.JSON": flatten_job_dates,
"JobDetail.JSON": flatten_job_detail,
"JobExternalParticipants.JSON": flatten_job_external_participants,
"JobTags.JSON": flatten_job_tags,
"SurveySummary.JSON": flatten_survey_summary,
"LinkedAssignmentDetails.JSON": flatten_linked_assignments,
"NotesSummary.JSON": flatten_notes_summary,
"CompanyContacts.JSON": flatten_company_contacts,
"CompanyDetail.JSON": flatten_company_detail,
"IndividualContacts.JSON": flatten_individual_contacts,
"IndividualDetail.JSON": flatten_individual_detail,
"ProviderDetail.JSON": flatten_provider_detail,
"ProviderScores.JSON": flatten_provider_scores,
"ProviderSuspensions.JSON": flatten_provider_suspensions,
"LocationDetail.JSON": flatten_location_detail,
"LocationScores.JSON": flatten_location_scores
}
transformation_mapping_bc = sc.broadcast(transformation_mapping)
# Broadcast schemas (assuming schemas dict is defined elsewhere)
schemas_json = {k: v.json() for k, v in schemas.items()} if 'schemas' in globals() else {}
schemas_bc = sc.broadcast(schemas_json)
def process_partition(partition
"""Process a partition of JSON files inside worker nodes using the existing Spark session."""
# Use the global spark session (implicitly available via SparkContext on workers)
for row in partition:
path, file_name = row["path"], row["file_name"]
folder_name = file_name.replace(".JSON", "").replace(".json", "")
output_path = f"{output_base_path}/{folder_name}"
# Retrieve schema JSON from broadcast and convert to StructType
schema_json = schemas_bc.value.get(file_name)
schema = StructType.fromJson(schema_json) if schema_json else None
try:
# Infer schema only if no predefined schema exists
if schema is None:
inferred_df = spark.read.option("inferSchema", "true").json(path, multiLine=True)
schema = inferred_df.schema
raw_df = spark.read.schema(schema).json(path, multiLine=True)
# Apply the appropriate transformation function
transform_func = transformation_mapping_bc.value.get(file_name)
if transform_func:
raw_df = transform_func(raw_df, path)
# Write to Parquet with optimization
raw_df.coalesce(32).write.mode("append").partitionBy("JobID").parquet(output_path)
print(f"
Processed {file_name}") except Exception as e:
print(f"
Error processing {file_name}: {e}") def process_json_files_parallel():
"""Parallel processing of JSON files."""
json_files = list_json_files()
if not json_files:
print("No files to process.")
return
df_files = spark.createDataFrame([(path, os.path.basename(path)) for path in json_files], ["path", "file_name"])
# Repartition to match cluster cores (128) for maximum parallelism
df_files.repartition(128).rdd.foreachPartition(process_partition)
if __name__ == "__main__":
process_json_files_parallel()
spark.stop()