Here is the code (which I forgot to include earlier):
# Incremental ingest with Databricks Auto Loader ("cloudFiles"):
# read CSV files from `source_files` and write them to `output_path` as Parquet.
# NOTE(review): `spark`, `schema`, `checkpoint_dir`, `source_files`, and
# `output_path` are defined elsewhere — not visible in this snippet.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "false") # header=false ⇒ every row is treated as data (no header row).
                           # NOTE(review): the original comment claimed the files HAVE
                           # headers — if so, this should be "true"; confirm with the data.
.schema(schema) # Explicit schema: Auto Loader will not infer column types.
.option("cloudFiles.schemaLocation", checkpoint_dir) # Where Auto Loader persists schema
                                                     # info; here it shares the stream's
                                                     # checkpoint directory.
.load(source_files)
.writeStream
.format("parquet")
.option("checkpointLocation", checkpoint_dir) # Stream progress/offsets for exactly-once restart.
.trigger(availableNow=True) # Batch-style run: process all currently available files, then stop.
.start(output_path)
)