Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks Autoloader streamReader does not include the partition column as part of the output.

tech2cloud
New Contributor II

I have a folder structure at the source such as:

/transaction/date_=2023-01-20/hr_=02/tras01.csv

/transaction/date_=2023-01-20/hr_=03/tras02.csv

Here 'date_' and 'hr_' are my partition columns, and they are present in the dataset as well. But the stream reader does not include these columns in the output, whereas if I read the same files with the batch CSV reader, the columns do show up. Am I missing something here?

Please help!
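For reference, a minimal sketch of one way this situation can arise with Auto Loader: an explicit schema that omits the partition columns leaves date_ and hr_ out of the streaming output. The data column names and the path below are assumptions, not taken from the original post:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Schema listing only the data columns; date_ and hr_ are not in it,
# so they do not appear in the streaming output.
schema = StructType([
    StructField("txn_id", StringType()),   # assumed column
    StructField("amount", DoubleType()),   # assumed column
])

df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema(schema)
      .load("/transaction/"))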

2 REPLIES

Anonymous
Not applicable

When you read files with Spark's stream reader and supply an explicit schema, the partition columns (in your case, date_ and hr_) are not automatically added to the output; they only appear if they are part of the schema or are inferred from the directory structure.

One way to get them back is to read from the root of the partitioned directory tree and let Spark's partition discovery derive the columns from the Hive-style folder names. Here's an example of how to read your data so the partition columns are included in the output:

from pyspark.sql.functions import input_file_name

# Read from the root of the partitioned tree; partition discovery picks up
# date_ and hr_ from the directory names (date_=.../hr_=...).
df = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("dateFormat", "yyyy-MM-dd") \
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
  .option("basePath", "/transaction/") \
  .option("pathGlobFilter", "*.csv") \
  .load("/transaction/")

df = df.withColumn("date_", df["date_"].cast("date")) \
  .withColumn("hr_", df["hr_"].cast("integer")) \
  .withColumn("filename", input_file_name())

In this example, we set basePath to /transaction/, the root of the partitioned directory tree, so Spark's partition discovery adds date_ and hr_ from the Hive-style directory names. The pathGlobFilter option restricts the read to CSV files. Note that recursiveFileLookup is deliberately not used here, because it disables partition inference. The withColumn() calls then cast date_ to a date type and hr_ to an integer type, and add a filename column via input_file_name(), which returns the path of the file each row comes from. With these options, the resulting df includes date_ and hr_ alongside the other columns in the CSV files.
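Since the original question is about a streaming read, here is a hedged sketch of reading the same layout with Auto Loader so that the partition columns come through. The schema location path is an assumption, and cloudFiles.partitionColumns names the Hive-style directory columns to parse:

# Auto Loader stream; partition columns are parsed from the Hive-style
# directory names and included in the output.
stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", "/transaction/_schema")  # assumed path
    .option("cloudFiles.partitionColumns", "date_,hr_")
    .load("/transaction/")
)

# stream_df now contains date_ and hr_ alongside the columns read from the CSVs.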

Anonymous
Not applicable

Hi @Ravi Vishwakarma

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
