When Spark reads partitioned files with the DataFrameReader (spark.read), the partition columns (in your case, date_ and hr_) are not stored inside the files themselves; they are encoded in the directory names (the usual Hive-style date_=.../hr_=... layout) and recovered through partition discovery.
To have them appear as columns in the output, point the reader at the partition root with the basePath option and load that same root directory. Here's an example of how to read your data so the partition columns are included in the output:
from pyspark.sql.functions import input_file_name
# basePath marks the partition root, so date_ and hr_ are picked up
# from the directory names and added as columns
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("basePath", "/transaction/") \
    .option("pathGlobFilter", "*.csv") \
    .load("/transaction/")

# make the partition column types explicit and record each row's source file
df = df.withColumn("date_", df["date_"].cast("date")) \
    .withColumn("hr_", df["hr_"].cast("integer")) \
    .withColumn("filename", input_file_name())
In this example, basePath is set to /transaction/, the root of the partitioned layout. Because the load path is that same root, Spark's partition discovery reads the date_ and hr_ values out of the directory names (assuming the date_=.../hr_=... folder structure) and adds them as columns alongside the columns found in the CSV files themselves. The pathGlobFilter option restricts the read to CSV files. Finally, withColumn() casts date_ to a date and hr_ to an integer, so the types are explicit even if the partition values come through as strings, and a filename column is added with input_file_name(), which returns the path of the file each row was read from. With these options set, the resulting df DataFrame includes the date_ and hr_ columns in the output, along with any other columns in the CSV files.
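To confirm the partition columns actually made it into the DataFrame, a quick check (a sketch, using the same df as above) is to print the schema and look at a few rows:
# date_ and hr_ should show up in the schema next to the file columns
df.printSchema()

# peek at the partition columns and the source file for a few rows
df.select("date_", "hr_", "filename").show(5, truncate=False)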