Data Engineering

Hive Table Creation - Parquet does not support Timestamp Datatype?

RobertWalsh
New Contributor II

Good afternoon,

Attempting to run this statement:

%sql 
CREATE EXTERNAL TABLE IF NOT EXISTS dev_user_login (
  event_name STRING,
  datetime TIMESTAMP,
  ip_address STRING,
  acting_user_id STRING
)
PARTITIONED BY
  (date DATE)
STORED AS 
  PARQUET
LOCATION
  "/mnt/bi-dev-data/warehouse/users.loggedIn"

I get the following error message:

Error in SQL statement: QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.UnsupportedOperationException: Parquet does not support timestamp. See HIVE-6384

However, when I check HIVE-6384 (Implement all datatypes in Parquet), I see it was resolved some time ago.

Is Databricks still on a version of Hive that does not yet support timestamps in Parquet? I tried this on both 1.4 and 1.5 (experimental). Any help would be appreciated.

Many thanks.

1 ACCEPTED SOLUTION


vida
Databricks Employee

Can you try this? It uses the DataFrames implementation of Parquet rather than the Hive one:

CREATE TEMPORARY TABLE dev_user_login (
  event_name STRING,
  datetime TIMESTAMP,
  ip_address STRING,
  acting_user_id STRING
)
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)
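
For reference, a rough DataFrames-API equivalent of the same statement (a sketch, assuming the notebook's existing sqlContext and the same example path):

// Read the Parquet file through the DataFrames code path, bypassing the Hive SerDe
val df = sqlContext.read.parquet("examples/src/main/resources/people.parquet")
// Expose it to SQL under the same name used above
df.registerTempTable("dev_user_login")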


7 REPLIES

RobertWalsh
New Contributor II

Many thanks! The above worked and I was able to create the table with the timestamp data type. I appreciate the automatic partition discovery as well! I'll focus on using the DataFrames implementation rather than the Hive one going forward.
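
To illustrate the partition-discovery piece (a sketch, assuming the files land under the LOCATION from the original statement in date=... subdirectories):

// Reading the root path lets Spark infer the `date` partition column
// from subdirectories named like .../date=2015-10-01/
val events = sqlContext.read.parquet("/mnt/bi-dev-data/warehouse/users.loggedIn")
events.printSchema()  // includes the discovered `date` column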

@omoshiroi This didn't work for me; can you paste the entire script here?

Is there a way to specify the timezone as well? After following the approach above, I was able to store date information like "2016-07-23" as 2016-07-23T00:00:00.000+0000, but now I need to specify the UTC+05:30 timezone. Let me know if this is possible.
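
One way to express that offset at query time (a sketch, not from the thread: from_utc_timestamp shifts a stored UTC value into a named zone, "Asia/Kolkata" corresponds to UTC+05:30, and the Parquet timestamp itself is stored without a zone):

// Shift the stored UTC timestamps into IST for display or reporting
val ist = sqlContext.sql(
  "SELECT from_utc_timestamp(datetime, 'Asia/Kolkata') AS datetime_ist FROM dev_user_login")
ist.show()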

SirChokolate
New Contributor II

How can I apply the solution above in this Spark script:

package com.neoris.spark
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
//import org.apache.spark.sql.hive.thriftserver._
import org.apache.spark.sql.hive.HiveContext
object LogAnalyzerStreaming {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        "Usage: LogAnalyzerStreaming <host> <port> <batchInterval>")
      System.exit(1)
    }
    val Array(in_host, in_port, in_batchInterval) = args
    val host = in_host.trim
    val port = in_port.toInt
    val batchInterval = Seconds(in_batchInterval.toInt)
    val sparkConf = new SparkConf()
      .setAppName("LogAnalyzerStreaming")
      .setMaster("local[*]")
      .set("spark.executor.memory", "2g")
      .set("spark.sql.hive.thriftServer.singleSession", "true")
      .set("spark.driver.allowMultipleContexts", "true")
    val sparkStreamingContext = new StreamingContext(sparkConf, batchInterval)
    // Receive Flume events and decode each event body to a plain string
    val stream = FlumeUtils.createStream(sparkStreamingContext, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    val eventBody = stream.map(e => new String(e.event.getBody.array))
    val eventBodySchema =
      StructType(
        Array(
          StructField("Fecha",StringType,true),
          StructField("Hora",StringType,true),
          StructField("filler_queries",StringType,true),
          StructField("filler_info",StringType,true),
          StructField("filler_client",StringType,true),
          StructField("ip_port",StringType,true),
          StructField("url01",StringType,true),
          StructField("filler_view",StringType,true),
          StructField("filler_default",StringType,true),
          StructField("filler_query",StringType,true),
          StructField("url02",StringType,true),
          StructField("filler_in",StringType,true),
          StructField("s_country",StringType,true),
          StructField("s_edc",StringType,true),
          StructField("url",StringType,true)
        )
      )
    eventBody.foreachRDD { rdd =>
      // Apply the schema to this micro-batch and register it as the temp table "log"
      val sqlContext = new HiveContext(rdd.sparkContext)
      val streamRDD = rdd.map(x => x.split(" ")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14)))
      val streamSchemaRDD = sqlContext.applySchema(streamRDD,eventBodySchema)
      streamSchemaRDD.registerTempTable("log")
      // Parse the Fecha (date) and Hora (time-of-day) strings into SQL date/timestamp values
      val queryLog = sqlContext.sql(
        "SELECT TO_DATE(CAST(UNIX_TIMESTAMP(Fecha, 'dd-MMM-yyyy') AS TIMESTAMP)) AS FECHA, " +
        "CAST(UNIX_TIMESTAMP(Hora, 'HH:mm:ss.SSS') AS TIMESTAMP) AS HORA FROM log")
      queryLog.show()
      queryLog.write
        .format("parquet")
        .mode("append")
        .saveAsTable("logs")
    }
    stream.count().map(cnt => cnt + " Flume events received.").print()
    sparkStreamingContext.start() 
    sparkStreamingContext.awaitTermination() 
  }
}

In the end, I changed the write format from parquet to orc, and it works fine for me:

format("orc")

source2sea
Contributor

1. Changing to the Spark-native catalog approach (not the Hive metastore) works. The syntax is essentially:

    CREATE TABLE IF NOT EXISTS dbName.tableName (
      <column names and types>
    )
    USING parquet 
    PARTITIONED BY (
      runAt STRING
    )
    LOCATION 'abfss://path/to/parquet/folder';

2. I found I still have to run MSCK REPAIR TABLE <the-table-name> to ensure queries show the data (see the sketch below).
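
A concrete sketch of the same pattern as a notebook cell (the table name and path are the placeholders from above, the columns are borrowed from the original question for illustration, and a SparkSession named spark is assumed):

// Spark-native (non-Hive-SerDe) Parquet table; the partition column is declared in the schema
spark.sql("""
  CREATE TABLE IF NOT EXISTS dbName.tableName (
    event_name STRING,
    datetime TIMESTAMP,
    runAt STRING
  )
  USING parquet
  PARTITIONED BY (runAt)
  LOCATION 'abfss://path/to/parquet/folder'
""")

// Register any partition directories already sitting under the location
spark.sql("MSCK REPAIR TABLE dbName.tableName")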
