09-06-2015 01:07 PM
Good afternoon,
Attempting to run this statement:
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS dev_user_login (
event_name STRING,
datetime TIMESTAMP,
ip_address STRING,
acting_user_id STRING
)
PARTITIONED BY
(date DATE)
STORED AS
PARQUET
LOCATION
"/mnt/bi-dev-data/warehouse/users.loggedIn"
I get the following error message:
Error in SQL statement: QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.UnsupportedOperationException: Parquet does not support timestamp. See HIVE-6384
However, when I check HIVE-6384 (Implement all datatypes in Parquet) I see it was resolved some time ago.
Is Databricks still on a version of Hive that has yet to support Timestamps in parquet? Any help would be appreciated. I tried this in both 1.4 and 1.5 experimental.
Many thanks.
09-11-2015 09:45 AM
Can you try this - which will use the Dataframes implementation of parquet rather than the Hive version:
CREATE TEMPORARY TABLE
dev_user_login (event_name STRING, datetime TIMESTAMP, ip_address STRING, acting_user_id STRING)USING org.apache.spark.sql.parquetOPTIONS ( path "examples/src/main/resources/people.parquet")
09-11-2015 09:45 AM
Can you try this - which will use the Dataframes implementation of parquet rather than the Hive version:
CREATE TEMPORARY TABLE
dev_user_login (event_name STRING, datetime TIMESTAMP, ip_address STRING, acting_user_id STRING)USING org.apache.spark.sql.parquetOPTIONS ( path "examples/src/main/resources/people.parquet")
09-11-2015 06:48 PM
Many thanks! The above worked and I was able to create the table with the timestamp data type. Appreciate the automatic partition discovery also! Ill focus on using the Dataframes vs Hive implementation going forward.
01-10-2017 09:02 PM
didn't work for me, can you paste the entire script here?
06-28-2017 06:59 AM
Is there a way to specify the timezone as well. After following the approach mentioned above I was able to store date information like "2016-07-23" as 2016-07-23T00:00:00.000+0000. But now I need to specify the UTC+05:30 timezone. Let me know if this is possible.
09-11-2017 09:39 AM
How can apply the solution above, in spark script:
package com.neoris.spark
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
//import org.apache.spark.sql.hive.thriftserver._
import org.apache.spark.sql.hive.HiveContext
object LogAnalyzerStreaming {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
"Uso: LogAnalyzerStreaming <host> <port> <batchInterval>")
System.exit(1)
}
val Array(in_host, in_port, in_batchInterval) = args
val host = in_host.trim
val port = in_port.toInt
val batchInterval = Seconds(in_batchInterval.toInt)
val sparkConf = new SparkConf()
.setAppName("LogAnalyzerStreaming")
.setMaster("local[*]")
.set("spark.executor.memory", "2g")
.set("spark.sql.hive.thriftServer.singleSession", "true")
.set("spark.driver.allowMultipleContexts", "true")
val sparkStreamingContext = new StreamingContext(sparkConf, batchInterval)
val stream = FlumeUtils.createStream(sparkStreamingContext, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val eventBody = stream.map(e => new String(e.event.getBody.array))
val eventBodySchema =
StructType(
Array(
StructField("Fecha",StringType,true),
StructField("Hora",StringType,true),
StructField("filler_queries",StringType,true),
StructField("filler_info",StringType,true),
StructField("filler_client",StringType,true),
StructField("ip_port",StringType,true),
StructField("url01",StringType,true),
StructField("filler_view",StringType,true),
StructField("filler_default",StringType,true),
StructField("filler_query",StringType,true),
StructField("url02",StringType,true),
StructField("filler_in",StringType,true),
StructField("s_country",StringType,true),
StructField("s_edc",StringType,true),
StructField("url",StringType,true)
)
)
eventBody.foreachRDD { rdd =>
val sqlContext = new HiveContext(rdd.sparkContext)
val streamRDD = rdd.map(x => x.split(" ")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14)))
val streamSchemaRDD = sqlContext.applySchema(streamRDD,eventBodySchema)
streamSchemaRDD.registerTempTable("log")
val queryLog = sqlContext.sql("SELECT TO_DATE(CAST(UNIX_TIMESTAMP(Fecha, 'dd-MMM-yyyy') AS TIMESTAMP)) as FECHA, TO_DATE(CAST(UNIX_TIMESTAMP(Fecha, 'hh:mm:ss.SSS') AS TIMESTAMP)) as HORA FROM log")
queryLog.show()
queryLog.write
.format("parquet")
.mode("append")
.saveAsTable("logs")
}
stream.count().map(cnt => cnt + " eventos flume recibidos." ).print()
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()
}
}
09-21-2017 11:20 AM
At the end, I've changed the format type from parquet to orc and it works fine for me.
format("orc")
10-23-2024 04:40 AM
1. change to spark native catalog approach (not hive metadata store) works. Syntax is essentially:
CREATE TABLE IF NOT EXISTS dbName.tableName (columns names and types
)
USING parquet
PARTITIONED BY (
runAt STRING
)
LOCATION 'abfss://path/to/parquet/folder';
2. I found I still have to use MSCK repair table 'the-table-name' to ensure the query shows the data.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group