Pular para o conteúdo principal

🔗 Integração de Sistemas

Objetivo do Módulo

Integrar todos os sistemas do humanoide (navegação, visão, manipulação, IA) em uma arquitetura coesa e robusta. Aprenda deployment, otimização, monitoring e debugging de sistemas complexos rodando em produção.

Duração estimada: 50 minutos Pré-requisitos: Módulos 1-8 (Todos os anteriores)


🏗️ Arquitetura Multi-Node

Sistema Completo de Humanoide

┌─────────────────────────────────────────────────────────────────┐
│ MASTER CONTROLLER │
│ (Coordena todos os subsistemas) │
└──────┬──────────────┬──────────────┬──────────────┬────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ NAVEGAÇÃO │ │ VISÃO │ │ MANIPULAÇÃO│ │ IA │
│ │ │ │ │ │ │ │
│ - Nav2 │ │ - YOLO │ │ - MoveIt │ │ - LLM │
│ - SLAM │ │ - Tracking │ │ - Grasping │ │ - RL │
│ - AMCL │ │ - Depth │ │ - Force │ │ - Planning │
└────────────┘ └────────────┘ └────────────┘ └────────────┘
│ │ │ │
└──────────────┴──────────────┴──────────────┘


┌──────────────────────┐
│ ROS2 MIDDLEWARE │
│ (DDS Communication) │
└──────────────────────┘


┌──────────────────────┐
│ HARDWARE LAYER │
│ - Motors/Servos │
│ - Sensors (LIDAR, │
│ Cameras, IMU) │
│ - GPU (CUDA) │
└──────────────────────┘

🚀 Launch Files: Orchestrando o Sistema

Launch File Monolítico (Tudo em um)

# launch/humanoid_full_system.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
import os

def generate_launch_description():
# ========================================
# PARÂMETROS CONFIGURÁVEIS
# ========================================
use_sim = LaunchConfiguration('use_sim', default='true')
robot_name = LaunchConfiguration('robot_name', default='optimus_01')
enable_vision = LaunchConfiguration('enable_vision', default='true')
enable_navigation = LaunchConfiguration('enable_navigation', default='true')

pkg_share = FindPackageShare('humanoid_system').find('humanoid_system')

# ========================================
# CONFIGURAÇÕES
# ========================================
config_dir = PathJoinSubstitution([pkg_share, 'config'])
rviz_config = PathJoinSubstitution([pkg_share, 'rviz', 'full_system.rviz'])

return LaunchDescription([
# ====================================
# ARGUMENTOS
# ====================================
DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Usar simulação ou hardware real'
),

DeclareLaunchArgument(
'robot_name',
default_value='optimus_01',
description='Nome do robô'
),

# ====================================
# 1. SIMULAÇÃO (GAZEBO)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('gazebo_ros'),
'launch',
'gazebo.launch.py'
])
]),
condition=IfCondition(use_sim)
),

# ====================================
# 2. ROBOT STATE PUBLISHER
# ====================================
Node(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
output='screen',
parameters=[{
'robot_description': read_urdf_file(),
'use_sim_time': use_sim
}]
),

# ====================================
# 3. NAVEGAÇÃO (Nav2 + SLAM)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([pkg_share, 'launch', 'navigation.launch.py'])
]),
launch_arguments={'use_sim_time': use_sim}.items(),
condition=IfCondition(enable_navigation)
),

# ====================================
# 4. VISÃO COMPUTACIONAL
# ====================================
Node(
package='humanoid_vision',
executable='yolo_detector',
name='yolo_detector',
output='screen',
parameters=[
PathJoinSubstitution([config_dir, 'vision_params.yaml']),
{'use_sim_time': use_sim}
],
condition=IfCondition(enable_vision)
),

Node(
package='humanoid_vision',
executable='object_tracker',
name='object_tracker',
output='screen',
condition=IfCondition(enable_vision)
),

# ====================================
# 5. MANIPULAÇÃO (MoveIt)
# ====================================
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare('humanoid_moveit_config'),
'launch',
'moveit.launch.py'
])
]),
launch_arguments={'use_sim_time': use_sim}.items()
),

# ====================================
# 6. IA / PLANEJAMENTO
# ====================================
Node(
package='humanoid_ai',
executable='llm_planner',
name='llm_planner',
output='screen',
parameters=[{'robot_name': robot_name}]
),

# ====================================
# 7. MASTER CONTROLLER
# ====================================
Node(
package='humanoid_system',
executable='master_controller',
name='master_controller',
output='screen',
parameters=[
PathJoinSubstitution([config_dir, 'master_params.yaml']),
{
'robot_name': robot_name,
'use_sim_time': use_sim
}
]
),

# ====================================
# 8. MONITORING / DIAGNOSTICS
# ====================================
Node(
package='humanoid_system',
executable='system_monitor',
name='system_monitor',
output='screen'
),

# ====================================
# 9. RVIZ (VISUALIZAÇÃO)
# ====================================
Node(
package='rviz2',
executable='rviz2',
name='rviz2',
output='screen',
arguments=['-d', rviz_config]
),
])

def read_urdf_file():
"""Ler URDF do robô."""
pkg_share = FindPackageShare('humanoid_description').find('humanoid_description')
urdf_file = os.path.join(pkg_share, 'urdf', 'humanoid.urdf')

with open(urdf_file, 'r') as file:
return file.read()

Launch File Modular (Separado por Sistema)

# launch/main.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription

def generate_launch_description():
return LaunchDescription([
# Incluir cada subsistema
IncludeLaunchDescription('navigation.launch.py'),
IncludeLaunchDescription('vision.launch.py'),
IncludeLaunchDescription('manipulation.launch.py'),
IncludeLaunchDescription('ai.launch.py'),
])

🎛️ Master Controller: Coordenando Tudo

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from enum import Enum
import threading

class RobotState(Enum):
"""Estados do robô."""
IDLE = "idle"
NAVIGATING = "navigating"
MANIPULATING = "manipulating"
PROCESSING = "processing"
ERROR = "error"

class MasterController(Node):
"""
Controlador mestre que coordena todos os subsistemas.

Responsabilidades:
- Receber comandos de alto nível
- Decompor em subtarefas
- Coordenar execução entre subsistemas
- Monitorar saúde do sistema
- Lidar com erros e recovery
"""

def __init__(self):
super().__init__('master_controller')

# Estado atual do robô
self.state = RobotState.IDLE
self.state_lock = threading.Lock()

# ====================================
# CLIENTES DOS SUBSISTEMAS
# ====================================

# Navegação
self.nav_client = ActionClient(
self,
NavigateToPose,
'navigate_to_pose'
)

# Manipulação
self.arm_client = ActionClient(
self,
MoveGroup,
'move_action'
)

# Visão (via service)
self.detect_objects_client = self.create_client(
DetectObjects,
'detect_objects'
)

# IA / Planejamento
self.llm_planner_client = self.create_client(
PlanTask,
'plan_task'
)

# ====================================
# SUBSCRIBERS (Monitorar subsistemas)
# ====================================
self.nav_status_sub = self.create_subscription(
NavigationStatus,
'/navigation/status',
self.nav_status_callback,
10
)

self.vision_sub = self.create_subscription(
DetectedObjects,
'/detected_objects',
self.vision_callback,
10
)

# ====================================
# PUBLISHERS (Comandos)
# ====================================
self.status_pub = self.create_publisher(
RobotStatus,
'/robot/status',
10
)

# ====================================
# SERVIÇO (Interface de alto nível)
# ====================================
self.task_service = self.create_service(
ExecuteTask,
'execute_task',
self.execute_task_callback
)

# Estado interno
self.detected_objects = []
self.current_task = None

# Timer para publicar status
self.create_timer(1.0, self.publish_status)

self.get_logger().info('Master Controller iniciado')

def execute_task_callback(self, request, response):
"""
Executar tarefa de alto nível.

Exemplo: request.task = "pegar copo e levar para cozinha"
"""
task_description = request.task
self.get_logger().info(f'Nova tarefa recebida: {task_description}')

try:
# ========================================
# PASSO 1: PLANEJAMENTO (LLM)
# ========================================
plan = self.plan_task_with_llm(task_description)

# ========================================
# PASSO 2: EXECUTAR PLANO
# ========================================
success = self.execute_plan(plan)

response.success = success
response.message = "Tarefa completada" if success else "Falha na execução"

except Exception as e:
self.get_logger().error(f'Erro ao executar tarefa: {e}')
response.success = False
response.message = str(e)

return response

def plan_task_with_llm(self, task_description):
"""
Usar LLM para decompor tarefa em ações primitivas.
"""
self.set_state(RobotState.PROCESSING)

req = PlanTask.Request()
req.task_description = task_description
req.current_state = self.get_robot_state()

future = self.llm_planner_client.call_async(req)
rclpy.spin_until_future_complete(self, future)

plan = future.result().plan
self.get_logger().info(f'Plano gerado: {len(plan.actions)} ações')

return plan

def execute_plan(self, plan):
"""
Executar sequência de ações do plano.
"""
for i, action in enumerate(plan.actions):
self.get_logger().info(
f'Executando ação {i+1}/{len(plan.actions)}: {action.type}'
)

success = self.execute_action(action)

if not success:
self.get_logger().error(f'Ação {i+1} falhou. Abortando plano.')
self.set_state(RobotState.ERROR)
return False

self.set_state(RobotState.IDLE)
return True

def execute_action(self, action):
"""
Executar ação primitiva individual.
"""
if action.type == "navigate":
return self.navigate_to(action.target_pose)

elif action.type == "pick_object":
return self.pick_object(action.object_id)

elif action.type == "place_object":
return self.place_object(action.target_pose)

elif action.type == "detect_objects":
return self.detect_objects()

else:
self.get_logger().error(f'Tipo de ação desconhecida: {action.type}')
return False

def navigate_to(self, target_pose):
"""Navegar até pose alvo."""
self.set_state(RobotState.NAVIGATING)

goal = NavigateToPose.Goal()
goal.pose = target_pose

self.nav_client.wait_for_server()
future = self.nav_client.send_goal_async(goal)

rclpy.spin_until_future_complete(self, future)
goal_handle = future.result()

if not goal_handle.accepted:
self.get_logger().error('Navegação rejeitada')
return False

result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, result_future)

result = result_future.result().result
return result.success

def pick_object(self, object_id):
"""Pegar objeto."""
self.set_state(RobotState.MANIPULATING)

# Localizar objeto via visão
obj_pose = self.find_object(object_id)

if obj_pose is None:
self.get_logger().error(f'Objeto {object_id} não encontrado')
return False

# Comandar braço para pegar
goal = MoveGroup.Goal()
goal.target_pose = obj_pose
goal.planning_group = "left_arm"

future = self.arm_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self, future)

# Resultado
result = future.result().result
return result.success

def detect_objects(self):
"""Detectar objetos no ambiente."""
req = DetectObjects.Request()
future = self.detect_objects_client.call_async(req)

rclpy.spin_until_future_complete(self, future)

response = future.result()
self.detected_objects = response.objects

self.get_logger().info(f'{len(self.detected_objects)} objetos detectados')
return True

def set_state(self, new_state):
"""Atualizar estado do robô com thread-safety."""
with self.state_lock:
old_state = self.state
self.state = new_state
self.get_logger().info(f'Estado: {old_state.value}{new_state.value}')

def publish_status(self):
"""Publicar status do sistema periodicamente."""
msg = RobotStatus()
msg.state = self.state.value
msg.battery_level = self.get_battery_level()
msg.active_subsystems = self.get_active_subsystems()

self.status_pub.publish(msg)

def get_battery_level(self):
"""Obter nível de bateria (mock)."""
return 85.0 # Em produção: ler de sensor real

def get_active_subsystems(self):
"""Listar subsistemas ativos."""
return ["navigation", "vision", "manipulation", "llm_planner"]

def main():
rclpy.init()
controller = MasterController()
rclpy.spin(controller)
controller.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

📊 Monitoring e Diagnostics

System Monitor Node

class SystemMonitor(Node):
"""
Monitorar saúde de todos os subsistemas.

Métricas monitoradas:
- CPU/RAM/GPU usage
- Latência de comunicação ROS
- Taxa de publicação de topics
- Erros e warnings
"""

def __init__(self):
super().__init__('system_monitor')

# Subscribers para topics críticos
self.create_subscription(
Image, '/camera/image_raw',
self.check_camera_rate, 10
)

self.create_subscription(
LaserScan, '/scan',
self.check_lidar_rate, 10
)

# Timers
self.create_timer(5.0, self.check_system_health)
self.create_timer(1.0, self.check_topic_rates)

# Métricas
self.last_camera_time = self.get_clock().now()
self.last_lidar_time = self.get_clock().now()

def check_system_health(self):
"""Verificar recursos do sistema."""
import psutil

cpu_percent = psutil.cpu_percent(interval=1)
ram_percent = psutil.virtual_memory().percent

if cpu_percent > 90:
self.get_logger().warn(f'⚠️ CPU alta: {cpu_percent}%')

if ram_percent > 90:
self.get_logger().warn(f'⚠️ RAM alta: {ram_percent}%')

# GPU (NVIDIA)
try:
import pynvml
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
gpu_util = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu

if gpu_util > 95:
self.get_logger().warn(f'⚠️ GPU alta: {gpu_util}%')

except:
pass # GPU não disponível

def check_camera_rate(self, msg):
"""Verificar taxa de publicação da câmera."""
now = self.get_clock().now()
dt = (now - self.last_camera_time).nanoseconds / 1e9

expected_fps = 30.0
actual_fps = 1.0 / dt if dt > 0 else 0.0

if actual_fps < expected_fps * 0.8: # 80% do esperado
self.get_logger().warn(
f'⚠️ Camera FPS baixo: {actual_fps:.1f} (esperado {expected_fps})'
)

self.last_camera_time = now

def check_topic_rates(self):
"""Verificar se topics críticos estão publicando."""
import subprocess

# Usar 'ros2 topic hz' para verificar taxa
topics_to_check = [
'/camera/image_raw',
'/scan',
'/odom',
'/joint_states'
]

for topic in topics_to_check:
# Implementar verificação de taxa
pass

🚨 Error Handling e Recovery

class ErrorRecovery(Node):
"""
Sistema de recuperação de erros.

Estratégias:
- Retry com exponential backoff
- Rollback para estado seguro
- Emergency stop
- Logging detalhado
"""

def __init__(self):
super().__init__('error_recovery')

self.max_retries = 3
self.retry_delays = [1.0, 2.0, 5.0] # segundos

def execute_with_retry(self, func, *args, **kwargs):
"""
Executar função com retry automático.
"""
for attempt in range(self.max_retries):
try:
result = func(*args, **kwargs)
return result

except Exception as e:
self.get_logger().error(
f'Tentativa {attempt + 1}/{self.max_retries} falhou: {e}'
)

if attempt < self.max_retries - 1:
delay = self.retry_delays[attempt]
self.get_logger().info(f'Tentando novamente em {delay}s...')
time.sleep(delay)
else:
self.get_logger().error('Todas as tentativas falharam!')
raise

def emergency_stop(self):
"""
Parada de emergência total.

- Para navegação
- Para braços
- Desliga motores não-críticos
"""
self.get_logger().error('🚨 PARADA DE EMERGÊNCIA ATIVADA!')

# Cancelar navegação
self.nav_client.cancel_all_goals()

# Parar braços
self.arm_client.cancel_all_goals()

# Publicar comando de parada
stop_cmd = Twist()
stop_cmd.linear.x = 0.0
stop_cmd.angular.z = 0.0
self.cmd_vel_pub.publish(stop_cmd)

def return_to_safe_state(self):
"""
Retornar para estado seguro conhecido.

Exemplo: Home position do braço, posição de charging.
"""
self.get_logger().info('Retornando para estado seguro...')

# Mover braços para home
self.arm_client.send_goal(name="home")

# Navegar para charging station
self.nav_client.send_goal(location="charging_station")

🏭 Deployment em Produção

Docker Container

# Dockerfile
FROM ros:humble

# Instalar dependências
RUN apt-get update && apt-get install -y \
ros-humble-navigation2 \
ros-humble-nav2-bringup \
ros-humble-moveit \
python3-pip \
&& rm -rf /var/lib/apt/lists/*

# Instalar dependências Python
COPY requirements.txt /tmp/
RUN pip3 install -r /tmp/requirements.txt

# Copiar código do robô
WORKDIR /ros2_ws
COPY src/ /ros2_ws/src/

# Build
RUN /bin/bash -c "source /opt/ros/humble/setup.bash && \
colcon build --symlink-install"

# Setup entrypoint
COPY docker/ros_entrypoint.sh /
RUN chmod +x /ros_entrypoint.sh
ENTRYPOINT ["/ros_entrypoint.sh"]

CMD ["ros2", "launch", "humanoid_system", "full_system.launch.py"]
# Build e run
docker build -t humanoid:latest .
docker run --gpus all --network host humanoid:latest

✅ Checklist de Domínio

Antes de avançar, certifique-se de que você consegue:

  • Criar launch files complexos
  • Integrar múltiplos subsistemas
  • Implementar master controller
  • Monitorar saúde do sistema
  • Implementar error handling e recovery
  • Deploy com Docker
  • Otimizar performance (CPU/RAM/GPU)
  • Debug sistemas multi-node
  • Configurar logging adequado
  • Testar integração end-to-end

🔗 Próximos Passos

Próximo Módulo

🏆 Projeto Final →

Aplicar tudo que aprendeu em um projeto completo: robô autônomo que navega, detecta e manipula objetos. Demonstre suas habilidades de ponta a ponta!

Recursos adicionais: