09-06-2015 01:07 PM
Good afternoon,
Attempting to run this statement:
%sql 
CREATE EXTERNAL TABLE IF NOT EXISTS dev_user_login (
  event_name STRING,
  datetime TIMESTAMP,
  ip_address STRING,
  acting_user_id STRING
)
PARTITIONED BY
  (date DATE)
STORED AS 
  PARQUET
LOCATION
  "/mnt/bi-dev-data/warehouse/users.loggedIn"
I get the following error message:
Error in SQL statement: QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.UnsupportedOperationException: Parquet does not support timestamp. See HIVE-6384
However, when I check HIVE-6384 (Implement all datatypes in Parquet) I see it was resolved some time ago.
Is Databricks still on a version of Hive that does not yet support timestamps in Parquet? Any help would be appreciated. I tried this on both 1.4 and 1.5 experimental.
Many thanks.
09-11-2015 09:45 AM
Can you try this? It will use the DataFrames implementation of Parquet rather than the Hive version:
CREATE TEMPORARY TABLE dev_user_login (
  event_name STRING,
  datetime TIMESTAMP,
  ip_address STRING,
  acting_user_id STRING
)
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)
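Roughly the same thing can also be done through the DataFrame reader API directly. This is only a sketch, assuming a SQLContext named sqlContext and the mount path from the original question; the loginDF name is just illustrative:
// Read the Parquet files directly with the DataFrame reader (bypasses the Hive DDL path)
val loginDF = sqlContext.read.parquet("/mnt/bi-dev-data/warehouse/users.loggedIn")

// Partition directories under the path are discovered automatically
loginDF.printSchema()

// Expose the data to SQL under the same name as the temporary table above
loginDF.registerTempTable("dev_user_login")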
09-11-2015 06:48 PM
Many thanks! The above worked and I was able to create the table with the timestamp data type. I appreciate the automatic partition discovery also! I'll focus on using the DataFrames implementation rather than the Hive one going forward.
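(For context, the automatic partition discovery comes from the key=value directory layout: a path along the lines of /mnt/bi-dev-data/warehouse/users.loggedIn/date=2015-09-06/part-00000.parquet - a purely hypothetical illustration using the path and partition column from the question - is read back with date as a partition column.)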
01-10-2017 09:02 PM
This didn't work for me - can you paste the entire script here?
06-28-2017 06:59 AM
Is there a way to specify the timezone as well? After following the approach mentioned above, I was able to store date information like "2016-07-23" as 2016-07-23T00:00:00.000+0000. But now I need to specify the UTC+05:30 timezone. Let me know if this is possible.
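One direction I could try (just a sketch, not confirmed in this thread, assuming Spark's built-in from_utc_timestamp function and reusing the loginDF sketch above):
import org.apache.spark.sql.functions.{col, from_utc_timestamp}

// Re-render the stored UTC instants as Asia/Kolkata (UTC+05:30) wall-clock times;
// whether this matches the intended semantics depends on how the data was written.
val withLocalTime = loginDF.withColumn("datetime_ist", from_utc_timestamp(col("datetime"), "Asia/Kolkata"))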
09-11-2017 09:39 AM
How can I apply the solution above in this Spark script:
package com.neoris.spark
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
//import org.apache.spark.sql.hive.thriftserver._
import org.apache.spark.sql.hive.HiveContext
object LogAnalyzerStreaming {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        "Usage: LogAnalyzerStreaming <host> <port> <batchInterval>")
      System.exit(1)
    }
    val Array(in_host, in_port, in_batchInterval) = args
    val host = in_host.trim
    val port = in_port.toInt
    val batchInterval = Seconds(in_batchInterval.toInt)
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzerStreaming")
      .setMaster("local[*]")
      .set("spark.executor.memory", "2g")
      .set("spark.sql.hive.thriftServer.singleSession", "true")
      .set("spark.driver.allowMultipleContexts", "true")
    val sparkStreamingContext = new StreamingContext(sparkConf, batchInterval)
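    // Receive Flume events on <host>:<port> and decode each event body as a string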
    val stream = FlumeUtils.createStream(sparkStreamingContext, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    val eventBody = stream.map(e => new String(e.event.getBody.array))
    val eventBodySchema =
      StructType(
        Array(
          StructField("Fecha",StringType,true),
          StructField("Hora",StringType,true),
          StructField("filler_queries",StringType,true),
          StructField("filler_info",StringType,true),
          StructField("filler_client",StringType,true),
          StructField("ip_port",StringType,true),
          StructField("url01",StringType,true),
          StructField("filler_view",StringType,true),
          StructField("filler_default",StringType,true),
          StructField("filler_query",StringType,true),
          StructField("url02",StringType,true),
          StructField("filler_in",StringType,true),
          StructField("s_country",StringType,true),
          StructField("s_edc",StringType,true),
          StructField("url",StringType,true)
        )
      )
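    // For each micro-batch: apply the schema, register a temporary table, and write out the parsed rows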
    eventBody.foreachRDD { rdd =>
      val sqlContext = new HiveContext(rdd.sparkContext)
      val streamRDD = rdd.map(x => x.split(" ")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14)))
      val streamSchemaRDD = sqlContext.applySchema(streamRDD,eventBodySchema)
      streamSchemaRDD.registerTempTable("log")
      val queryLog = sqlContext.sql(
        "SELECT TO_DATE(CAST(UNIX_TIMESTAMP(Fecha, 'dd-MMM-yyyy') AS TIMESTAMP)) AS FECHA, " +
        "TO_DATE(CAST(UNIX_TIMESTAMP(Hora, 'hh:mm:ss.SSS') AS TIMESTAMP)) AS HORA FROM log")
      queryLog.show()
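      // Append this batch's results to a Hive-managed table stored as Parquet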
      queryLog.write
        .format("parquet")
        .mode("append")
        .saveAsTable("logs")
    }
    stream.count().map(cnt => cnt + " Flume events received.").print()
    sparkStreamingContext.start() 
    sparkStreamingContext.awaitTermination() 
  }
}
09-21-2017 11:20 AM
In the end, I changed the format type from parquet to orc and it works fine for me:
format("orc")
10-23-2024 04:40 AM
1. Changing to the Spark native catalog approach (not the Hive metastore) works. The syntax is essentially:
    CREATE TABLE IF NOT EXISTS dbName.tableName (
      column names and types
    )
    USING parquet
    PARTITIONED BY (
      runAt STRING
    )
    LOCATION 'abfss://path/to/parquet/folder';
2. I found I still have to run MSCK REPAIR TABLE 'the-table-name' to ensure the query shows the data.
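For point 2, the repair statement is just the following (using the same placeholder names as the CREATE TABLE above):
MSCK REPAIR TABLE dbName.tableName;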