🤖 ROS2: Middleware para Robótica Real
O sistema operacional dos robôs: pub/sub, TF2, Nav2 e integração com VLA. ROS2 é o padrão da indústria para robótica de produção — entender sua arquitetura é obrigatório para qualquer deploy real.
Arquitetura ROS2: nós publicam no barramento DDS; o nó VLA Inference recebe sensores e publica ações para o hardware.
ROS2 Humble/Iron: Arquitetura DDS
O "Linux da robótica" reinventado com middleware distribuído e sem master node. DDS garante comunicação real-time e descoberta automática de nós em qualquer topologia de rede.
🏗️ O que é ROS2
ROS2 é o framework middleware padrão da indústria robótica. Diferente do ROS1, usa DDS (Data Distribution Service) como camada de transporte, oferecendo QoS configurável, descoberta automática de nós e comunicação real-time sem master node central.
Qualquer robô que vai para produção industrial usa ROS2. É o "Linux da robótica". Integrar um modelo VLA em ROS2 é obrigatório para deploy real. As distribuições atuais (Humble LTS, Iron) são estáveis e suportadas até 2027+.
✅ Use ROS2 quando
- ✓ Deploy em hardware real com múltiplos sensores
- ✓ Integração com Nav2/MoveIt2 para planejamento
- ✓ Equipe com múltiplos developers em paralelo
- ✓ Necessidade de rosbag2 para replay de dados
- ✓ Comunicação entre processos em CPUs distintas
❌ Evite quando
- ✗ Protótipo solo sem integração de sensores
- ✗ Pure ML training loop (PyTorch já basta)
- ✗ Simulação somente em MuJoCo/PyBullet
- ✗ Projetos com equipe sem conhecimento ROS
- ✗ Ambientes onde overhead de DDS é proibitivo
Topics, Services e Actions
Os três padrões de comunicação em ROS2. Cada um resolve um problema diferente — escolher o errado causa bugs sutis e difíceis de depurar em produção.
Topics — Pub/Sub Assíncrono
Streaming de dados contínuos: imagem da câmera, joint states, poses.
Publisher publica sem saber quem está ouvindo. N publishers → N subscribers possíveis. Alta frequência, sem garantia de entrega por padrão. Ideal para VLA inference output.
ros2 topic echo /image_raw
ros2 topic hz /vla/actions # medir frequência
Services — Request/Reply Síncrono
Chamadas pontuais com resposta garantida: mudar modo, ativar sensor.
Bloqueante por natureza — o client espera a resposta. Usar para configuração e controle de estado, nunca para streaming. Timeout necessário para evitar deadlock.
ros2 service call /switch_mode std_srvs/srv/SetBool "{data: true}"
Actions — Tarefas Longas com Feedback
Mover braço até posição, navegar até waypoint, executar tarefa complexa.
Goal + Feedback periódico + Result final. Cancelável a qualquer momento. VLA recebe comandos de alto nível ("pegar objeto") via Action e publica ações de baixo nível via Topic.
ros2 action send_goal /navigate_to_pose nav2_msgs/action/NavigateToPose \
"{pose: {header: {frame_id: 'map'}, pose: {position: {x: 1.0}}}}"
💡 Dica de Arquitetura VLA
VLA inference publica em /vla/actions (Topic, 25 Hz). Comandos de tarefa chegam como Action Goals. Configuração do modelo chega via Service. Usar rqt_graph para visualizar o grafo de comunicação e identificar gargalos.
TF2: Árvore de Transformações 3D
TF2 mantém uma árvore de transformações entre todos os frames de referência do robô, atualizada em tempo real. VLA precisa saber exatamente onde a câmera está em relação ao gripper.
Árvore TF2 típica: world → map → odom → base_link ramifica em câmera e braço manipulador.
📐 Por que TF2 é crítico para VLA
- → VLA prediz pose do gripper em frame da câmera — TF2 converte para frame do mundo automaticamente
- → Buffer temporal permite lookups no passado: sincroniza câmera (30 Hz) com joint states (1000 Hz)
- →
robot_state_publisherpublica TF tree automaticamente a partir do URDF do robô - → Sem TF2: calcular transformações manualmente a cada frame introduz erros e atraso
// Lookup de transformação em Python (rclpy)
import rclpy
from tf2_ros import Buffer, TransformListener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
# Converter ponto de camera_frame para base_link
try:
t = self.tf_buffer.lookup_transform(
'base_link', # frame destino
'camera_link', # frame origem
rclpy.time.Time() # timestamp (Time() = mais recente)
)
# t.transform.translation.{x,y,z}
# t.transform.rotation.{x,y,z,w}
except Exception as e:
self.get_logger().warn(f'TF lookup failed: {e}')
Nav2 e MoveIt2: Navegação e Manipulação
Nav2 e MoveIt2 executam o planejamento de caminho seguro que o VLA delega. A combinação VLA (o quê fazer) + Nav2/MoveIt2 (como chegar lá sem colidir) é o padrão para mobile manipulators.
🗺 Nav2 — Navegação Autônoma
- • Behavior Trees para lógica de navegação complexa
- • costmap2d: mapa de custo local + global
- • Planner Server: NavFn, Smac (A* em grid 2D)
- • Controller Server: DWB, MPPI, RPP
- • Recovery Behaviors: spin, back-up, clear costmap
🦾 MoveIt2 — Manipulação
- • SRDF: Semantic Robot Description Format
- • Planning Scene: colisões estáticas e dinâmicas
- • OMPL planners: RRT*, PRM, EST
- • Servo: controle real-time para teleop e VLA
- • Pick-and-Place: pipeline grasp → transport → place
🔗 VLA + Nav2/MoveIt2: divisão de responsabilidades
💡 MoveIt Servo para VLA em loop fechado
Para VLAs com controle contínuo (ex: ACT, Diffusion Policy), use moveit_servo em vez do pipeline padrão. Servo aceita comandos de velocidade/pose em alta frequência (100+ Hz) e faz colision checking em tempo real — ideal para políticas que publicam joint velocities continuamente.
ROS2 + VLA: Nó de Inferência
O nó de inferência é a ponte entre o mundo ML e o mundo ROS2. Integrar PyTorch/ONNX em ROS2 tem desafios únicos: GIL, latência de serialização, sincronização câmera-ação e gerenciamento de GPU memory.
// Esqueleto do nó VLA em rclpy
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, JointState
from geometry_msgs.msg import TwistStamped
import message_filters
import onnxruntime as ort
import numpy as np
class VLAInferenceNode(Node):
def __init__(self):
super().__init__('vla_inference')
# Carregar modelo ONNX (inferência otimizada, sem GIL pesado)
self.session = ort.InferenceSession(
'policy.onnx',
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
# Sincronizar câmera + joint states temporalmente
img_sub = message_filters.Subscriber(self, Image, '/image_raw')
js_sub = message_filters.Subscriber(self, JointState, '/joint_states')
self.sync = message_filters.ApproximateTimeSynchronizer(
[img_sub, js_sub], queue_size=5, slop=0.05
)
self.sync.registerCallback(self.inference_callback)
# Publisher de ações (25 Hz)
self.action_pub = self.create_publisher(
TwistStamped, '/vla/actions', 10
)
def inference_callback(self, img_msg, js_msg):
# Preprocessar imagem (sem alocação dinâmica no hot path)
img = np.frombuffer(img_msg.data, dtype=np.uint8)
joints = np.array(js_msg.position, dtype=np.float32)
# Inferência ONNX
outputs = self.session.run(None, {
'image': img[None], 'joints': joints[None]
})
action = outputs[0][0] # (6,) joint velocities
# Publicar
msg = TwistStamped()
msg.header.stamp = self.get_clock().now().to_msg()
# ... preencher msg com action
self.action_pub.publish(msg)
def main():
rclpy.init()
node = VLAInferenceNode()
rclpy.spin(node)
if __name__ == '__main__':
main()
✅ Boas práticas
- ✓ ONNX Runtime para inferência (sem GIL do PyTorch)
- ✓
message_filterspara sincronização temporal - ✓ Pre-allocated buffers numpy (sem malloc no loop)
- ✓ Executor multi-thread separando CB de inferência
- ✓ Composable nodes para zero-copy intra-processo
❌ Armadilhas comuns
- ✗ PyTorch em callback (GIL bloqueia executor ROS2)
- ✗
np.zeros()a cada frame (malloc no hot path) - ✗ Single-thread executor com câmera 30 Hz + inferência
- ✗ Ignorar latência de serialização DDS para arrays grandes
- ✗ Sem warmup do modelo (primeira inferência 10× mais lenta)
💡 TensorRT para máxima performance
Para atingir 50+ Hz de inferência em GPU embarcada (Jetson), converta o modelo para TensorRT antes do deploy: trtexec --onnx=policy.onnx --saveEngine=policy.trt --fp16. Ganho típico de 3-5× sobre ONNX Runtime padrão com INT8/FP16 quantização.
Launch Files e Deploy em Produção
Deploy de robótica em produção é engenharia de sistemas, não só ML. Um launch file bem estruturado com lifecycle management e restart policies é o que separa um demo de um produto real.
Launch File Python
Orquestra todos os nós com parâmetros, remappings e condicionais.
from launch import LaunchDescription
from launch_ros.actions import Node, ComposableNodeContainer
def generate_launch_description():
return LaunchDescription([
Node(
package='vla_inference',
executable='vla_node',
name='vla_inference',
parameters=[{'model_path': 'policy.trt', 'freq_hz': 25}],
remappings=[('/image_raw', '/camera/color/image_raw')]
),
# ... mais nós
])
systemd Unit File
Garante reinício automático após falhas e boot automático.
[Unit]
Description=VLA Robot System
After=network.target
[Service]
ExecStart=/bin/bash -c "source /opt/ros/humble/setup.bash && \
ros2 launch vla_bringup robot.launch.py"
Restart=on-failure
RestartSec=5s
User=robot
[Install]
WantedBy=multi-user.target
rosbag2 para Reprodução e Debug
Grava todos os topics para reprodução offline e análise post-mortem.
# Gravar: todos os topics críticos
ros2 bag record /image_raw /joint_states /vla/actions /tf
# Reproduzir com clock simulado
ros2 bag play recording_2026/ --clock
# Inspecionar bag
ros2 bag info recording_2026/
💡 Health Checks com Lifecycle Nodes
Em produção, use lifecycle_manager do Nav2 como padrão: ele verifica se cada nó critico atingiu o estado active antes de liberar o sistema. Se o nó VLA não ativar em 30 s, o lifecycle manager aborta o startup e aciona a política de fallback.
✅ Resumo do Módulo
Próximo módulo
2.5 — Treinamento: Imitation Learning, ACT, Diffusion Policy