4 weeks ago
Problem: In Databricks, logs from an external binary (via os.system) show up, but Python logger.info() inside groupBy(...).applyInPandas(...) does not. print(..., flush=True) does show up.
Why: applyInPandas runs your function as a pandas UDF. That code executes in a Python worker process, which has its own logging configuration (or none at all). For example, I do have a root-level configuration like this when the Databricks Workflow/job starts:
import logging
import os

log_level = os.getenv("LOG_LEVEL", default="INFO")
logging.basicConfig(
    level=log_level,
    format="[%(levelname)s] [%(asctime)s] %(message)s",
)
logging.getLogger("py4j").setLevel("ERROR")
logger = logging.getLogger(__name__)
logger.info(f"set root log level to {log_level}")

This captures the logs at the Spark and workflow level, but not in UDFs that are sent to applyInPandas.
Well, I have three choices to try out:
import logging, sys

def setup_worker_logging(level=logging.INFO):
    logging.basicConfig(
        level=level,
        format='[%(levelname)s] [%(asctime)s] %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
        force=True,
    )

# inside applyInPandas function
setup_worker_logging()
logger = logging.getLogger(__name__)
logger.info("visible now")

import logging, os, sys

formatting = "[%(levelname)s] [%(asctime)s] %(message)s"
log_level = os.getenv("LOG_LEVEL", default="INFO")
logging.basicConfig(level=log_level, format=formatting)
# Add StreamHandler for stdout output
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
formatter = logging.Formatter(formatting)
stream_handler.setFormatter(formatter)
# Add handler to root logger
root_logger = logging.getLogger()
root_logger.addHandler(stream_handler)
logging.getLogger("py4j").setLevel("ERROR")
logger = logging.getLogger(__name__)
logger.info(f"set root log level to {log_level}")

Did any of you face such issues? If yes, how did you solve it in your production code?
3 weeks ago
Hey @Kirankumarbs !
applyInPandas runs in separate Python worker processes, so your driver-side logging.basicConfig(...) doesn’t apply there. That’s why print(..., flush=True) works (stdout from the worker is wired through), but logger.info() doesn’t.
If you can run DBR 18.0+ classic or 18.2+ shared/serverless, just enable Python worker logging and use normal logging in the UDF:
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
def my_group_fn(pdf):
    import logging
    logger = logging.getLogger("my.udf")
    logger.setLevel(logging.INFO)
    logger.info(f"group size: {len(pdf)}")
    ...
    return out_pdf
# after the job
logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'my.udf'").select("level", "msg", "context").show(truncate=False)
Your setup_worker_logging() approach is a solid production pattern:
import logging, sys

def setup_worker_logging(level=logging.INFO):
    logging.basicConfig(
        level=level,
        format='[%(levelname)s] [%(asctime)s] %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
        force=True,
    )

def grouped_fn(pdf):
    setup_worker_logging()
    logger = logging.getLogger(__name__)
    logger.info("visible now")
    ...
    return out_pdf
I hope this helps!
Sarah
3 weeks ago
Hey @sarahbhord
Thanks a lot for the comprehensive explanation! Yes, this matches my 2nd option, which I will try on Monday and report back here. Thanks!
2 weeks ago
Hi @Kirankumarbs,
You have correctly identified the root cause here. When you use applyInPandas, your function runs inside a separate Python worker process on each executor, not in the driver process. The logging configuration you set up on the driver (via logging.basicConfig) does not carry over to those worker processes, because each one starts with a fresh Python interpreter and its own default logging state.
Here is what is happening in detail:
WHY print() WORKS BUT logger.info() DOES NOT
1. print() writes to stdout by default. Spark captures stdout from Python worker processes and surfaces it in the driver output (notebook cell results or job logs).
2. Python's logging module, by default, writes to stderr. Even though Spark also captures stderr, the issue is that inside the worker process the root logger has no handlers configured at all (since your basicConfig call only ran on the driver). In Python 3, an unconfigured root logger defaults to the WARNING level and falls back to a last-resort handler that only emits WARNING and above, so INFO messages are dropped entirely. (The one-time "No handlers could be found for logger..." warning was Python 2 behavior.)
So it is not that the output is going to the wrong stream. It is that the logger in the worker process has no handlers and defaults to the WARNING level, so INFO messages are never emitted at all.
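You can reproduce this behavior in plain Python, without Spark at all. In this sketch the logger name and handler class are illustrative, not anything from the thread; the list-collecting handler just makes it easy to see which messages were actually emitted:

```python
import logging

# Mimic a fresh worker: an isolated logger with no handlers configured.
# The name "demo.worker" is illustrative only.
worker_logger = logging.getLogger("demo.worker")
worker_logger.propagate = False  # keep it independent of any notebook config

captured = []

class ListHandler(logging.Handler):
    """Collects emitted messages so we can inspect them."""
    def emit(self, record):
        captured.append(record.getMessage())

worker_logger.info("before configuration")  # not captured: no handler attached

worker_logger.addHandler(ListHandler())
worker_logger.setLevel(logging.INFO)
worker_logger.info("after configuration")   # now emitted

print(captured)  # ['after configuration']
```

Only the message logged after a handler and level were configured shows up, which is exactly the situation inside an unconfigured Python worker.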
YOUR OPTION 3 IS THE RIGHT APPROACH (WITH A SMALL ADJUSTMENT)
Your "Proper Solution" of adding a StreamHandler to stdout is on the right track, but the key point is that this setup needs to happen inside each worker process, not just on the driver. The driver-side configuration does not propagate to workers.
The cleanest pattern is to configure logging at the top of your applyInPandas function (or in a helper that you call at the start of it). Here is the recommended approach:
import logging
import sys

def setup_worker_logging(level=logging.INFO):
    """Call this at the start of any applyInPandas function."""
    logger = logging.getLogger()
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(level)
        formatter = logging.Formatter(
            "[%(levelname)s] [%(asctime)s] %(message)s"
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    logger.setLevel(level)

def my_udf(pdf):
    setup_worker_logging()
    logger = logging.getLogger(__name__)
    logger.info("This will now appear in the driver output")
    # ... your transformation logic ...
    return pdf
result_df = df.groupBy("key").applyInPandas(my_udf, schema=df.schema)
A few notes on this pattern:
- The "if not logger.handlers" check prevents adding duplicate handlers if the same worker process runs the function multiple times (which Spark will do for different groups).
- Writing to sys.stdout rather than sys.stderr ensures the output appears in the same place as print() output, which Databricks surfaces in notebook cells and in the "Standard output" section of job run logs.
- You can also use force=True with logging.basicConfig (Python 3.8+) as you showed in your second option. That works, but repeatedly calling basicConfig with force=True on every invocation is slightly heavier since it removes and re-adds all handlers each time.
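To see concretely why the "if not logger.handlers" guard matters, here is a small sketch (the logger name and helper functions are made up for illustration). Without the guard, every call attaches one more handler, and each log record is then emitted once per handler:

```python
import logging
import sys

# Hypothetical logger name, for illustration only.
log = logging.getLogger("demo.duplicates")
log.propagate = False
log.setLevel(logging.INFO)

def naive_setup():
    # No guard: every call adds one more handler.
    log.addHandler(logging.StreamHandler(sys.stdout))

def guarded_setup():
    # With the guard, repeated calls are no-ops after the first.
    if not log.handlers:
        log.addHandler(logging.StreamHandler(sys.stdout))

naive_setup()
naive_setup()
print(len(log.handlers))  # 2: each record would now be printed twice

log.handlers.clear()
guarded_setup()
guarded_setup()
print(len(log.handlers))  # 1: each record prints exactly once
```

Since Spark reuses worker processes across groups, the unguarded version would accumulate handlers and duplicate every log line.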
FOR PRODUCTION CODE
If you have many UDFs across your codebase, a decorator pattern keeps things tidy:
import functools
import logging
import sys

def with_worker_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger()
        if not logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter(
                "[%(levelname)s] [%(asctime)s] %(message)s"
            ))
            logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return func(*args, **kwargs)
    return wrapper

@with_worker_logging
def my_udf(pdf):
    logger = logging.getLogger(__name__)
    logger.info("Visible in driver output")
    return pdf
WHERE TO FIND EXECUTOR LOGS
If you also want to review the full executor-side logs (including stderr output), you can:
1. In the Spark UI, click on a stage, then click on an executor's "stderr" link to see the raw Python worker output.
2. If you have cluster log delivery configured (Cluster settings, Logging tab), executor logs are delivered to your configured storage location under a path like <cluster-id>/executor/<executor-id>/stderr.
Documentation reference for cluster log delivery:
https://docs.databricks.com/en/compute/configure.html#cluster-log-delivery
SUMMARY
- The driver's logging config does not propagate to Python worker processes used by applyInPandas.
- Workers start with no logging handlers, so log messages are silently dropped.
- Configure logging inside the UDF function itself (or via a decorator/helper), directing output to sys.stdout.
- The "if not logger.handlers" guard prevents duplicate handlers across repeated calls.
* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.