Hello,
I am working on a Spark job that reads several tables from PostgreSQL into DataFrames as follows:
df = (spark.read
    .format("postgresql")
    .option("query", query)
    .option("host", database_host)
    .option("port", database_port)
    .option("database", database_name)
    .option("user", user)
    .option("password", password)
    .option("fetchsize", 500000)
    .load()
)
df = df.cache()
Unfortunately, I cannot partition the tables during the read, because the columns I would like to partition by are VARCHARs, and Spark's JDBC partitionColumn only accepts numeric, date, or timestamp columns. When I add .option("numPartitions", partitions) to the code above, the execution plan still shows a single partition:
Scan JDBCRelation((SELECT party_name, party_id, company_id, deleted_at, updated_at FROM public.table) SPARK_GEN_SUBQ_3808) [numPartitions=1] (1)
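For completeness, here is the variant I tried; it is identical to the read above except for the extra numPartitions option. The commented-out lines show the remaining partitioned-read options from the standard JDBC source (assuming the postgresql format accepts them), which I can't use because partitionColumn must be a numeric, date, or timestamp column:

df = (spark.read
    .format("postgresql")
    .option("query", query)
    .option("host", database_host)
    .option("port", database_port)
    .option("database", database_name)
    .option("user", user)
    .option("password", password)
    .option("fetchsize", 500000)
    .option("numPartitions", partitions)  # the only change vs. the read above
    # A fully partitioned read would also need these, but my candidate
    # columns are VARCHAR, so I cannot supply a valid partitionColumn:
    # .option("partitionColumn", "<numeric/date/timestamp column>")
    # .option("lowerBound", "<min value>")
    # .option("upperBound", "<max value>")
    .load()
)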
I'm selecting between 1 and 10 columns from each of the 6 tables. The largest table contains approximately 200,000,000 rows. The job involves several complex operations, including dozens of joins and multiple aggregations, and it runs slowly, particularly during the reading phase.
In the execution plan, I noticed that each table appears in multiple Scan JDBCRelation operations. For example, one of the PostgreSQL read queries, which has no WHERE clause, results in 28 Scan JDBCRelation operations.
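For context, the downstream part of the job is shaped roughly like this (heavily simplified; df_companies and df_parties are placeholder names for two of the other cached reads, and the output path is illustrative):

from pyspark.sql import functions as F

# Each cached source DataFrame is reused across many joins and
# aggregations; the real job has dozens of joins, this is just the shape.
joined = (df
    .join(df_companies, "company_id")
    .join(df_parties, "party_id"))

report = (joined
    .groupBy("company_id")
    .agg(F.countDistinct("party_id").alias("distinct_parties"),
         F.max("updated_at").alias("last_updated_at")))

report.write.mode("overwrite").parquet("/tmp/report")  # illustrative sink

Since each source is cached right after the read, I expected to see a single Scan JDBCRelation per table once the cache is materialized.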
Could anyone suggest potential optimizations for reading from PostgreSQL? Additionally, could you explain why there are multiple Scan JDBCRelation nodes for a single source table?
Thank you for your assistance!
Please see the execution plan in the PDF attached to this post, or view it on Pastebin (paste password: eZtui0teAL): https://pastebin.com/bviEiX73