ameerafi
Databricks Employee

@DBonomo @sahil_s_jain You can define a separate `getSchema` method in `BulkCopyUtils.scala` and call it instead of Spark's `JdbcUtils.getSchema`. Add the object below to `BulkCopyUtils.scala` (make sure `java.sql.{ResultSet, ResultSetMetaData, SQLException}` and the `org.apache.spark.sql.types` classes it uses are in scope) and build the connector locally. Then call it like this: `val tableCols = BulkCopyJdbcUtils.getSchema(rs, JdbcDialects.get(url))`

import org.apache.spark.sql.jdbc.JdbcDialect


/**
 * Self-contained copy of Spark 3.5's
 * `org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.getSchema`.
 *
 * Calling Spark's version directly runs into a method signature conflict on DBR 15.4.
 * Keeping a private implementation here avoids depending on that runtime-provided signature.
 */
object BulkCopyJdbcUtils {

  // Explicit imports keep this object compilable regardless of what the host file imports.
  import java.sql.{ResultSet, ResultSetMetaData, SQLException}
  import org.apache.spark.sql.types.{
    BinaryType, BooleanType, DataType, DateType, DecimalType, DoubleType, FloatType,
    IntegerType, LongType, MetadataBuilder, NullType, StringType, StructField, StructType,
    TimestampType
  }

  /**
   * Takes a [[ResultSet]] and returns its Catalyst schema.
   *
   * For every column the dialect is asked for a type mapping first, as Spark's
   * `JdbcUtils.getSchema` does. The generic JDBC-to-Catalyst mapping is used only when
   * the dialect returns `None` (or when `dialect` is `null`).
   *
   * @param resultSet      the ResultSet whose metadata describes the columns
   * @param dialect        the JDBC dialect consulted first for type mapping
   * @param alwaysNullable if true, every field is marked nullable regardless of the metadata
   * @return a [[StructType]] with one field per column, in column order. Each field's metadata
   *         holds the column label under `"name"` and the JDBC scale under `"scale"`, plus
   *         anything the dialect adds.
   * @throws SQLException if a column's JDBC type is unsupported or unrecognized
   */
  def getSchema(
      resultSet: ResultSet,
      dialect: JdbcDialect,
      alwaysNullable: Boolean = false): StructType = {
    val rsmd = resultSet.getMetaData
    // JDBC column indices are 1-based.
    val fields = (1 to rsmd.getColumnCount).map { col =>
      toStructField(rsmd, col, dialect, alwaysNullable)
    }
    new StructType(fields.toArray)
  }

  /** Converts the metadata of one (1-based) column into a [[StructField]]. */
  private def toStructField(
      rsmd: ResultSetMetaData,
      col: Int,
      dialect: JdbcDialect,
      alwaysNullable: Boolean): StructField = {
    val columnName = rsmd.getColumnLabel(col)
    val sqlType = rsmd.getColumnType(col)
    val typeName = rsmd.getColumnTypeName(col)
    val precision = rsmd.getPrecision(col)
    val scale = rsmd.getScale(col)
    val signed = isSignedColumn(rsmd, col)
    // Short-circuit: isNullable is only queried when alwaysNullable is false.
    val nullable = alwaysNullable || rsmd.isNullable(col) != ResultSetMetaData.columnNoNulls
    val metadata = new MetadataBuilder()
      .putString("name", columnName)
      .putLong("scale", scale)
    // Dialect-specific mappings take precedence, as in Spark's JdbcUtils.getSchema.
    // The builder is passed unbuilt so the dialect can attach extra metadata.
    // Option(...) keeps a null dialect working (generic mapping only).
    val columnType = Option(dialect)
      .flatMap(_.getCatalystType(sqlType, typeName, precision, metadata))
      .getOrElse(getCatalystType(sqlType, typeName, precision, scale, signed))
    StructField(columnName, columnType, nullable, metadata.build())
  }

  /**
   * Returns `rsmd.isSigned(col)`.
   *
   * Hive's driver throws "Method not supported" here (HIVE-14684); that case is treated as
   * signed. Any other SQLException propagates.
   */
  private def isSignedColumn(rsmd: ResultSetMetaData, col: Int): Boolean =
    try rsmd.isSigned(col)
    catch {
      case e: SQLException
          if e.getMessage == "Method not supported" &&
            rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" =>
        true
    }

  /**
   * Generic JDBC-to-Catalyst type mapping, intended to follow Spark 3.5's default mapping.
   * Used when the dialect does not supply a mapping.
   *
   * @throws SQLException "Unsupported type" for JDBC types with no Catalyst equivalent, and
   *                      "Unrecognized SQL type" for type codes not listed here
   */
  private def getCatalystType(
      sqlType: Int,
      typeName: String,
      precision: Int,
      scale: Int,
      signed: Boolean): DataType = {

    def unsupported: Nothing = throw new SQLException("Unsupported type " + sqlType)

    // scalastyle:off
    sqlType match {
      case java.sql.Types.ARRAY => unsupported
      case java.sql.Types.BIGINT => if (signed) LongType else DecimalType(20, 0)
      case java.sql.Types.BINARY => BinaryType
      case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
      case java.sql.Types.BLOB => BinaryType
      case java.sql.Types.BOOLEAN => BooleanType
      case java.sql.Types.CHAR => StringType
      case java.sql.Types.CLOB => StringType
      case java.sql.Types.DATALINK => unsupported
      case java.sql.Types.DATE => DateType
      case java.sql.Types.DECIMAL if precision != 0 || scale != 0 => createDecimalType(precision, scale)
      case java.sql.Types.DECIMAL => DecimalType.SYSTEM_DEFAULT
      case java.sql.Types.DISTINCT => unsupported
      case java.sql.Types.DOUBLE => DoubleType
      case java.sql.Types.FLOAT => FloatType
      case java.sql.Types.INTEGER => if (signed) IntegerType else LongType
      case java.sql.Types.JAVA_OBJECT => unsupported
      case java.sql.Types.LONGNVARCHAR => StringType
      case java.sql.Types.LONGVARBINARY => BinaryType
      case java.sql.Types.LONGVARCHAR => StringType
      case java.sql.Types.NCHAR => StringType
      case java.sql.Types.NCLOB => StringType
      case java.sql.Types.NULL => NullType
      case java.sql.Types.NUMERIC if precision != 0 || scale != 0 => createDecimalType(precision, scale)
      case java.sql.Types.NUMERIC => DecimalType.SYSTEM_DEFAULT
      case java.sql.Types.NVARCHAR => StringType
      case java.sql.Types.OTHER => unsupported
      case java.sql.Types.REAL => DoubleType
      case java.sql.Types.REF => StringType
      case java.sql.Types.REF_CURSOR => unsupported
      case java.sql.Types.ROWID => LongType
      case java.sql.Types.SMALLINT => IntegerType
      case java.sql.Types.SQLXML => StringType
      case java.sql.Types.STRUCT => StringType
      case java.sql.Types.TIME => TimestampType
      case java.sql.Types.TIME_WITH_TIMEZONE => unsupported
      case java.sql.Types.TIMESTAMP => TimestampType
      case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => unsupported
      case java.sql.Types.TINYINT => IntegerType
      case java.sql.Types.VARBINARY => BinaryType
      case java.sql.Types.VARCHAR => StringType
      case _ => throw new SQLException("Unrecognized SQL type " + sqlType)
    }
    // scalastyle:on
  }

  /**
   * Builds a [[DecimalType]] with precision clamped to [1, MAX_PRECISION] and scale clamped
   * to [0, precision].
   *
   * This stands in for `DecimalType.bounded`, which may not be accessible from this package.
   * Because of the clamping, the arguments always satisfy
   * 0 <= scale <= precision <= MAX_PRECISION. That is why no fallback is needed here.
   */
  private def createDecimalType(precision: Int, scale: Int): DecimalType = {
    val boundedPrecision = math.min(math.max(precision, 1), DecimalType.MAX_PRECISION)
    val boundedScale = math.min(math.max(scale, 0), boundedPrecision)
    DecimalType(boundedPrecision, boundedScale)
  }
}