🤖 ROS2 Fundamentos
Objetivo do Módulo
Dominar conceitos core do ROS2: nodes, topics, services e como organizar um sistema robótico.
🏗️ Arquitetura ROS2
Conceitos Principais
[Node 1: Camera] --publish--> /camera/image --> [Node 2: Detector]
|
[Node 3: Controller]
| Conceito | Descrição | Analogia |
|---|---|---|
| Node | Processo executável | App no celular |
| Topic | Canal de comunicação | Canal do YouTube |
| Publisher | Quem envia dados | YouTuber fazendo upload |
| Subscriber | Quem recebe dados | Inscrito assistindo |
| Service | Requisição/Resposta | Chamar API REST |
| Action | Tarefa de longa duração | Upload com barra de progresso |
Por Que ROS2 em Vez de ROS1?
- ✅ ROS2 (2024+)
- ⚠️ ROS1 (Legacy)
Vantagens:
- ✅ Real-time support (DDS middleware)
- ✅ Multi-plataforma (Linux, Windows, macOS)
- ✅ Segurança (criptografia, autenticação)
- ✅ Suporte comercial (Industrial, automotive)
- ✅ Python 3 + C++17
Desvantagens:
- ❌ Comunidade menor (crescendo)
- ❌ Menos pacotes legacy
Usado por:
- Tesla (Optimus)
- BMW (Factory automation)
- Bosch (Industrial robots)
Vantagens:
- ✅ Comunidade enorme
- ✅ Milhares de pacotes
- ✅ Documentação extensa
Desvantagens:
- ❌ Sem suporte real-time
- ❌ Apenas Linux
- ❌ Sem segurança nativa
- ❌ Python 2 (deprecated)
- ❌ EOL em 2025
Não use para projetos novos!
Recomendação: Sempre use ROS2 para projetos novos (2024+)
DDS: O "Cérebro" do ROS2
DDS (Data Distribution Service) é o middleware que faz ROS2 funcionar:
┌─────────────────────────────────────────┐
│ ROS2 Application Layer │
│ (nodes, topics, services, actions) │
├─────────────────────────────────────────┤
│ ROS2 Client Library │
│ (rclpy, rclcpp) │
├─────────────────────────────────────────┤
│ DDS Middleware (RMW) │
│ (CycloneDDS, FastDDS, RTI Connext) │
├─────────────────────────────────────────┤
│ Network Layer (UDP) │
└─────────────────────────────────────────┘
Principais DDS:
- CycloneDDS (Recomendado): Leve, rápido, open-source
- FastDDS (Default no Humble): Completo, mais features
- RTI Connext: Comercial, ultra-confiável
Configurar DDS:
# Usar CycloneDDS (recomendado para humanoides)
echo "export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp" >> ~/.bashrc
source ~/.bashrc
# Verificar DDS ativo
echo $RMW_IMPLEMENTATION
📡 Topics (Publicação/Assinatura)
Conceito: Pub/Sub Pattern
/cmd_vel (Twist)
|
┌───────────────┼───────────────┐
↓ ↓ ↓
[Teleop] [Autonomy] [Safety Monitor]
Publisher Publisher Subscriber
Múltiplos publishers ✅
Múltiplos subscribers ✅
Comunicação assíncrona ✅
Desacoplado (publishers não sabem quem recebe) ✅
Quando usar topics:
- Dados contínuos (câmera @ 30 Hz, IMU @ 200 Hz)
- Múltiplos listeners (broadcast)
- Fire-and-forget (não precisa de resposta)
Publisher (Publicar Dados)
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, '/chatter', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World {self.count}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: {msg.data}')
self.count += 1
def main():
rclpy.init()
node = TalkerNode()
rclpy.spin(node)
Parâmetros importantes:
'/chatter': Nome do topic (sempre começa com/)10: QoS queue size (buffer de mensagens)
Subscriber (Receber Dados)
class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String, '/chatter', self.listener_callback, 10
)
def listener_callback(self, msg):
self.get_logger().info(f'Received: {msg.data}')
QoS (Quality of Service) Profiles
- 🔒 Reliable (Garantido)
- ⚡ Best Effort (Rápido)
- 📷 Sensor Data (Preset)
from rclpy.qos import QoSProfile, ReliabilityPolicy
qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.RELIABLE
)
publisher = node.create_publisher(String, '/critical_data', qos)
Quando usar:
- Comandos críticos
- Configurações
- Dados que não podem ser perdidos
Desvantagens:
- Maior latência
- Retransmissões
from rclpy.qos import QoSProfile, ReliabilityPolicy
qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.BEST_EFFORT
)
publisher = node.create_publisher(Image, '/camera/image', qos)
Quando usar:
- Câmeras (alta frequência)
- Sensores rápidos (IMU, LIDAR)
- Dados onde frame drop é aceitável
Vantagens:
- Baixa latência
- Alto throughput
from rclpy.qos import qos_profile_sensor_data
publisher = node.create_publisher(
Image,
'/camera/image',
qos_profile_sensor_data
)
Configuração automática:
- Best effort
- Volatile (não guarda histórico)
- Depth 10
Exemplo Prático: IMU Publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
from rclpy.qos import qos_profile_sensor_data
import numpy as np
class IMUPublisher(Node):
def __init__(self):
super().__init__('imu_publisher')
# Publisher @ 200 Hz
self.publisher = self.create_publisher(
Imu,
'/imu/data',
qos_profile_sensor_data
)
# Timer: 200 Hz = 5ms
self.timer = self.create_timer(0.005, self.publish_imu)
self.get_logger().info("IMU Publisher started @ 200 Hz")
def publish_imu(self):
msg = Imu()
# Header
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_link'
# Aceleração linear (m/s²)
msg.linear_acceleration.x = np.random.normal(0, 0.01)
msg.linear_acceleration.y = np.random.normal(0, 0.01)
msg.linear_acceleration.z = 9.81 + np.random.normal(0, 0.01)
# Velocidade angular (rad/s)
msg.angular_velocity.x = np.random.normal(0, 0.001)
msg.angular_velocity.y = np.random.normal(0, 0.001)
msg.angular_velocity.z = np.random.normal(0, 0.001)
# Orientação (quaternion)
msg.orientation.w = 1.0
msg.orientation.x = 0.0
msg.orientation.y = 0.0
msg.orientation.z = 0.0
self.publisher.publish(msg)
def main():
rclpy.init()
node = IMUPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Rodar:
# Terminal 1: Rodar publisher
python3 imu_publisher.py
# Terminal 2: Escutar topic
ros2 topic echo /imu/data
# Terminal 3: Ver frequência
ros2 topic hz /imu/data