🔗 Integração de Sistemas
Objetivo do Módulo
Integrar todos os sistemas do humanoide (navegação, visão, manipulação, IA) em uma arquitetura coesa e robusta. Aprenda deployment, otimização, monitoring e debugging de sistemas complexos rodando em produção.
Duração estimada: 50 minutos Pré-requisitos: Módulos 1-8 (Todos os anteriores)
🏗️ Arquitetura Multi-Node
Sistema Completo de Humanoide
┌─────────────────────────────────────────────────────────────────┐
│ MASTER CONTROLLER │
│ (Coordena todos os subsistemas) │
└──────┬──────────────┬──────────────┬──────────────┬────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ NAVEGAÇÃO │ │ VISÃO │ │ MANIPULAÇÃO│ │ IA │
│ │ │ │ │ │ │ │
│ - Nav2 │ │ - YOLO │ │ - MoveIt │ │ - LLM │
│ - SLAM │ │ - Tracking │ │ - Grasping │ │ - RL │
│ - AMCL │ │ - Depth │ │ - Force │ │ - Planning │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
│ │ │ │
└──────────────┴──────────────┴──────────────┘
│
▼
┌──────────────────────┐
│ ROS2 MIDDLEWARE │
│ (DDS Communication) │
└──────────────────────┘
│
▼
┌──────────────────────┐
│ HARDWARE LAYER │
│ - Motors/Servos │
│ - Sensors (LIDAR, │
│ Cameras, IMU) │
│ - GPU (CUDA) │
└──────────────────────┘
🚀 Launch Files: Orchestrando o Sistema
Launch File Monolítico (Tudo em um)
# launch/humanoid_full_system.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
import os
def generate_launch_description():
# ========================================
# PARÂMETROS CONFIGURÁVEIS
# ========================================
use_sim = LaunchConfiguration('use_sim', default='true')
robot_name = LaunchConfiguration('robot_name', default='optimus_01')
enable_vision = LaunchConfiguration('enable_vision', default='true')
enable_navigation = LaunchConfiguration('enable_navigation', default='true')
pkg_share = FindPackageShare('humanoid_system').find('humanoid_system')
# ========================================
# CONFIGURAÇÕES
# ========================================
config_dir = PathJoinSubstitution([pkg_share, 'config'])
rviz_config = PathJoinSubstitution([pkg_share, 'rviz', 'full_system.rviz'])
return LaunchDescription([
# ====================================
# ARGUMENTOS
# ====================================
DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Usar simulação ou hardware real'
),
DeclareLaunchArgument(
'robot_name',
default_value='optimus_01',
description='Nome do robô'
),
# ====================================
# 1. SIMULAÇÃO (GAZEBO)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('gazebo_ros'),
'launch',
'gazebo.launch.py'
])
]),
condition=IfCondition(use_sim)
),
# ====================================
# 2. ROBOT STATE PUBLISHER
# ====================================
Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
output='screen',
parameters=[{
'robot_description': read_urdf_file(),
'use_sim_time': use_sim
}]
),
# ====================================
# 3. NAVEGAÇÃO (Nav2 + SLAM)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([pkg_share, 'launch', 'navigation.launch.py'])
]),
launch_arguments={'use_sim_time': use_sim}.items(),
condition=IfCondition(enable_navigation)
),
# ====================================
# 4. VISÃO COMPUTACIONAL
# ====================================
Node(
package='humanoid_vision',
executable='yolo_detector',
name='yolo_detector',
output='screen',
parameters=[
PathJoinSubstitution([config_dir, 'vision_params.yaml']),
{'use_sim_time': use_sim}
],
condition=IfCondition(enable_vision)
),
Node(
package='humanoid_vision',
executable='object_tracker',
name='object_tracker',
output='screen',
condition=IfCondition(enable_vision)
),
# ====================================
# 5. MANIPULAÇÃO (MoveIt)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('humanoid_moveit_config'),
'launch',
'moveit.launch.py'
])
]),
launch_arguments={'use_sim_time': use_sim}.items()
),
# ====================================
# 6. IA / PLANEJAMENTO
# ====================================
Node(
package='humanoid_ai',
executable='llm_planner',
name='llm_planner',
output='screen',
parameters=[{'robot_name': robot_name}]
),
# ====================================
# 7. MASTER CONTROLLER
# ====================================
Node(
package='humanoid_system',
executable='master_controller',
name='master_controller',
output='screen',
parameters=[
PathJoinSubstitution([config_dir, 'master_params.yaml']),
{
'robot_name': robot_name,
'use_sim_time': use_sim
}
]
),
# ====================================
# 8. MONITORING / DIAGNOSTICS
# ====================================
Node(
package='humanoid_system',
executable='system_monitor',
name='system_monitor',
output='screen'
),
# ====================================
# 9. RVIZ (VISUALIZAÇÃO)
# ====================================
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
output='screen',
arguments=['-d', rviz_config]
),
])
def read_urdf_file():
"""Ler URDF do robô."""
pkg_share = FindPackageShare('humanoid_description').find('humanoid_description')
urdf_file = os.path.join(pkg_share, 'urdf', 'humanoid.urdf')
with open(urdf_file, 'r') as file:
return file.read()
Launch File Modular (Separado por Sistema)
- Main Launch
- Navigation Launch
- Vision Launch
# launch/main.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
def generate_launch_description():
return LaunchDescription([
# Incluir cada subsistema
IncludeLaunchDescription('navigation.launch.py'),
IncludeLaunchDescription('vision.launch.py'),
IncludeLaunchDescription('manipulation.launch.py'),
IncludeLaunchDescription('ai.launch.py'),
])
# launch/navigation.launch.py
def generate_launch_description():
return LaunchDescription([
Node(package='slam_toolbox', executable='async_slam_toolbox_node', ...),
Node(package='nav2_amcl', executable='amcl', ...),
Node(package='nav2_planner', executable='planner_server', ...),
Node(package='nav2_controller', executable='controller_server', ...),
])
# launch/vision.launch.py
def generate_launch_description():
return LaunchDescription([
Node(package='realsense2_camera', executable='realsense2_camera_node', ...),
Node(package='humanoid_vision', executable='yolo_detector', ...),
Node(package='humanoid_vision', executable='object_tracker', ...),
])
🎛️ Master Controller: Coordenando Tudo
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from enum import Enum
import threading
class RobotState(Enum):
"""Estados do robô."""
IDLE = "idle"
NAVIGATING = "navigating"
MANIPULATING = "manipulating"
PROCESSING = "processing"
ERROR = "error"
class MasterController(Node):
"""
Controlador mestre que coordena todos os subsistemas.
Responsabilidades:
- Receber comandos de alto nível
- Decompor em subtarefas
- Coordenar execução entre subsistemas
- Monitorar saúde do sistema
- Lidar com erros e recovery
"""
def __init__(self):
super().__init__('master_controller')
# Estado atual do robô
self.state = RobotState.IDLE
self.state_lock = threading.Lock()
# ====================================
# CLIENTES DOS SUBSISTEMAS
# ====================================
# Navegação
self.nav_client = ActionClient(
self,
NavigateToPose,
'navigate_to_pose'
)
# Manipulação
self.arm_client = ActionClient(
self,
MoveGroup,
'move_action'
)
# Visão (via service)
self.detect_objects_client = self.create_client(
DetectObjects,
'detect_objects'
)
# IA / Planejamento
self.llm_planner_client = self.create_client(
PlanTask,
'plan_task'
)
# ====================================
# SUBSCRIBERS (Monitorar subsistemas)
# ====================================
self.nav_status_sub = self.create_subscription(
NavigationStatus,
'/navigation/status',
self.nav_status_callback,
10
)
self.vision_sub = self.create_subscription(
DetectedObjects,
'/detected_objects',
self.vision_callback,
10
)
# ====================================
# PUBLISHERS (Comandos)
# ====================================
self.status_pub = self.create_publisher(
RobotStatus,
'/robot/status',
10
)
# ====================================
# SERVIÇO (Interface de alto nível)
# ====================================
self.task_service = self.create_service(
ExecuteTask,
'execute_task',
self.execute_task_callback
)
# Estado interno
self.detected_objects = []
self.current_task = None
# Timer para publicar status
self.create_timer(1.0, self.publish_status)
self.get_logger().info('Master Controller iniciado')
def execute_task_callback(self, request, response):
"""
Executar tarefa de alto nível.
Exemplo: request.task = "pegar copo e levar para cozinha"
"""
task_description = request.task
self.get_logger().info(f'Nova tarefa recebida: {task_description}')
try:
# ========================================
# PASSO 1: PLANEJAMENTO (LLM)
# ========================================
plan = self.plan_task_with_llm(task_description)
# ========================================
# PASSO 2: EXECUTAR PLANO
# ========================================
success = self.execute_plan(plan)
response.success = success
response.message = "Tarefa completada" if success else "Falha na execução"
except Exception as e:
self.get_logger().error(f'Erro ao executar tarefa: {e}')
response.success = False
response.message = str(e)
return response
def plan_task_with_llm(self, task_description):
"""
Usar LLM para decompor tarefa em ações primitivas.
"""
self.set_state(RobotState.PROCESSING)
req = PlanTask.Request()
req.task_description = task_description
req.current_state = self.get_robot_state()
future = self.llm_planner_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
plan = future.result().plan
self.get_logger().info(f'Plano gerado: {len(plan.actions)} ações')
return plan
def execute_plan(self, plan):
"""
Executar sequência de ações do plano.
"""
for i, action in enumerate(plan.actions):
self.get_logger().info(
f'Executando ação {i+1}/{len(plan.actions)}: {action.type}'
)
success = self.execute_action(action)
if not success:
self.get_logger().error(f'Ação {i+1} falhou. Abortando plano.')
self.set_state(RobotState.ERROR)
return False
self.set_state(RobotState.IDLE)
return True
def execute_action(self, action):
"""
Executar ação primitiva individual.
"""
if action.type == "navigate":
return self.navigate_to(action.target_pose)
elif action.type == "pick_object":
return self.pick_object(action.object_id)
elif action.type == "place_object":
return self.place_object(action.target_pose)
elif action.type == "detect_objects":
return self.detect_objects()
else:
self.get_logger().error(f'Tipo de ação desconhecida: {action.type}')
return False
def navigate_to(self, target_pose):
"""Navegar até pose alvo."""
self.set_state(RobotState.NAVIGATING)
goal = NavigateToPose.Goal()
goal.pose = target_pose
self.nav_client.wait_for_server()
future = self.nav_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self, future)
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Navegação rejeitada')
return False
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, result_future)
result = result_future.result().result
return result.success
def pick_object(self, object_id):
"""Pegar objeto."""
self.set_state(RobotState.MANIPULATING)
# Localizar objeto via visão
obj_pose = self.find_object(object_id)
if obj_pose is None:
self.get_logger().error(f'Objeto {object_id} não encontrado')
return False
# Comandar braço para pegar
goal = MoveGroup.Goal()
goal.target_pose = obj_pose
goal.planning_group = "left_arm"
future = self.arm_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self, future)
# Resultado
result = future.result().result
return result.success
def detect_objects(self):
"""Detectar objetos no ambiente."""
req = DetectObjects.Request()
future = self.detect_objects_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
response = future.result()
self.detected_objects = response.objects
self.get_logger().info(f'{len(self.detected_objects)} objetos detectados')
return True
def set_state(self, new_state):
"""Atualizar estado do robô com thread-safety."""
with self.state_lock:
old_state = self.state
self.state = new_state
self.get_logger().info(f'Estado: {old_state.value} → {new_state.value}')
def publish_status(self):
"""Publicar status do sistema periodicamente."""
msg = RobotStatus()
msg.state = self.state.value
msg.battery_level = self.get_battery_level()
msg.active_subsystems = self.get_active_subsystems()
self.status_pub.publish(msg)
def get_battery_level(self):
"""Obter nível de bateria (mock)."""
return 85.0 # Em produção: ler de sensor real
def get_active_subsystems(self):
"""Listar subsistemas ativos."""
return ["navigation", "vision", "manipulation", "llm_planner"]
def main():
rclpy.init()
controller = MasterController()
rclpy.spin(controller)
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
📊 Monitoring e Diagnostics
System Monitor Node
class SystemMonitor(Node):
"""
Monitorar saúde de todos os subsistemas.
Métricas monitoradas:
- CPU/RAM/GPU usage
- Latência de comunicação ROS
- Taxa de publicação de topics
- Erros e warnings
"""
def __init__(self):
super().__init__('system_monitor')
# Subscribers para topics críticos
self.create_subscription(
Image, '/camera/image_raw',
self.check_camera_rate, 10
)
self.create_subscription(
LaserScan, '/scan',
self.check_lidar_rate, 10
)
# Timers
self.create_timer(5.0, self.check_system_health)
self.create_timer(1.0, self.check_topic_rates)
# Métricas
self.last_camera_time = self.get_clock().now()
self.last_lidar_time = self.get_clock().now()
def check_system_health(self):
"""Verificar recursos do sistema."""
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
ram_percent = psutil.virtual_memory().percent
if cpu_percent > 90:
self.get_logger().warn(f'⚠️ CPU alta: {cpu_percent}%')
if ram_percent > 90:
self.get_logger().warn(f'⚠️ RAM alta: {ram_percent}%')
# GPU (NVIDIA)
try:
import pynvml
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
gpu_util = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
if gpu_util > 95:
self.get_logger().warn(f'⚠️ GPU alta: {gpu_util}%')
except:
pass # GPU não disponível
def check_camera_rate(self, msg):
"""Verificar taxa de publicação da câmera."""
now = self.get_clock().now()
dt = (now - self.last_camera_time).nanoseconds / 1e9
expected_fps = 30.0
actual_fps = 1.0 / dt if dt > 0 else 0.0
if actual_fps < expected_fps * 0.8: # 80% do esperado
self.get_logger().warn(
f'⚠️ Camera FPS baixo: {actual_fps:.1f} (esperado {expected_fps})'
)
self.last_camera_time = now
def check_topic_rates(self):
"""Verificar se topics críticos estão publicando."""
import subprocess
# Usar 'ros2 topic hz' para verificar taxa
topics_to_check = [
'/camera/image_raw',
'/scan',
'/odom',
'/joint_states'
]
for topic in topics_to_check:
# Implementar verificação de taxa
pass
🚨 Error Handling e Recovery
class ErrorRecovery(Node):
"""
Sistema de recuperação de erros.
Estratégias:
- Retry com exponential backoff
- Rollback para estado seguro
- Emergency stop
- Logging detalhado
"""
def __init__(self):
super().__init__('error_recovery')
self.max_retries = 3
self.retry_delays = [1.0, 2.0, 5.0] # segundos
def execute_with_retry(self, func, *args, **kwargs):
"""
Executar função com retry automático.
"""
for attempt in range(self.max_retries):
try:
result = func(*args, **kwargs)
return result
except Exception as e:
self.get_logger().error(
f'Tentativa {attempt + 1}/{self.max_retries} falhou: {e}'
)
if attempt < self.max_retries - 1:
delay = self.retry_delays[attempt]
self.get_logger().info(f'Tentando novamente em {delay}s...')
time.sleep(delay)
else:
self.get_logger().error('Todas as tentativas falharam!')
raise
def emergency_stop(self):
"""
Parada de emergência total.
- Para navegação
- Para braços
- Desliga motores não-críticos
"""
self.get_logger().error('🚨 PARADA DE EMERGÊNCIA ATIVADA!')
# Cancelar navegação
self.nav_client.cancel_all_goals()
# Parar braços
self.arm_client.cancel_all_goals()
# Publicar comando de parada
stop_cmd = Twist()
stop_cmd.linear.x = 0.0
stop_cmd.angular.z = 0.0
self.cmd_vel_pub.publish(stop_cmd)
def return_to_safe_state(self):
"""
Retornar para estado seguro conhecido.
Exemplo: Home position do braço, posição de charging.
"""
self.get_logger().info('Retornando para estado seguro...')
# Mover braços para home
self.arm_client.send_goal(name="home")
# Navegar para charging station
self.nav_client.send_goal(location="charging_station")
🏭 Deployment em Produção
Docker Container
# Dockerfile
FROM ros:humble
# Instalar dependências
RUN apt-get update && apt-get install -y \
ros-humble-navigation2 \
ros-humble-nav2-bringup \
ros-humble-moveit \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Instalar dependências Python
COPY requirements.txt /tmp/
RUN pip3 install -r /tmp/requirements.txt
# Copiar código do robô
WORKDIR /ros2_ws
COPY src/ /ros2_ws/src/
# Build
RUN /bin/bash -c "source /opt/ros/humble/setup.bash && \
colcon build --symlink-install"
# Setup entrypoint
COPY docker/ros_entrypoint.sh /
RUN chmod +x /ros_entrypoint.sh
ENTRYPOINT ["/ros_entrypoint.sh"]
CMD ["ros2", "launch", "humanoid_system", "full_system.launch.py"]
# Build e run
docker build -t humanoid:latest .
docker run --gpus all --network host humanoid:latest
✅ Checklist de Domínio
Antes de avançar, certifique-se de que você consegue:
- Criar launch files complexos
- Integrar múltiplos subsistemas
- Implementar master controller
- Monitorar saúde do sistema
- Implementar error handling e recovery
- Deploy com Docker
- Otimizar performance (CPU/RAM/GPU)
- Debug sistemas multi-node
- Configurar logging adequado
- Testar integração end-to-end
🔗 Próximos Passos
Próximo Módulo
Aplicar tudo que aprendeu em um projeto completo: robô autônomo que navega, detecta e manipula objetos. Demonstre suas habilidades de ponta a ponta!
Recursos adicionais: