- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-28-2017 08:54 PM
I see that https://github.com/apache/spark/pull/18581 will enable defining custom Line Separators for many sources, including CSV. Apart from waiting on this PR to make it into the main Databricks runtime, is there any other alternative to support different line separator? Such as directly setting HadoopFileLinesReader configuration?
Accepted Solutions
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 09:40 AM
Brian's answer above is probably the simplest. Here are some other options in case line.seperator doesn't do the trick:
Option: Custom CSV Reader
Modify the CSV reader and upload it as a library. Here are the steps:
- Fork the current CSV reader from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/d...
- Merge the pull request into your fork
- Change the package name
- Load it as a JAR library
- Access your custom reader using spark.read.format("com.example.spark.datasources.csv.CSVFileFormat").load(filename)
Option: Fix the original file...
Create a program that reads the file in as a byte stream, repairs the bytes, and writes out a new, repaired file. It's easiest if you do it without parallelizing, but you could parallelize if you wanted by breaking the file into 1 gb chunks and seeking ahead to the relevant section to process those in parallel.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 07:46 AM
Currently, the only known option is to fix the line separator before beginning your standard processing.
In that vein, one option I can think of is to use SparkContext.wholeTextFiles(..) to read in an RDD, split the data by the customs line separator and then from there are a couple of additional choices:
- Write the file back out with the new line separators.
- Convert the RDD to a DataFrame with a call like rdd.toDF() and resume processing.
The major disadvantage to this is the size of each file. The call to wholeTextFiles(..) will load each file as a single partition which means we could very easily consume all the available RAM.
The second option I can think of would be to perform the same operation above, but from the driver with standard file IO (if possible). The Scala/Python libraries for file manipulation are pretty straight forward allowing you to read one chunk of data, clean it up and write it back out to a new file. Again this is a very ugly solution and is not as practical if you are working from most data stores like S3, HDFS, etc.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 07:47 AM
I'll try to run this question by other instructors and/or engineers to see if there is some unknown/undocumented solution.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 08:38 AM
From the referenced PR, I assume that we’re talking about processing files that use something other than \n to delimit lines—e.g., \r, or \r\n. Since CSV files are assumed to be text files, and since Java uses a platform-specific notion of a line separator (System.getProperty("line.separator")), it might be possible to change that system property. However, I’m not 100% positive that’ll work, without either (a) digging into the source for the CSV reader, or (b) experimenting with changing that property (which needs to be propagated to the Executors).
I'd definitely stay away from wholeTextFiles(), for the reasons Jacob Parr (@SireInsectus) mentions.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 10:46 AM
So I chased down all the calls in the latest version of the source code (with @Brian Clapper's help) and it boils down to how org.apache.hadoop.util.LineReader is implemented - in short, it's hardcoded to use CR & LF if a specific record delimiter is not specified.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 03:03 PM
QQ: how do I call System.getProperty from a Python notebook in Databricks? Can't seem to figure out the correct import 😞
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 03:05 PM
Yeah, you can't really do that easily. That's a JVM thing. The actual code that reads the CSV runs on the JVM (and is written in Scala).
If you're working in Python, your best bet really is to write your own adapter (or wait until the PR shows up in Spark). And you'll have to write that adapter in Scala or Java.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 03:52 PM
spark._jvm.java.lang.System.getProperty("line.separator")
spark._jvm.java.lang.System.setProperty("line.separator", "\n")
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 04:00 PM
That's cool. I didn't know we could do that from Python.... However, remember that org.apache.hadoop.util.LineReader is hardcoded to use CR & LFs - I just verified that this morning. It does not check any system properties like all "good" libraries would.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 05:31 PM
You can do that, but you're stepping inside "implementation detail" land. This is definitely doable, but you're using a non-public API, so caveat programmer.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 09:40 AM
Brian's answer above is probably the simplest. Here are some other options in case line.seperator doesn't do the trick:
Option: Custom CSV Reader
Modify the CSV reader and upload it as a library. Here are the steps:
- Fork the current CSV reader from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/d...
- Merge the pull request into your fork
- Change the package name
- Load it as a JAR library
- Access your custom reader using spark.read.format("com.example.spark.datasources.csv.CSVFileFormat").load(filename)
Option: Fix the original file...
Create a program that reads the file in as a byte stream, repairs the bytes, and writes out a new, repaired file. It's easiest if you do it without parallelizing, but you could parallelize if you wanted by breaking the file into 1 gb chunks and seeking ahead to the relevant section to process those in parallel.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-29-2017 11:12 AM
Thank you Doug, and everyone else who took the time to reply. I really appreciate this help!
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
03-07-2018 02:46 PM
You can use newAPIHadoopFile
SCALA
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "#")
val log_df = sc.newAPIHadoopFile("path/to/file", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString).toDF()
PYTHON
log_rdd = sc.newAPIHadoopFile("/path/to/file", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf).map(lambda x: [x[1]])