Blog
Zato's publish/subscribe functionality is accessible from Python code in two distinct ways, each suited to different integration scenarios.
Services running within the Zato platform can use the built-in self.pubsub.publish method to send messages directly to topics without any HTTP overhead. This approach provides the most efficient path for broadcasting events from your integration layer to external systems and applications that subscribe to those topics.
External Python applications, on the other hand, interact with the pub/sub system through the REST API. Using standard libraries such as requests, these applications can publish messages and retrieve messages from their queues over HTTP. This approach enables any Python application - whether it's a web application, a background worker, or a microservice - to participate in your event-driven architecture.
The simplest way to publish a message from a Zato service is to call self.pubsub.publish with a topic name and the data to send. The system automatically generates all required metadata including message ID, timestamps, and expiration time.
# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class MyService(Service):
    """Example service that publishes a message to a pub/sub topic."""

    def handle(self):
        """Publish a sample order payload to the 'demo.1' topic."""

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Publish a message to a topic
        self.pubsub.publish(topic_name, data)
When you need more control over message delivery and identification, you can provide custom metadata:
msg_id - specify your own message identifier for tracking purposes
priority - controls message ordering (higher values are delivered first)
expiration - sets how long the message remains available (in seconds)
cid - correlation ID that ties the published message to the current service invocation for end-to-end tracing

# -*- coding: utf-8 -*-
# Zato
from zato.server.service import Service
class MyService(Service):
    """Example service that publishes a message with custom metadata."""

    def handle(self):
        """Publish a sample order payload to 'demo.1' with explicit metadata."""

        # Prepare topic and data
        topic_name = 'demo.1'
        data = {
            'order_id': 12345,
            'status': 'completed'
        }

        # Prepare metadata
        msg_id = 'my-custom-msg-id'
        priority = 7
        expiration = 3600

        # Publish a message with custom metadata,
        # passing this invocation's own correlation ID as cid
        self.pubsub.publish(
            topic_name,
            data,
            msg_id=msg_id,
            cid=self.cid,
            priority=priority,
            expiration=expiration,
        )
# -*- coding: utf-8 -*-
# stdlib
import logging
from logging import getLogger, INFO
# requests
import requests
# Log with timestamps so each step of the example is easy to follow
logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

# Server address and the credentials sent with the request (HTTP Basic Auth)
base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

# The topic name is part of the URL the message is POSTed to
topic_name = 'demo.1'
url = f'{base_url}/pubsub/topic/{topic_name}'

# The business payload goes under the 'data' key
request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}

# Always set a timeout - without one, requests waits indefinitely
# if the server accepts the connection but never responds
response = requests.post(url, json=request, auth=auth, timeout=30)
result = response.json()
logger.info('Response: %s', result)

is_ok = result['is_ok']
msg_id = result['msg_id']

logger.info('Published: %s', is_ok)
logger.info('Message ID: %s', msg_id)
# -*- coding: utf-8 -*-
# stdlib
import logging
from logging import getLogger, INFO
# requests
import requests
# Log with timestamps so each step of the example is easy to follow
logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

# Server address and the credentials sent with the request (HTTP Basic Auth)
base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/messages/get'

# Always set a timeout - without one, requests waits indefinitely
# if the server accepts the connection but never responds
response = requests.post(url, auth=auth, timeout=30)
result = response.json()
logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

# Pick the grammatically correct noun for the summary line
suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)

# Each message carries its metadata under 'meta' and its payload under 'data'
for message in messages:
    meta = message['meta']
    data = message['data']

    topic_name = meta['topic_name']
    priority = meta['priority']

    logger.info('Topic: %s', topic_name)
    logger.info('Data: %s', data)
    logger.info('Priority: %s', priority)
    logger.info('---')
# -*- coding: utf-8 -*-
# stdlib
import logging
from logging import getLogger, INFO
from time import sleep
# requests
import requests
# Log with timestamps so each step of the example is easy to follow
logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

# Server address and the credentials sent with each request (HTTP Basic Auth)
base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

topic_name = 'demo.1'

# Publish a message
publish_url = f'{base_url}/pubsub/topic/{topic_name}'

request = {
    'data': {
        'order_id': 12345,
        'status': 'completed'
    }
}

# Always set a timeout - without one, requests waits indefinitely
# if the server accepts the connection but never responds
response = requests.post(publish_url, json=request, auth=auth, timeout=30)
result = response.json()
logger.info('Response: %s', result)

msg_id = result['msg_id']
logger.info('Published: %s', msg_id)

# Wait a moment for message to be routed
sleep(0.5)

# Get messages
get_url = f'{base_url}/pubsub/messages/get'

response = requests.post(get_url, auth=auth, timeout=30)
result = response.json()
logger.info('Response: %s', result)

message_count = result['message_count']
messages = result['messages']

# Pick the grammatically correct noun for the summary line
suffix = 'message' if message_count == 1 else 'messages'
logger.info('Retrieved %s %s', message_count, suffix)

for message in messages:
    data = message['data']
    logger.info('Data: %s', data)
The result will be:
Published: zpsm.20251011-171049-9451-66b0c25a2d900fb5f
Retrieved 1 message
Data: {'order_id': 12345, 'status': 'completed'}
# -*- coding: utf-8 -*-
# stdlib
import logging
from logging import getLogger, INFO
# requests
import requests
# Log with timestamps so each step of the example is easy to follow
logging.basicConfig(level=INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = getLogger(__name__)

# Server address and the credentials sent with the request (HTTP Basic Auth)
base_url = 'http://localhost:11223'
username = 'user.1'
password = 'password.1'
auth = (username, password)

url = f'{base_url}/pubsub/topic/demo.1'

request = {
    'data': 'Order processed'
}

# Always set a timeout - without one, requests waits indefinitely
# if the server accepts the connection but never responds
response = requests.post(url, json=request, auth=auth, timeout=30)
result = response.json()
logger.info('Response: %s', result)

# The 'is_ok' flag decides which of the other keys is read from the response
is_ok = result['is_ok']

if is_ok:
    msg_id = result['msg_id']
    logger.info('Success: %s', msg_id)
else:
    details = result['details']
    logger.info('Error: %s', details)
What Event-Driven Architecture is and how it helps in systems integrations
Understanding the concepts of topics that messages are published to and queues that messages are read from
How APIs and systems can communicate with the broker to publish and subscribe to their messages
How to grant and secure access to your topics and message queues