RabbitMQ in Docker: From Zero to Your First Message in 15 Minutes
"I need a message queue. Fast. No complications." 🐰
RabbitMQ is the answer.
Why RabbitMQ?
RabbitMQ is a battle-tested message broker that thousands of companies run in production.
RabbitMQ features:
- ✅ Reliable message delivery
- ✅ Multiple messaging patterns
- ✅ Management UI (built-in)
- ✅ Easy Docker setup
- ✅ Production ready
Let's get started!
Quick Start: 15 minutes from zero to a working queue
Step 1: Docker Setup (2 minutes)
# Pull the RabbitMQ image with the management UI
docker pull rabbitmq:3-management
# Start the RabbitMQ container
docker run -d \
  --name rabbitmq \
  --hostname rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3-management
# Check that it is running
docker ps
# Output:
# CONTAINER ID   IMAGE                   STATUS
# abc123...      rabbitmq:3-management   Up 10 seconds
Ports:
- 5672: AMQP port (for applications)
- 15672: Management UI (for the admin)
Verify:
# Check logs
docker logs rabbitmq
# When you see this, the broker is ready:
# Server startup complete; 3 plugins started.
# * rabbitmq_management
# * rabbitmq_prometheus
# * rabbitmq_web_dispatch
# Open Management UI:
# http://localhost:15672
# Login: admin / admin123
Total time: 2 minutes!
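If you would rather script the readiness check than watch the logs, the sketch below polls the two published ports until they accept TCP connections. It is only an approximation: an open port does not guarantee that the broker has finished starting, and the function names `port_open` and `wait_for_ports` are made up for this example.

```python
import socket
import time


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, unreachable, or timed out: treat all of them as "not open yet".
        return False


def wait_for_ports(host, ports, attempts=30, delay=1.0):
    """Poll until every port accepts connections.

    Returns the list of ports that were still closed after the last attempt
    (an empty list means everything is reachable).
    """
    pending = list(ports)
    for _ in range(attempts):
        pending = [p for p in pending if not port_open(host, p)]
        if not pending:
            break
        time.sleep(delay)
    return pending
```

Calling `wait_for_ports("localhost", [5672, 15672])` returns an empty list once both the AMQP port and the Management UI port are reachable.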
Step 2: Python Setup (2 minutes)
# Virtual environment
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# Install pika (RabbitMQ client)
pip install pika
# Verify
python -c "import pika; print('OK')"
Step 3: Basic Connection (1 minute)
# connection_test.py
import pika
# Connection parameters
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters(
    host='localhost',
    port=5672,
    credentials=credentials
)
# Connect
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
print("✅ Connected to RabbitMQ!")
# Close
connection.close()
Run:
python connection_test.py
# Output: ✅ Connected to RabbitMQ!
Setup Queues & Exchanges
Step 4: Create Simple Queue (2 minutes)
# setup_simple.py
import pika
def setup_simple_queue():
    # Connect
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Declare queue
    channel.queue_declare(
        queue='hello',
        durable=True # Survive broker restart
    )
    print("✅ Queue 'hello' created!")
    connection.close()
if __name__ == '__main__':
    setup_simple_queue()
Run:
python setup_simple.py
# Output: ✅ Queue 'hello' created!
# Check in Management UI:
# http://localhost:15672/#/queues
# You'll see 'hello' queue!
Step 5: Send First Message (2 minutes)
# producer.py
import pika
def send_message(message):
    # Connect
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Ensure queue exists
    channel.queue_declare(queue='hello', durable=True)
    # Send message
    channel.basic_publish(
        exchange='', # Default exchange
        routing_key='hello', # Queue name
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2, # Make message persistent
        )
    )
    print(f"✅ Sent: {message}")
    connection.close()
if __name__ == '__main__':
    send_message('Hello RabbitMQ!')
Run:
python producer.py
# Output: ✅ Sent: Hello RabbitMQ!
# Check Management UI:
# Queue 'hello' now has 1 message!
Step 6: Receive Message (2 minutes)
# consumer.py
import pika
def callback(ch, method, properties, body):
    """Called when message received"""
    print(f"📥 Received: {body.decode()}")
    # Acknowledge message
    ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consumer():
    # Connect
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Ensure queue exists
    channel.queue_declare(queue='hello', durable=True)
    # Set QoS (process 1 message at a time)
    channel.basic_qos(prefetch_count=1)
    # Start consuming
    channel.basic_consume(
        queue='hello',
        on_message_callback=callback
    )
    print("🚀 Waiting for messages. Press Ctrl+C to exit.")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        print("\n⛔ Consumer stopped")
        channel.stop_consuming()
    connection.close()
if __name__ == '__main__':
    start_consumer()
Run:
# Terminal 1: Start consumer
python consumer.py
# Output: 🚀 Waiting for messages...
# Terminal 2: Send message
python producer.py
# Terminal 1 output:
# 📥 Received: Hello RabbitMQ!
# ✅ First message received!
Total time: 10 minutes from the start!
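Every script so far hard-codes admin / admin123, which is fine for a local test but awkward once the password changes. One possible approach, sketched below, is to read the connection settings from environment variables with the tutorial's values as defaults. The variable names RABBITMQ_HOST, RABBITMQ_PORT and RABBITMQ_USER and the `BrokerSettings` class are assumptions made for this example; they are not defined by RabbitMQ or pika.

```python
import os
from dataclasses import dataclass
from urllib.parse import quote


@dataclass(frozen=True)
class BrokerSettings:
    """Connection settings for the broker (hypothetical helper type)."""
    host: str
    port: int
    user: str
    password: str

    def amqp_url(self) -> str:
        """Build an AMQP URI for the default vhost '/' (encoded as %2F)."""
        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return f"amqp://{user}:{password}@{self.host}:{self.port}/%2F"


def load_settings(env=None) -> BrokerSettings:
    """Read settings from env (defaults to os.environ), falling back to the tutorial values."""
    env = os.environ if env is None else env
    return BrokerSettings(
        host=env.get("RABBITMQ_HOST", "localhost"),
        port=int(env.get("RABBITMQ_PORT", "5672")),
        user=env.get("RABBITMQ_USER", "admin"),
        password=env.get("RABBITMQ_PASSWORD", "admin123"),
    )
```

The resulting URL can be handed to `pika.URLParameters(settings.amqp_url())`, or the individual fields can be passed to `pika.PlainCredentials` and `pika.ConnectionParameters` as in the scripts above.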
Exchanges & Routing
Understanding Exchanges
 ┌──────────┐
 │ Producer │
 └────┬─────┘
      │ publish to exchange
      ↓
┌────────────┐
│  Exchange  │ (routes messages)
└─────┬──────┘
      │ binding + routing key
      ├─────────┬─────────┐
      ↓         ↓         ↓
   Queue1    Queue2    Queue3
      ↓         ↓         ↓
  Consumer1 Consumer2 Consumer3
Exchange types:
Direct
Fanout
Topic
Headers
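To make the difference between the exchange types concrete, here is a small in-memory model of the routing decision. It is a sketch for intuition only, not how the broker is implemented: `route` and `topic_matches` are names invented for this example, and the headers exchange is left out because it matches on message headers rather than on the routing key. In a topic binding, `*` stands for exactly one dot-separated word and `#` for zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Topic matching: '*' is exactly one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero, one, or several words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the sorted queue names a message would reach.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        hits = [q for q, _ in bindings]  # routing key is ignored
    elif exchange_type == "direct":
        hits = [q for q, key in bindings if key == routing_key]
    elif exchange_type == "topic":
        hits = [q for q, key in bindings if topic_matches(key, routing_key)]
    else:
        raise ValueError(f"not modelled here: {exchange_type}")
    return sorted(set(hits))


direct = [("error_log", "error"), ("info_log", "info")]
topic = [("all_errors", "*.error.*"), ("user_logs", "user.#")]

print(route("direct", direct, "error"))          # ['error_log']
print(route("fanout", direct, "anything"))       # ['error_log', 'info_log']
print(route("topic", topic, "user.error.api"))   # ['all_errors', 'user_logs']
```

The sample bindings reuse the queue names and binding keys from the steps that follow, so the printed results can be compared with what the real broker does.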
Step 7: Direct Exchange (3 minutes)
# setup_direct.py
import pika
def setup_direct_exchange():
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Declare exchange
    channel.exchange_declare(
        exchange='logs_direct',
        exchange_type='direct',
        durable=True
    )
    # Declare queues
    channel.queue_declare(queue='error_log', durable=True)
    channel.queue_declare(queue='info_log', durable=True)
    channel.queue_declare(queue='warning_log', durable=True)
    # Bind queues to exchange with routing keys
    channel.queue_bind(
        exchange='logs_direct',
        queue='error_log',
        routing_key='error'
    )
    channel.queue_bind(
        exchange='logs_direct',
        queue='info_log',
        routing_key='info'
    )
    channel.queue_bind(
        exchange='logs_direct',
        queue='warning_log',
        routing_key='warning'
    )
    print("✅ Direct exchange setup complete!")
    print(" Exchange: logs_direct")
    print(" Queues: error_log, info_log, warning_log")
    connection.close()
if __name__ == '__main__':
    setup_direct_exchange()
Producer for direct exchange:
# producer_direct.py
import pika
def send_log(severity, message):
    """
    Send log message with severity routing key
    severity: 'info', 'warning', or 'error'
    """
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Publish to exchange with routing key
    channel.basic_publish(
        exchange='logs_direct',
        routing_key=severity,
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"✅ Sent [{severity}]: {message}")
    connection.close()
if __name__ == '__main__':
    # Send different severity logs
    send_log('info', 'User logged in')
    send_log('warning', 'Low disk space')
    send_log('error', 'Database connection failed')
Consumer for specific severity:
# consumer_direct.py
import pika
import sys
def callback(ch, method, properties, body):
    print(f"📥 [{method.routing_key}]: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_logs(severity):
    """
    Consume logs of specific severity
    severity: 'info', 'warning', or 'error'
    """
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    queue_name = f'{severity}_log'
    print(f"🚀 Consuming {severity} logs...")
    print(" Press Ctrl+C to exit")
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        print(f"\n⛔ {severity} consumer stopped")
        channel.stop_consuming()
    connection.close()
if __name__ == '__main__':
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    consume_logs(severity)
Test:
# Setup
python setup_direct.py
# Terminal 1: Consume errors only
python consumer_direct.py error
# Terminal 2: Consume info only
python consumer_direct.py info
# Terminal 3: Send logs
python producer_direct.py
# Terminal 1 sees only errors
# Terminal 2 sees only info
# Perfect routing! ✅
Step 8: Fanout Exchange (2 minutes)
# setup_fanout.py
import pika
def setup_fanout_exchange():
    """
    Fanout = broadcast to all queues
    Use case: Notifications to all services
    """
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Declare fanout exchange
    channel.exchange_declare(
        exchange='notifications',
        exchange_type='fanout',
        durable=True
    )
    # Declare queues for different services
    channel.queue_declare(queue='email_service', durable=True)
    channel.queue_declare(queue='sms_service', durable=True)
    channel.queue_declare(queue='push_service', durable=True)
    # Bind all queues (no routing key needed for fanout)
    channel.queue_bind(exchange='notifications', queue='email_service')
    channel.queue_bind(exchange='notifications', queue='sms_service')
    channel.queue_bind(exchange='notifications', queue='push_service')
    print("✅ Fanout exchange setup complete!")
    print(" Exchange: notifications")
    print(" Queues: email_service, sms_service, push_service")
    print(" All queues receive ALL messages!")
    connection.close()
if __name__ == '__main__':
    setup_fanout_exchange()
Producer:
# producer_fanout.py
import pika
def broadcast_notification(message):
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Publish to fanout exchange (routing key ignored)
    channel.basic_publish(
        exchange='notifications',
        routing_key='', # Ignored for fanout
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"📢 Broadcasted: {message}")
    connection.close()
if __name__ == '__main__':
    broadcast_notification('New order received: #12345')
Test:
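# Setup
python setup_fanout.py
# Note: consumer_direct.py consumes from f'{severity}_log', so as written it
# would look for 'email_service_log'. For these runs, change that line to:
# queue_name = severity
# Terminal 1: Email service
python consumer_direct.py email_service
# Terminal 2: SMS service
python consumer_direct.py sms_service
# Terminal 3: Push service
python consumer_direct.py push_service
# Terminal 4: Broadcast
python producer_fanout.py
# ALL consumers receive the message! 📢
Step 9: Topic Exchange (3 minutes)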
# setup_topic.py
import pika
def setup_topic_exchange():
    """
    Topic exchange with pattern matching
    Routing key pattern: <category>.<severity>.<source>
    Examples:
    - user.error.api
    - user.info.web
    - order.warning.payment
    Wildcards:
    * = exactly one word
    # = zero or more words
    """
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    # Declare topic exchange
    channel.exchange_declare(
        exchange='logs_topic',
        exchange_type='topic',
        durable=True
    )
    # Declare queues
    channel.queue_declare(queue='all_errors', durable=True)
    channel.queue_declare(queue='user_logs', durable=True)
    channel.queue_declare(queue='payment_logs', durable=True)
    # Bind with patterns
    # All errors from any source
    channel.queue_bind(
        exchange='logs_topic',
        queue='all_errors',
        routing_key='*.error.*'
    )
    # All user-related logs
    channel.queue_bind(
        exchange='logs_topic',
        queue='user_logs',
        routing_key='user.#'
    )
    # All payment logs
    channel.queue_bind(
        exchange='logs_topic',
        queue='payment_logs',
        routing_key='order.*.payment'
    )
    print("✅ Topic exchange setup complete!")
    print(" Exchange: logs_topic")
    print(" Patterns:")
    print(" *.error.* → all_errors")
    print(" user.# → user_logs")
    print(" order.*.payment → payment_logs")
    connection.close()
if __name__ == '__main__':
    setup_topic_exchange()
Producer:
# producer_topic.py
import pika
def send_log(routing_key, message):
    credentials = pika.PlainCredentials('admin', 'admin123')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost', 5672, credentials=credentials)
    )
    channel = connection.channel()
    channel.basic_publish(
        exchange='logs_topic',
        routing_key=routing_key,
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"✅ Sent [{routing_key}]: {message}")
    connection.close()
if __name__ == '__main__':
    # Send various logs
    send_log('user.error.api', 'User authentication failed')
    send_log('user.info.web', 'User logged in')
    send_log('order.warning.payment', 'Payment processing slow')
    send_log('order.error.payment', 'Payment failed')
    # Routing:
    # user.error.api → all_errors + user_logs
    # user.info.web → user_logs only
    # order.warning.payment → payment_logs only
    # order.error.payment → all_errors + payment_logs
Docker Compose Setup
Production-Ready Setup
# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    hostname: rabbitmq
    restart: unless-stopped
    ports:
      - "5672:5672" # AMQP
      - "15672:15672" # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-admin123}
      RABBITMQ_DEFAULT_VHOST: /
    volumes:
      # Persist data
      - rabbitmq_data:/var/lib/rabbitmq
      # Custom config
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - app_network
volumes:
  rabbitmq_data:
networks:
  app_network:
    driver: bridge
Custom config:
# rabbitmq.conf
# Limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
# Clustering (optional)
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
# Management
management.tcp.port = 15672
# Logging
log.file.level = info
log.console.level = info
# Queue settings
queue_master_locator = min-masters
Run:
# Start
docker-compose up -d
# Check logs
docker-compose logs -f rabbitmq
# Stop
docker-compose down
# Stop and remove volumes
docker-compose down -v
Helper Scripts
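A helper that pairs well with the Compose setup is a connection retry: after `docker-compose up -d` or a container restart, the broker needs a few seconds before it accepts connections, and a client started at the same time will fail on its first attempt. The sketch below retries with a doubling delay. `connect_with_retry` and its parameters are assumptions for this example; the function takes any zero-argument callable, so it does not depend on pika itself.

```python
import time


def connect_with_retry(connect, attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(OSError,), sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure.

    connect  -- zero-argument callable that returns a connection or raises
    retry_on -- exception types that should trigger another attempt
    sleep    -- injectable for testing; defaults to time.sleep
    """
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            print(f"attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)
            delay = min(delay * 2, max_delay)
```

With pika this could be called as `connect_with_retry(lambda: pika.BlockingConnection(parameters), retry_on=(pika.exceptions.AMQPConnectionError,))`, where `parameters` is a `pika.ConnectionParameters` object like the ones used throughout this tutorial.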
Connection Helper
# rabbitmq_helper.py
import pika
from contextlib import contextmanager
class RabbitMQHelper:
    """Helper class for RabbitMQ operations"""
    def __init__(self, host='localhost', port=5672, user='admin', password='admin123'):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
    def get_connection(self):
        """Get RabbitMQ connection"""
        credentials = pika.PlainCredentials(self.user, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            credentials=credentials,
            heartbeat=60,
            blocked_connection_timeout=300
        )
        return pika.BlockingConnection(parameters)
    @contextmanager
    def get_channel(self):
        """Context manager for channel"""
        connection = self.get_connection()
        channel = connection.channel()
        try:
            yield channel
        finally:
            connection.close()
    def declare_queue(self, queue_name, durable=True):
        """Declare queue"""
        with self.get_channel() as channel:
            channel.queue_declare(queue=queue_name, durable=durable)
            print(f"✅ Queue '{queue_name}' declared")
    def declare_exchange(self, exchange_name, exchange_type='direct', durable=True):
        """Declare exchange"""
        with self.get_channel() as channel:
            channel.exchange_declare(
                exchange=exchange_name,
                exchange_type=exchange_type,
                durable=durable
            )
            print(f"✅ Exchange '{exchange_name}' ({exchange_type}) declared")
    def bind_queue(self, exchange, queue, routing_key=''):
        """Bind queue to exchange"""
        with self.get_channel() as channel:
            channel.queue_bind(
                exchange=exchange,
                queue=queue,
                routing_key=routing_key
            )
            print(f"✅ Queue '{queue}' bound to '{exchange}' with key '{routing_key}'")
    def purge_queue(self, queue_name):
        """Delete all messages from queue"""
        with self.get_channel() as channel:
            channel.queue_purge(queue=queue_name)
            print(f"✅ Queue '{queue_name}' purged")
# Usage
if __name__ == '__main__':
    helper = RabbitMQHelper()
    # Declare queue
    helper.declare_queue('test_queue')
    # Declare exchange
    helper.declare_exchange('test_exchange', 'topic')
    # Bind
    helper.bind_queue('test_exchange', 'test_queue', 'test.#')
Management CLI
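AMQP itself has no "list all queues" operation, which is why the script in this section points to the Management UI for that. The management plugin also exposes an HTTP API on the same port, and `GET /api/queues` with HTTP basic auth returns the queues as JSON. Below is a minimal sketch using only the standard library; the function names are invented for this example, and the defaults assume the admin / admin123 credentials from this tutorial.

```python
import base64
import json
import urllib.request


def build_queues_request(base_url="http://localhost:15672",
                         user="admin", password="admin123"):
    """Build (but do not send) a GET /api/queues request with basic auth."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(f"{base_url}/api/queues")
    req.add_header("Authorization", f"Basic {token}")
    return req


def summarize(queues):
    """Reduce the API's queue objects to (name, messages, consumers) tuples."""
    return [(q["name"], q.get("messages", 0), q.get("consumers", 0))
            for q in queues]


def list_queues(**kwargs):
    """Send the request to a running broker and return the summary."""
    with urllib.request.urlopen(build_queues_request(**kwargs), timeout=5) as resp:
        return summarize(json.load(resp))
```

With the container from Step 1 running, `list_queues()` returns one tuple per queue; the counts can lag slightly behind the broker because the management statistics are refreshed periodically.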
# manage_rabbitmq.py
import pika
import sys
class RabbitMQManager:
    def __init__(self):
        credentials = pika.PlainCredentials('admin', 'admin123')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost', 5672, credentials=credentials)
        )
        self.channel = self.connection.channel()
    def list_queues(self):
        """List all queues"""
        # Note: This is limited, better use Management API
        print("Use Management UI for full list:")
        print("http://localhost:15672/#/queues")
    def get_queue_info(self, queue_name):
        """Get queue information"""
        try:
            queue = self.channel.queue_declare(queue=queue_name, passive=True)
            print(f"Queue: {queue_name}")
            print(f" Messages: {queue.method.message_count}")
            print(f" Consumers: {queue.method.consumer_count}")
        except Exception as e:
            print(f"❌ Error: {e}")
    def purge_queue(self, queue_name):
        """Delete all messages"""
        try:
            self.channel.queue_purge(queue=queue_name)
            print(f"✅ Queue '{queue_name}' purged")
        except Exception as e:
            print(f"❌ Error: {e}")
    def delete_queue(self, queue_name):
        """Delete queue"""
        try:
            self.channel.queue_delete(queue=queue_name)
            print(f"✅ Queue '{queue_name}' deleted")
        except Exception as e:
            print(f"❌ Error: {e}")
    def close(self):
        self.connection.close()
# CLI
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage:")
        print(" python manage_rabbitmq.py list")
        print(" python manage_rabbitmq.py info <queue_name>")
        print(" python manage_rabbitmq.py purge <queue_name>")
        print(" python manage_rabbitmq.py delete <queue_name>")
        sys.exit(1)
    manager = RabbitMQManager()
    command = sys.argv[1]
    if command == 'list':
        manager.list_queues()
    elif command == 'info' and len(sys.argv) == 3:
        manager.get_queue_info(sys.argv[2])
    elif command == 'purge' and len(sys.argv) == 3:
        manager.purge_queue(sys.argv[2])
    elif command == 'delete' and len(sys.argv) == 3:
        manager.delete_queue(sys.argv[2])
    else:
        print("❌ Invalid command")
    manager.close()
Troubleshooting
Common Issues
Issue 1: Connection refused
# Check if RabbitMQ is running
docker ps | grep rabbitmq
# Check logs
docker logs rabbitmq
# Restart
docker restart rabbitmq
Issue 2: Authentication failed
# Wrong credentials
credentials = pika.PlainCredentials('wrong', 'password')
# Error: ACCESS_REFUSED
# Fix: Use correct credentials
credentials = pika.PlainCredentials('admin', 'admin123')
Issue 3: Queue not found
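# Declare queue before using
channel.queue_declare(queue='my_queue', durable=True)
# Or use passive=True to check if exists
try:
    channel.queue_declare(queue='my_queue', passive=True)
except pika.exceptions.ChannelClosedByBroker:
    # The broker replies 404 NOT_FOUND and closes the channel
    print("Queue doesn't exist!")
    channel = connection.channel() # Open a fresh channel to continue
Issue 4: Messages not received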
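# Missing ACK
def callback(ch, method, properties, body):
    print(body)
    # Must ACK!
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Or auto_ack=True. The callback must then NOT call basic_ack:
# acking an auto-acked delivery makes the broker close the channel
def callback_no_ack(ch, method, properties, body):
    print(body)
channel.basic_consume(
    queue='my_queue',
    on_message_callback=callback_no_ack,
    auto_ack=True # Auto acknowledge
)
Conclusion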
RabbitMQ setup in 15 minutes:
- ✅ Docker container (2 min)
- ✅ Python client (2 min)
- ✅ First message (5 min)
- ✅ Exchanges & routing (6 min)
What you learned:
- Simple queue (FIFO)
- Direct exchange (routing key)
- Fanout exchange (broadcast)
- Topic exchange (patterns)
- Docker setup
- Management
Production ready:
- Docker Compose
- Persistent volumes
- Health checks
- Custom config
This tutorial was written by a developer who needed a message queue and found RabbitMQ. Sometimes the best solution is the battle-tested one. 🐰
P.S.: The Management UI is a game changer. You can see everything: queues, exchanges, messages, connections. Try it: http://localhost:15672 🎨