Tuesday
Hello - I am following some online code to create a function as follows:
-----------------------------------------
CREATE OR REPLACE FUNCTION my_catalog.my_schema.insert_data_function(
col1_value STRING,
col2_value INT
)
RETURNS BOOLEAN
COMMENT 'Inserts data into a table in Unity Catalog.'
RETURN
INSERT INTO my_catalog.my_schema.my_table (column1, column2)
VALUES (col1_value, col2_value);
-----------------------------------------
Then I got an error:
Syntax error at or near 'INTO'. SQLSTATE: 42601
It looks like a function can only read and return a value, but cannot write anything to a table. Can you please advise? Thanks!
Wednesday
Hi @greengil ,
You can't use functions to modify data. They're intended to return a scalar value or a table. If you need to modify the content of a table, use a stored procedure instead.
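For example, a minimal sketch (assuming the table from your post already exists and your runtime version supports SQL procedures):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create a procedure that wraps the INSERT
spark.sql("""
CREATE OR REPLACE PROCEDURE my_catalog.my_schema.insert_data_procedure(
  col1_value STRING,
  col2_value INT
)
LANGUAGE SQL
AS
BEGIN
  INSERT INTO my_catalog.my_schema.my_table (column1, column2)
  VALUES (col1_value, col2_value);
END
""")

# Procedures are allowed to modify data; call it like this
spark.sql("CALL my_catalog.my_schema.insert_data_procedure('abc', 1)")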
Wednesday
Thank you for the info, @szymon_dybczak.
I am attempting to create a UC 'function' that will be registered as a tool for an Agent. The agent will connect to the tool to insert some data into a table. Can a stored procedure be registered as a UC function? Thanks!
Wednesday
In UC, functions must be read-only; they cannot modify state (no INSERT, DELETE, MERGE, CREATE, VACUUM, etc.). So I tried creating a PROCEDURE and calling it instead, and I was able to insert data into the table successfully.
Unity Catalog tools are really just Unity Catalog user-defined functions (UDFs) under the hood. So you can’t register a stored procedure as a UC function tool directly. However I can think of two quick options to make an agent perform writes:
1) Agent code tool: Define a tool in your agent’s code that executes a parameterized CALL … via the SQL Statement Execution API or Workspace client. Agent code tools are recommended for calling REST APIs or running arbitrary code and don’t require the operation to be wrapped as a UC function (see the sketch after this list).
2) DBSQL MCP server (tool interoperability): Expose a general SQL execution tool via the Databricks-managed DBSQL MCP server and have the agent run parameterized SQL, including DML, under your governance model. Use this for pipeline authoring or when you want tool discovery via MCP; for read-only chatbot-style retrieval, use Genie instead.
If you must keep the tool in Unity Catalog for governance and discoverability, use UC functions for read/query tools and pair them with an agent code tool (or MCP SQL tool) for mutations. UC function tools are a great fit for retrieval/transform patterns, and Databricks explicitly supports using UC UDFs as agent tools.
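For option 1, here’s a rough sketch using the Databricks Python SDK (the procedure name and warehouse ID are placeholders; adapt them to your setup):
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import StatementParameterListItem

def insert_via_procedure(col1_value: str, col2_value: str) -> str:
    """Agent code tool: run a parameterized CALL on a SQL warehouse."""
    w = WorkspaceClient()  # resolves auth from the environment/notebook
    resp = w.statement_execution.execute_statement(
        warehouse_id="<your_warehouse_id>",  # placeholder
        statement="CALL my_catalog.my_schema.insert_data_procedure(:col1, :col2)",
        parameters=[
            StatementParameterListItem(name="col1", value=col1_value),
            StatementParameterListItem(name="col2", value=col2_value),
        ],
    )
    return resp.status.state.value if resp.status and resp.status.state else "UNKNOWN"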
Wednesday
Hi @iyashk-DB - Thank you for the tips. Does the following sample code look correct? Thanks for your help!
#################################################################
###### Create the stored procedure to write data to a table #####
%sql
CREATE OR REPLACE PROCEDURE myCatalog.mySchema.insert_data_procedure(
col1_value STRING,
col2_value STRING
)
LANGUAGE SQL
AS
BEGIN
INSERT INTO myCatalog.mySchema.myTable (column1, column2)
VALUES (col1_value, col2_value);
END;
#######################################################################
###### Define a function to execute a parameterized call to the stored procedure #####
import requests

def call_insert_data_procedure(
    col1_value: str,
    col2_value: str,
    databricks_host: str,
    databricks_token: str,
) -> str:
    """
    Calls the insert_data_procedure stored procedure in Databricks via the
    SQL Statement Execution API.

    Args:
        col1_value (str): Value for the first column.
        col2_value (str): Value for the second column.
        databricks_host (str): Databricks workspace URL.
        databricks_token (str): Databricks personal access token.

    Returns:
        str: Status message.
    """
    endpoint = f"{databricks_host}/api/2.0/sql/statements/"
    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json",
    }
    # Named parameter markers keep the call truly parameterized
    # (no string interpolation into the SQL text).
    payload = {
        "statement": "CALL myCatalog.mySchema.insert_data_procedure(:col1, :col2)",
        "parameters": [
            {"name": "col1", "value": col1_value},
            {"name": "col2", "value": col2_value},
        ],
        "warehouse_id": "<your_warehouse_id>",
    }
    response = requests.post(endpoint, headers=headers, json=payload)
    response.raise_for_status()
    # raise_for_status only covers the HTTP call; check the statement state too
    state = response.json().get("status", {}).get("state")
    return f"Procedure finished with state: {state}"
###########################################################
##### Define as a tool for an agent (LangChain style) #####
from langchain.tools import StructuredTool

def _insert_data(col1_value: str, col2_value: str) -> str:
    return call_insert_data_procedure(
        col1_value, col2_value, "<your_databricks_host>", "<your_databricks_token>"
    )

# A plain Tool passes a single string input; StructuredTool handles multiple typed arguments.
insert_data_tool = StructuredTool.from_function(
    func=_insert_data,
    name="InsertDataProcedure",
    description="Inserts data into myTable using the insert_data_procedure stored procedure.",
)
yesterday
If you are executing that code from Databricks, you don't need to call it through the SQL Statement Execution API; that's unnecessary overhead. Just use:
spark.sql(f"CALL myCatalog.mySchema.insert_data_procedure('{col1_value}', '{col2_value}')")
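With parameter markers (supported by spark.sql on recent runtimes), the whole helper shrinks to something like this sketch:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def call_insert_data_procedure(col1_value: str, col2_value: str) -> str:
    """Runs the procedure on the cluster's own Spark session; no REST round-trip."""
    spark.sql(
        "CALL myCatalog.mySchema.insert_data_procedure(:col1, :col2)",
        args={"col1": col1_value, "col2": col2_value},
    )
    return "Procedure executed successfully."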
yesterday
I tried the following and got an error. What's the issue?
Here is the registered function the agent will call:
def myFunction(col_val1: str, col_val2: str) -> str:
    """
    A function that will accept 2 strings from caller and insert into a table.
    """
    from pyspark.sql.session import SparkSession
    spark = SparkSession.builder.getOrCreate()
    spark.sql(f"CALL myCatalog.mySchema.myStoredProcedure('{col_val1}', '{col_val2}');")
    return "Data inserted successfully."
Then, from the Playground, I loaded the Tool pointing at the above function and asked the agent to insert data. I got the error below:
== Error ==
SystemExit: -1
== Stacktrace ==
File "<udfbody>", line 3, in main
spark = SparkSession.builder.getOrCreate()
File "/databricks/spark/python/pyspark/sql/session.py", line 574, in getOrCreate
else SparkContext.getOrCreate(sparkConf)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/core/context.py", line 579, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/databricks/spark/python/pyspark/core/context.py", line 207, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
File "/databricks/spark/python/pyspark/core/context.py", line 500, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/java_gateway.py", line 63, in launch_gateway
SPARK_HOME = _find_spark_home()
^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/find_spark_home.py", line 91, in _find_spark_home
sys.exit(-1)
SQLSTATE: 39000
== SQL (line 1, position 8) ==
SELECT `myCatalog`.`mySchema`.`myStoredProcedure`("test1","test2") as output
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Error: Error: org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function myCatalog.mySchema.myFunction(test1, test2) failed.