Pular para o conteúdo principal

🤖 ROS2 Fundamentos

Objetivo do Módulo

Dominar conceitos core do ROS2: nodes, topics, services e como organizar um sistema robótico.


🏗️ Arquitetura ROS2

Conceitos Principais

[Node 1: Camera] --publish--> /camera/image --> [Node 2: Detector]
|
[Node 3: Controller]
ConceitoDescriçãoAnalogia
NodeProcesso executávelApp no celular
TopicCanal de comunicaçãoCanal do YouTube
PublisherQuem envia dadosYouTuber fazendo upload
SubscriberQuem recebe dadosInscrito assistindo
ServiceRequisição/RespostaChamar API REST
ActionTarefa de longa duraçãoUpload com barra de progresso

Por Que ROS2 em Vez de ROS1?

Vantagens:

  • ✅ Real-time support (DDS middleware)
  • ✅ Multi-plataforma (Linux, Windows, macOS)
  • ✅ Segurança (criptografia, autenticação)
  • ✅ Suporte comercial (Industrial, automotive)
  • ✅ Python 3 + C++17

Desvantagens:

  • ❌ Comunidade menor (crescendo)
  • ❌ Menos pacotes legacy

Usado por:

  • Tesla (Optimus)
  • BMW (Factory automation)
  • Bosch (Industrial robots)

Recomendação: Sempre use ROS2 para projetos novos (2024+)


DDS: O "Cérebro" do ROS2

DDS (Data Distribution Service) é o middleware que faz ROS2 funcionar:

┌─────────────────────────────────────────┐
│ ROS2 Application Layer │
│ (nodes, topics, services, actions) │
├─────────────────────────────────────────┤
│ ROS2 Client Library │
│ (rclpy, rclcpp) │
├─────────────────────────────────────────┤
│ DDS Middleware (RMW) │
│ (CycloneDDS, FastDDS, RTI Connext) │
├─────────────────────────────────────────┤
│ Network Layer (UDP) │
└─────────────────────────────────────────┘

Principais DDS:

  • CycloneDDS (Recomendado): Leve, rápido, open-source
  • FastDDS (Default no Humble): Completo, mais features
  • RTI Connext: Comercial, ultra-confiável

Configurar DDS:

# Usar CycloneDDS (recomendado para humanoides)
echo "export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp" >> ~/.bashrc
source ~/.bashrc

# Verificar DDS ativo
echo $RMW_IMPLEMENTATION

📡 Topics (Publicação/Assinatura)

Conceito: Pub/Sub Pattern

              /cmd_vel (Twist)
|
┌───────────────┼───────────────┐
↓ ↓ ↓
[Teleop] [Autonomy] [Safety Monitor]
Publisher Publisher Subscriber

Múltiplos publishers ✅
Múltiplos subscribers ✅
Comunicação assíncrona ✅
Desacoplado (publishers não sabem quem recebe) ✅

Quando usar topics:

  • Dados contínuos (câmera @ 30 Hz, IMU @ 200 Hz)
  • Múltiplos listeners (broadcast)
  • Fire-and-forget (não precisa de resposta)

Publisher (Publicar Dados)

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class TalkerNode(Node):
def __init__(self):
super().__init__('talker')
self.publisher = self.create_publisher(String, '/chatter', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0

def timer_callback(self):
msg = String()
msg.data = f'Hello World {self.count}'
self.publisher.publish(msg)
self.get_logger().info(f'Publishing: {msg.data}')
self.count += 1

def main():
rclpy.init()
node = TalkerNode()
rclpy.spin(node)

Parâmetros importantes:

  • '/chatter': Nome do topic (sempre começa com /)
  • 10: QoS queue size (buffer de mensagens)

Subscriber (Receber Dados)

class ListenerNode(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(
String, '/chatter', self.listener_callback, 10
)

def listener_callback(self, msg):
self.get_logger().info(f'Received: {msg.data}')

QoS (Quality of Service) Profiles

from rclpy.qos import QoSProfile, ReliabilityPolicy

qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.RELIABLE
)

publisher = node.create_publisher(String, '/critical_data', qos)

Quando usar:

  • Comandos críticos
  • Configurações
  • Dados que não podem ser perdidos

Desvantagens:

  • Maior latência
  • Retransmissões

Exemplo Prático: IMU Publisher

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
from rclpy.qos import qos_profile_sensor_data
import numpy as np

class IMUPublisher(Node):
def __init__(self):
super().__init__('imu_publisher')

# Publisher @ 200 Hz
self.publisher = self.create_publisher(
Imu,
'/imu/data',
qos_profile_sensor_data
)

# Timer: 200 Hz = 5ms
self.timer = self.create_timer(0.005, self.publish_imu)

self.get_logger().info("IMU Publisher started @ 200 Hz")

def publish_imu(self):
msg = Imu()

# Header
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_link'

# Aceleração linear (m/s²)
msg.linear_acceleration.x = np.random.normal(0, 0.01)
msg.linear_acceleration.y = np.random.normal(0, 0.01)
msg.linear_acceleration.z = 9.81 + np.random.normal(0, 0.01)

# Velocidade angular (rad/s)
msg.angular_velocity.x = np.random.normal(0, 0.001)
msg.angular_velocity.y = np.random.normal(0, 0.001)
msg.angular_velocity.z = np.random.normal(0, 0.001)

# Orientação (quaternion)
msg.orientation.w = 1.0
msg.orientation.x = 0.0
msg.orientation.y = 0.0
msg.orientation.z = 0.0

self.publisher.publish(msg)

def main():
rclpy.init()
node = IMUPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Rodar:

# Terminal 1: Rodar publisher
python3 imu_publisher.py

# Terminal 2: Escutar topic
ros2 topic echo /imu/data

# Terminal 3: Ver frequência
ros2 topic hz /imu/data

🔧 Services (Requisição/Resposta)

Conceito: Request-Response Pattern

Client                    Server
| |
|------- Request --------->|
| (a=5, b=3) |
| | Processa
|<------ Response ---------|
| (sum=8) |
| |

Quando usar services:

  • Operações síncronas (espera resposta)
  • Configuração (set parameter, load map)
  • Queries (get position, check status)
  • Operações rápidas (<1s)

Não use services para:

  • Dados contínuos → Use topics
  • Operações longas (>1s) → Use actions

Server

from example_interfaces.srv import AddTwoInts

class AddServer(Node):
def __init__(self):
super().__init__('add_server')
self.srv = self.create_service(
AddTwoInts, '/add_two_ints', self.add_callback
)
self.get_logger().info("Service /add_two_ints ready")

def add_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'{request.a} + {request.b} = {response.sum}')
return response

Client (Síncrono)

class AddClient(Node):
def __init__(self):
super().__init__('add_client')
self.client = self.create_client(AddTwoInts, '/add_two_ints')

# Esperar servidor ficar disponível
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for service...')

def send_request(self, a: int, b: int) -> int:
request = AddTwoInts.Request()
request.a = a
request.b = b

future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)

if future.result() is not None:
return future.result().sum
else:
self.get_logger().error('Service call failed')
return None

# Uso
node = AddClient()
result = node.send_request(5, 3)
print(f"Result: {result}") # 8

Client (Assíncrono com Async/Await)

import asyncio
from rclpy.executors import MultiThreadedExecutor

class AsyncAddClient(Node):
def __init__(self):
super().__init__('async_add_client')
self.client = self.create_client(AddTwoInts, '/add_two_ints')

async def send_request_async(self, a: int, b: int) -> int:
# Esperar serviço
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for service...')

request = AddTwoInts.Request()
request.a = a
request.b = b

# Chamar de forma assíncrona
future = self.client.call_async(request)

# Esperar resultado sem bloquear
while not future.done():
await asyncio.sleep(0.01)

return future.result().sum

# Uso com asyncio
async def main():
rclpy.init()
node = AsyncAddClient()

executor = MultiThreadedExecutor()
executor.add_node(node)

# Rodar executor em background
executor_thread = threading.Thread(target=executor.spin, daemon=True)
executor_thread.start()

# Fazer múltiplas chamadas em paralelo
results = await asyncio.gather(
node.send_request_async(1, 2),
node.send_request_async(3, 4),
node.send_request_async(5, 6),
)

print(f"Results: {results}") # [3, 7, 11]

executor.shutdown()
rclpy.shutdown()

asyncio.run(main())

Exemplo Prático: Robot Status Service

from std_srvs.srv import Trigger
from geometry_msgs.msg import Pose

class RobotStatusService(Node):
def __init__(self):
super().__init__('robot_status_service')

# Service: Get robot status
self.status_srv = self.create_service(
Trigger,
'/robot/get_status',
self.get_status_callback
)

# Estado interno do robô
self.battery = 85.0
self.position = [1.5, 2.0, 0.0]
self.is_walking = False

self.get_logger().info("Robot Status Service ready")

def get_status_callback(self, request, response):
# Trigger não tem request, apenas response

status_msg = f"""
Robot Status:
- Battery: {self.battery:.1f}%
- Position: x={self.position[0]}, y={self.position[1]}, z={self.position[2]}
- Walking: {self.is_walking}
"""

response.success = True
response.message = status_msg

return response

# Client
client = node.create_client(Trigger, '/robot/get_status')
client.wait_for_service()

request = Trigger.Request()
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)

result = future.result()
print(result.message)

🤖 Exemplo Prático: Controle de Robô

Sistema Completo com Topics e Services

from geometry_msgs.msg import Twist
from std_msgs.msg import String
from std_srvs.srv import SetBool
import numpy as np

class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')

# Publisher: Velocidade do robô
self.vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)

# Subscriber: Comandos de voz
self.voice_sub = self.create_subscription(
String, '/voice_command', self.voice_callback, 10
)

# Subscriber: Joystick
self.joy_sub = self.create_subscription(
Twist, '/joy/cmd_vel', self.joy_callback, 10
)

# Service: Enable/disable robot
self.enable_srv = self.create_service(
SetBool,
'/robot/enable',
self.enable_callback
)

# Estado
self.enabled = False
self.max_linear_speed = 1.0 # m/s
self.max_angular_speed = 1.0 # rad/s

self.get_logger().info("Robot Controller initialized")

def voice_callback(self, msg):
"""Processar comando de voz"""
if not self.enabled:
self.get_logger().warn("Robot disabled, ignoring command")
return

cmd = msg.data.lower()

twist = Twist()
if 'forward' in cmd or 'frente' in cmd:
twist.linear.x = self.max_linear_speed
elif 'back' in cmd or 'trás' in cmd:
twist.linear.x = -self.max_linear_speed
elif 'left' in cmd or 'esquerda' in cmd:
twist.angular.z = self.max_angular_speed
elif 'right' in cmd or 'direita' in cmd:
twist.angular.z = -self.max_angular_speed
elif 'stop' in cmd or 'parar' in cmd:
twist.linear.x = 0.0
twist.angular.z = 0.0
else:
self.get_logger().warn(f"Unknown command: {cmd}")
return

self.vel_pub.publish(twist)
self.get_logger().info(f'Executing: {cmd}')

def joy_callback(self, msg):
"""Processar joystick (prioridade sobre voz)"""
if not self.enabled:
return

# Limitar velocidades
twist = Twist()
twist.linear.x = np.clip(
msg.linear.x,
-self.max_linear_speed,
self.max_linear_speed
)
twist.angular.z = np.clip(
msg.angular.z,
-self.max_angular_speed,
self.max_angular_speed
)

self.vel_pub.publish(twist)

def enable_callback(self, request, response):
"""Service para ativar/desativar robô"""
self.enabled = request.data

if self.enabled:
self.get_logger().info("🟢 Robot ENABLED")
response.success = True
response.message = "Robot enabled successfully"
else:
self.get_logger().info("🔴 Robot DISABLED")
# Parar robô
self.vel_pub.publish(Twist())
response.success = True
response.message = "Robot disabled, stopping motors"

return response

def main():
rclpy.init()
node = RobotController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

Testar sistema:

# Terminal 1: Rodar controller
python3 robot_controller.py

# Terminal 2: Ativar robô
ros2 service call /robot/enable std_srvs/srv/SetBool "{data: true}"

# Terminal 3: Enviar comando de voz
ros2 topic pub /voice_command std_msgs/msg/String "{data: 'forward'}"

# Terminal 4: Monitorar velocidade
ros2 topic echo /cmd_vel

🔍 Ferramentas CLI Essenciais

Inspecionar Sistema

# Listar todos os nodes ativos
ros2 node list

# Info detalhada de um node
ros2 node info /robot_controller

# Output:
# /robot_controller
# Subscribers:
# /voice_command: std_msgs/msg/String
# /joy/cmd_vel: geometry_msgs/msg/Twist
# Publishers:
# /cmd_vel: geometry_msgs/msg/Twist
# Services:
# /robot/enable: std_srvs/srv/SetBool

RQT: Interface Gráfica

# Instalar RQT tools
sudo apt install ros-humble-rqt-*

# Graph: Visualizar nodes e topics
rqt_graph

# Console: Ver logs
rqt_console

# Plot: Graficar dados em tempo real
rqt_plot /imu/data/angular_velocity/z

# Reconfigure: Mudar parâmetros dinamicamente
rqt_reconfigure

📦 Criar Pacote ROS2

Estrutura de Workspace

humanoid_ws/
├── src/
│ ├── my_robot_controller/
│ │ ├── my_robot_controller/
│ │ │ ├── __init__.py
│ │ │ ├── robot_controller.py
│ │ │ └── imu_publisher.py
│ │ ├── package.xml
│ │ ├── setup.py
│ │ └── setup.cfg
│ └── my_robot_interfaces/ # Custom messages
│ ├── msg/
│ ├── srv/
│ └── action/
├── build/
├── install/
└── log/

Criar Pacote Python

cd ~/humanoid_ws/src

# Criar pacote
ros2 pkg create my_robot_controller \
--build-type ament_python \
--dependencies rclpy geometry_msgs sensor_msgs

cd my_robot_controller

Editar setup.py

from setuptools import setup

package_name = 'my_robot_controller'

setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='you@email.com',
description='Robot controller package',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'robot_controller = my_robot_controller.robot_controller:main',
'imu_publisher = my_robot_controller.imu_publisher:main',
],
},
)

Build e Run

cd ~/humanoid_ws

# Build pacote
colcon build --packages-select my_robot_controller

# Source workspace
source install/setup.bash

# Rodar node
ros2 run my_robot_controller robot_controller

🚀 Exemplo do Mundo Real: Boston Dynamics Spot

"""
Arquitetura simplificada inspirada no Spot (Boston Dynamics)
Fonte: Spot SDK documentation
"""

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from sensor_msgs.msg import JointState, Imu
from std_srvs.srv import Trigger
import numpy as np

class SpotController(Node):
"""
Controlador simplificado similar ao Spot

Arquitetura:
- 12 motores (3 por perna: hip, shoulder, knee)
- IMU @ 200 Hz
- Controle de velocidade
- Modos: Stand, Walk, Sit
"""

def __init__(self):
super().__init__('spot_controller')

# Publishers
self.joint_pub = self.create_publisher(
JointState, '/joint_states', 10
)

# Subscribers
self.cmd_vel_sub = self.create_subscription(
Twist, '/cmd_vel', self.cmd_vel_callback, 10
)

self.imu_sub = self.create_subscription(
Imu, '/imu/data', self.imu_callback, 10
)

# Services
self.stand_srv = self.create_service(
Trigger, '/spot/stand', self.stand_callback
)

self.sit_srv = self.create_service(
Trigger, '/spot/sit', self.sit_callback
)

# Estado
self.mode = "sit" # sit, stand, walk
self.joint_positions = np.zeros(12)
self.target_velocity = Twist()
self.imu_data = None

# Control loop @ 100 Hz
self.control_timer = self.create_timer(0.01, self.control_loop)

self.get_logger().info("Spot Controller initialized")

def cmd_vel_callback(self, msg):
"""Receber comando de velocidade"""
if self.mode != "walk":
self.get_logger().warn("Not in walk mode, ignoring cmd_vel")
return

self.target_velocity = msg

def imu_callback(self, msg):
"""Receber dados do IMU"""
self.imu_data = msg

def control_loop(self):
"""Loop principal de controle @ 100 Hz"""
if self.mode == "walk":
# Gait controller
self.update_walking_gait()
elif self.mode == "stand":
# Balance controller
self.update_standing_balance()
elif self.mode == "sit":
# Sitting position (low power)
self.update_sitting_position()

# Publicar estados das juntas
self.publish_joint_states()

def update_walking_gait(self):
"""Atualizar gait de caminhada"""
# Simplified trot gait
t = self.get_clock().now().nanoseconds * 1e-9
phase = (t * 2.0) % (2 * np.pi) # 2 Hz gait

# Diagonal pairs move together
# FL & RR together, FR & RL together

for i in range(12):
if i in [0, 1, 2, 6, 7, 8]: # FL & RR
self.joint_positions[i] = 0.3 * np.sin(phase)
else: # FR & RL
self.joint_positions[i] = 0.3 * np.sin(phase + np.pi)

def update_standing_balance(self):
"""Usar IMU para manter equilíbrio"""
if self.imu_data is None:
return

# Simplified balance control
# Em Spot real: usa MPC (Model Predictive Control)
roll = 0.0 # Extract from IMU quaternion
pitch = 0.0

# Compensar inclinação ajustando juntas
compensation = np.array([roll, pitch, 0.0] * 4) # 4 pernas
self.joint_positions = compensation

def update_sitting_position(self):
"""Posição sentada (baixo consumo)"""
# Pernas dobradas
self.joint_positions = np.array([
0.0, -1.2, 2.4, # FL
0.0, -1.2, 2.4, # FR
0.0, -1.2, 2.4, # RL
0.0, -1.2, 2.4, # RR
])

def publish_joint_states(self):
"""Publicar estado atual das juntas"""
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()

msg.name = [
'fl_hip', 'fl_shoulder', 'fl_knee',
'fr_hip', 'fr_shoulder', 'fr_knee',
'rl_hip', 'rl_shoulder', 'rl_knee',
'rr_hip', 'rr_shoulder', 'rr_knee',
]

msg.position = self.joint_positions.tolist()

self.joint_pub.publish(msg)

def stand_callback(self, request, response):
"""Service: Levantar"""
self.mode = "stand"
self.get_logger().info("🟢 Standing up")

response.success = True
response.message = "Robot is now standing"
return response

def sit_callback(self, request, response):
"""Service: Sentar"""
self.mode = "sit"
self.get_logger().info("🔵 Sitting down")

response.success = True
response.message = "Robot is now sitting"
return response

def main():
rclpy.init()
node = SpotController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Conceitos usados:

  • Múltiplos publishers/subscribers
  • Services para comandos
  • Control loop de alta frequência
  • Máquina de estados (sit/stand/walk)

🔧 Troubleshooting Comum

Problema 1: Nodes não se comunicam

Sintoma:

ros2 topic echo /cmd_vel
# (nada aparece)

Diagnóstico:

# Verificar se há publisher
ros2 topic info /cmd_vel
# Publisher count: 0 ← Problema!

# Verificar DDS
echo $RMW_IMPLEMENTATION
# (vazio) ← Problema!

Solução:

# Configurar DDS
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp

# Ou adicionar permanentemente
echo "export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp" >> ~/.bashrc

Problema 2: QoS Incompatível

Erro:

[WARN] New subscription discovered on topic '/camera/image', requesting incompatible QoS.

Causa: Publisher usa RELIABLE, Subscriber usa BEST_EFFORT (ou vice-versa)

Solução:

# Publisher e Subscriber devem usar mesmo QoS
from rclpy.qos import qos_profile_sensor_data

# Ambos usam sensor_data profile
pub = node.create_publisher(Image, '/camera/image', qos_profile_sensor_data)
sub = node.create_subscription(Image, '/camera/image', callback, qos_profile_sensor_data)

Problema 3: Mensagens atrasadas

Sintoma: Delay de 1-2 segundos entre publicação e recepção

Causas comuns:

  1. QoS depth muito pequeno
  2. Callback lento bloqueando
  3. CPU sobrecarregada

Soluções:

# 1. Aumentar queue size
pub = node.create_publisher(Image, '/camera', 100) # Era 10

# 2. Usar MultiThreadedExecutor
from rclpy.executors import MultiThreadedExecutor

executor = MultiThreadedExecutor()
executor.add_node(node)
executor.spin()

# 3. Mover processamento pesado para thread separada
import threading

def heavy_processing(data):
# Processar em thread separada
pass

def callback(msg):
# Callback rápido, apenas dispara thread
threading.Thread(target=heavy_processing, args=(msg,)).start()

📊 Performance Tips

Otimizar Throughput

TécnicaGanhoQuando Usar
Usar Best Effort QoS2-5xCâmera, LIDAR
Aumentar queue size1.5xBurst traffic
Shared memory10xMesma máquina
Compression3-5xRede limitada
MultiThreadedExecutor2-3xMúltiplos callbacks

Exemplo: Shared Memory Transport

# Ativar shared memory (apenas mesmo host)
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI='<CycloneDDS><Domain><General><Transport><SharedMemory>true</SharedMemory></Transport></General></Domain></CycloneDDS>'

✅ Checklist de Conclusão

  • Entendo diferença entre topics, services e actions
  • Sei criar publishers e subscribers
  • Consigo chamar services (sync e async)
  • Entendo QoS profiles (Reliable vs Best Effort)
  • Sei usar ferramentas CLI (ros2 topic, node, service)
  • Consigo criar pacote ROS2 do zero
  • Entendo arquitetura de sistema robótico real
  • Sei debugar problemas comuns de comunicação

🔗 Próximos Passos

Próximo Módulo

🚀 ROS2 Avançado →

Actions, lifecycle nodes, parameters e TF2.


⏱️ Tempo estimado: 30-35 min 🧠 Nível: Intermediário 💻 Hands-on: 75% prático, 25% teórico