Use Case:
Converting unstructured data from a PDF into a structured format before sending it to Salesforce
Ask:
Best practices for structuring my table before sending it to a system like Salesforce
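To make the ask concrete: what I ultimately need is something like the mapping below, where the Salesforce object and field names are hypothetical placeholders, not my real org's fields.

# Hypothetical mapping from normalized PDF columns to Salesforce API field names
# ("Plan_Type__c" stands in for a custom field)
FIELD_MAP = {
    "contact_name": "Name",
    "phone_number": "Phone",
    "plan_type": "Plan_Type__c",
}

def to_salesforce_record(row_dict):
    # Keep only mapped columns and rename them to the Salesforce field names
    return {sf_field: row_dict.get(col) for col, sf_field in FIELD_MAP.items()}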
The output in structured format looks like:

My code:
- Extract Tables from PDF
- Unify Schema and Save Delta Table
- Auto Verification
- Access via Delta Sharing
import json
import re

import pdfplumber
import delta_sharing
from pyspark.sql import Row, SparkSession
# --- Configuration ---
pdf_path = "/Volumes/workspace/default/pdf/ntt docomo japan.pdf"
target_table = "workspace.default.structured_pdf_table"
# --- Delta Sharing token from Databricks secret ---
token = dbutils.secrets.get(scope="delta_sharing_scope", key="delta_token")
print("Delta Sharing token retrieved successfully.")
# --- Helper: Normalize headers ---
def normalize_header(header, idx):
    if not header or not isinstance(header, str) or header.strip() == "":
        return f"col{idx+1}"
    clean = re.sub(r'[^0-9a-zA-Z_]', '', header.strip().lower().replace(" ", "_"))
    return clean if clean else f"col{idx+1}"
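# Quick sanity check of the helper on hypothetical header values
# (safe to delete; it just documents the expected behaviour):
assert normalize_header("Contact Name", 0) == "contact_name"
assert normalize_header(None, 2) == "col3"
assert normalize_header("Amount ($)", 1) == "amount_"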
# --- Step 1: Extract PDF tables + fallback text ---
structured_data = []
try:
    with pdfplumber.open(pdf_path) as pdf:
        print(f"Opened PDF '{pdf_path}' with {len(pdf.pages)} page(s).")
        for page_num, page in enumerate(pdf.pages, start=1):
            tables = page.extract_tables()
            if tables:
                print(f"Table(s) found on page {page_num}")
                for table in tables:
                    headers = table[0]
                    rows = table[1:]
                    clean_headers = [normalize_header(h, i) for i, h in enumerate(headers)]
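                    # Note: two different headers can normalize to the same name
                    # (e.g. "Amount ($)" and "Amount (%)" both become "amount_"),
                    # and the row dict below would then silently keep only the
                    # last value. A possible guard, if that matters for this PDF:
                    # seen = set()
                    # deduped = []
                    # for i, h in enumerate(clean_headers):
                    #     deduped.append(h if h not in seen else f"{h}_{i+1}")
                    #     seen.add(h)
                    # clean_headers = deduped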
                    for row in rows:
                        row_dict = {clean_headers[i]: row[i] if i < len(row) else None
                                    for i in range(len(clean_headers))}
                        structured_data.append(Row(**row_dict))
            else:
                print(f"No tables found on page {page_num}, using fallback text parsing...")
                page_text = page.extract_text()
                if page_text:
                    lines = page_text.split("\n")
                    for line in lines:
                        cols = re.split(r"\s{2,}", line.strip())
                        row_dict = {f"col{i+1}": cols[i] if i < len(cols) else None
                                    for i in range(6)}
                        row_dict["raw_line"] = line
                        structured_data.append(Row(**row_dict))
except Exception as e:
    print(f"Error during PDF parsing: {e}")
# --- Step 2: Unify schema + create Delta table ---
if structured_data:
    try:
        # Determine all possible columns (sorted for a deterministic order)
        all_cols = set()
        for r in structured_data:
            all_cols.update(r.asDict().keys())
        all_cols = sorted(all_cols)
        # Ensure all rows have all columns, in the same field order,
        # so createDataFrame does not misalign values between rows
        uniform_rows = []
        for r in structured_data:
            d = r.asDict()
            uniform_rows.append(Row(**{c: d.get(c) for c in all_cols}))
        # Create DataFrame
        spark = SparkSession.builder.getOrCreate()
        df = spark.createDataFrame(uniform_rows)
        # Force all columns to string to avoid type inference issues
        # (backticks protect normalized names that start with a digit)
        df = df.selectExpr(*[f"CAST(`{c}` AS STRING) AS `{c}`" for c in all_cols])
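        # Alternative to the CAST above (an untested sketch): declare an
        # explicit all-string schema up front instead of relying on inference,
        # which is also the usual pattern when the target system expects a
        # fixed set of typed columns:
        # from pyspark.sql.types import StructType, StructField, StringType
        # explicit_schema = StructType([StructField(c, StringType(), True) for c in all_cols])
        # df = spark.createDataFrame([[r[c] for c in all_cols] for r in uniform_rows], explicit_schema)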
        # Preview before saving
        print("\n🔎 Preview of structured data before saving:")
        display(df.limit(20))
        # Save Delta table
        df.write.mode("overwrite").saveAsTable(target_table)
        print(f"\nStructured data saved to Delta table: {target_table}")
    except Exception as e:
        print(f"Error while saving Delta table: {e}")
else:
    print("No structured data extracted from the PDF.")
# --- Step 3: Auto Verification ---
try:
    print("\n🔎 Step 3a: Check if table exists in workspace.default")
    tables = spark.catalog.listTables("workspace.default")
    table_names = [t.name for t in tables]
    if "structured_pdf_table" in table_names:
        print("Table exists in workspace.default")
    else:
        print("Table NOT found in workspace.default")
    df = spark.table(target_table)
    print("\nStep 3b: Preview first 20 rows of the Delta table")
    display(df.limit(20))
    print("\nStep 3c: Delta table schema")
    df.printSchema()
    print("\nStep 3d: Total number of rows in the table")
    print(df.count())
except Exception as e:
    print(f"Error during table verification: {e}")
# --- Step 4: Access via Delta Sharing ---
try:
    # delta-sharing expects a profile file ({endpoint, bearerToken}), not a
    # URL with the token in the query string. The endpoint below is a
    # placeholder; the real value comes from the share's activation page.
    profile = {
        "shareCredentialsVersion": 1,
        "endpoint": "https://dbc-5ec65784-4b66.cloud.databricks.com/api/2.0/delta-sharing/metastores/<metastore-id>",
        "bearerToken": token,
    }
    profile_path = "/tmp/pdf_share_profile.json"
    with open(profile_path, "w") as f:
        json.dump(profile, f)
    # Shared tables are addressed as <share>.<schema>.<table>, so the share
    # name (pdf_share) replaces the catalog name used above
    table_url = f"{profile_path}#pdf_share.default.structured_pdf_table"
    shared_df = delta_sharing.load_as_pandas(table_url)
    print("\nStep 4: Preview Delta Shared table (first 20 rows)")
    display(shared_df.head(20))
except Exception as e:
    print(f"Could not access table via Delta Sharing: {e}")