Zerobus Ingest, part of Lakeflow Connect, simplifies push-based data ingestion, making it easier to move data from a variety of sources into a centralized analytical platform. This is especially beneficial in manufacturing, where agents on isolated on-premises networks can forward data from message queues like RabbitMQ to Delta tables in your lakehouse.
This blog demonstrates how to create and deploy lightweight agents that subscribe to sources like RabbitMQ and push data to Databricks via Zerobus Ingest, which materializes it into a Delta table.
Zerobus Ingest is designed to act as a bridge, making it straightforward to connect a variety of data sources to your Lakehouse. Because Zerobus Ingest is a collection of APIs, it enables the creation of "customized forwarding agents": small, efficient processes that subscribe to data streams and forward that data to Databricks. These agents are flexible and can be deployed in diverse environments, whether on-premises or in the cloud, with minimal compute requirements.

Once Zerobus Ingest receives your data, it automatically handles the materialization of this data into Delta tables, ensuring a consistent and optimized format for your analytics.
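That means the data your agent pushes is immediately queryable like any other Delta table in your workspace. As a quick sanity check from a Databricks notebook, you might read the table back like this (the three-level table name here is illustrative, not the one from the example repo):

# From a Databricks notebook: the table the agent writes into is a regular Delta table
# ("main.manufacturing.air_quality" is an illustrative name).
df = spark.read.table("main.manufacturing.air_quality")
display(df.limit(10))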
RabbitMQ provides a robust and reliable way to handle messages, acting as an intermediary between publishers and consumers. Imagine a scenario where various applications publish events or data updates to a RabbitMQ queue. With Zerobus Ingest, you can easily create a simple RabbitMQ subscriber: a forwarding agent that listens for these messages and pushes them on to Databricks.
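To make that scenario concrete, here is a minimal sketch of what a publisher on the other side of the queue might look like. It assumes a local RabbitMQ broker, a queue named air_quality, and a JSON payload whose fields line up with the protobuf schema the forwarder uses; the host, queue name, and fields are all illustrative, not taken from the example repo.

import json
import pika

# Connect to a local RabbitMQ broker and declare the queue the forwarder listens on.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="air_quality")

# Publish one reading as JSON; the forwarding agent parses the message body into its protobuf type.
reading = {"sensor_id": "line-3-ambient", "pm25": 12.4, "timestamp": "2024-05-01T08:30:00Z"}
channel.basic_publish(exchange="", routing_key="air_quality", body=json.dumps(reading))

connection.close()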
Check out this example of a simple Python RabbitMQ subscriber that forwards data to Databricks.
→https://github.com/hntd187/zerobus-examples/tree/master/python/rabbitmq_example
 
Once you have your Zerobus Ingest and RabbitMQ configuration, ingestion is quite simple. In less than 50 lines of code, we have an agent built for data forwarding. Check it out yourself. 
→ https://github.com/hntd187/zerobus-examples/blob/master/python/rabbitmq_example/main.py#L23
def main(workspace_url, zerobus_endpoint, token, table_name, rabbitmq_host, rabbitmq_queue):
    # Create an SDK handle and describe the target Delta table with a protobuf descriptor.
    sdk_handle = ZerobusSdk(zerobus_endpoint, workspace_url, token)
    table_properties = TableProperties(table_name, AirQuality.DESCRIPTOR)

    # Connect to RabbitMQ and open a channel for consuming.
    connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
    channel = connection.channel()

    # Open a Zerobus stream that writes into the target table.
    stream = sdk_handle.create_stream(table_properties)

    def msg_callback(ch, method, props, body):
        # Parse the message body into the AirQuality protobuf and forward it to Zerobus,
        # blocking until the record is acknowledged.
        msg: AirQuality = Parse(body, AirQuality())
        print(f"Got msg: {msg}")
        stream.ingest_record(msg).wait_for_ack()

    # Subscribe to the queue and block, handing each message to the callback.
    channel.basic_consume(queue=rabbitmq_queue, on_message_callback=msg_callback, auto_ack=True)
    channel.start_consuming()
    # Close the Zerobus stream once consumption stops.
    stream.close()
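To run the agent, all you need to do is supply the workspace, Zerobus endpoint, token, target table, and RabbitMQ connection details. A minimal entry point might look like the sketch below; the environment variable names are illustrative and not part of the example repo.

import os

if __name__ == "__main__":
    # Read connection details from the environment (variable names are illustrative).
    main(
        workspace_url=os.environ["DATABRICKS_WORKSPACE_URL"],
        zerobus_endpoint=os.environ["ZEROBUS_ENDPOINT"],
        token=os.environ["DATABRICKS_TOKEN"],
        table_name=os.environ["ZEROBUS_TABLE_NAME"],
        rabbitmq_host=os.environ.get("RABBITMQ_HOST", "localhost"),
        rabbitmq_queue=os.environ.get("RABBITMQ_QUEUE", "air_quality"),
    )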
Ingestion doesn’t need to be hard. With the right tools, it can be as simple as stream.ingest_record(msg). 
 
More in the Zerobus Ingest series: The RabbitMQ Forwarder is just one example of how Zerobus Ingest simplifies connecting operational data to your Lakehouse. The same APIs that bridge message queues can also handle direct writes and custom HTTP endpoints: 
- Databricks Direct Write App – Push telemetry data to Delta Lake in real time using the high-performance Zerobus Ingest Direct Write API
- Zerobus Ingest Station – Build custom REST ingestion endpoints with validation and transformation logic, powered by the Zerobus SDK