🤖 Autonomia Completa no G1
Objetivo do Módulo
Implementar autonomia completa no G1: state machines para gerenciar tarefas complexas, decision making com behavior trees, error recovery robusto, e navegação multi-floor (elevadores, escadas).
🔀 State Machine Architecture
Finite State Machine (FSM)
#!/usr/bin/env python3
"""
autonomous_delivery.py - Delivery robot usando FSM
"""
from enum import Enum, auto
from dataclasses import dataclass
import time
class RobotState(Enum):
"""Estados do robô"""
IDLE = auto()
NAVIGATING_TO_PICKUP = auto()
PICKING_UP = auto()
NAVIGATING_TO_DELIVERY = auto()
DELIVERING = auto()
RETURNING_HOME = auto()
ERROR = auto()
CHARGING = auto()
@dataclass
class DeliveryTask:
"""Tarefa de entrega"""
pickup_location: tuple # (x, y)
delivery_location: tuple # (x, y)
object_name: str
object_width: float = 0.05
class AutonomousDeliveryFSM:
def __init__(self, robot):
self.robot = robot
self.state = RobotState.IDLE
# Componentes
self.navigator = None # Nav2 navigator
self.manipulator = None # Arm controller
self.perception = None # Object detector
# Task queue
self.tasks = []
self.current_task = None
# Estado
self.home_location = (0.0, 0.0)
self.battery_threshold = 20.0 # %
def add_task(self, task: DeliveryTask):
"""Adiciona tarefa à fila"""
self.tasks.append(task)
print(f"✅ Tarefa adicionada: {task.object_name}")
def run(self):
"""Loop principal da FSM"""
while True:
# Verificar bateria
if self._battery_low() and self.state != RobotState.CHARGING:
self._transition_to(RobotState.CHARGING)
# Processar estado atual
if self.state == RobotState.IDLE:
self._handle_idle()
elif self.state == RobotState.NAVIGATING_TO_PICKUP:
self._handle_navigating_to_pickup()
elif self.state == RobotState.PICKING_UP:
self._handle_picking_up()
elif self.state == RobotState.NAVIGATING_TO_DELIVERY:
self._handle_navigating_to_delivery()
elif self.state == RobotState.DELIVERING:
self._handle_delivering()
elif self.state == RobotState.RETURNING_HOME:
self._handle_returning_home()
elif self.state == RobotState.ERROR:
self._handle_error()
elif self.state == RobotState.CHARGING:
self._handle_charging()
time.sleep(0.1) # 10 Hz
def _transition_to(self, new_state: RobotState):
"""Transição de estado com logging"""
print(f"State: {self.state.name} → {new_state.name}")
self.state = new_state
def _handle_idle(self):
"""Estado IDLE: aguarda tarefas"""
if len(self.tasks) > 0:
self.current_task = self.tasks.pop(0)
print(f"📦 Iniciando tarefa: {self.current_task.object_name}")
self._transition_to(RobotState.NAVIGATING_TO_PICKUP)
def _handle_navigating_to_pickup(self):
"""Navega para local de coleta"""
target = self.current_task.pickup_location
# Enviar goal para Nav2
# success = self.navigator.navigate_to(target)
# Simulação
print(f"Navegando para pickup: {target}")
time.sleep(2.0)
success = True
if success:
self._transition_to(RobotState.PICKING_UP)
else:
print("❌ Falha na navegação")
self._transition_to(RobotState.ERROR)
def _handle_picking_up(self):
"""Pega objeto"""
# Detectar objeto
# obj = self.perception.find_object(self.current_task.object_name)
# Pick com manipulador
# success = self.manipulator.pick_object(obj.position_3d)
# Simulação
print(f"Pegando: {self.current_task.object_name}")
time.sleep(3.0)
success = True
if success:
print("✅ Objeto pegado")
self._transition_to(RobotState.NAVIGATING_TO_DELIVERY)
else:
print("❌ Falha ao pegar")
self._transition_to(RobotState.ERROR)
def _handle_navigating_to_delivery(self):
"""Navega para local de entrega"""
target = self.current_task.delivery_location
print(f"Navegando para delivery: {target}")
time.sleep(2.0)
success = True
if success:
self._transition_to(RobotState.DELIVERING)
else:
self._transition_to(RobotState.ERROR)
def _handle_delivering(self):
"""Entrega objeto"""
# Colocar objeto
# success = self.manipulator.place_object(target_position)
print(f"Entregando: {self.current_task.object_name}")
time.sleep(2.0)
success = True
if success:
print("✅ Objeto entregue")
self.current_task = None
self._transition_to(RobotState.RETURNING_HOME)
else:
self._transition_to(RobotState.ERROR)
def _handle_returning_home(self):
"""Retorna para home"""
print(f"Retornando para home: {self.home_location}")
time.sleep(2.0)
self._transition_to(RobotState.IDLE)
def _handle_error(self):
"""Estado de erro: tentar recovery"""
print("⚠️ Estado de ERRO - tentando recovery")
# Recovery strategies:
# 1. Retentar última operação
# 2. Voltar para IDLE
# 3. Chamar operador humano
time.sleep(5.0)
# Por enquanto: voltar para IDLE
self.current_task = None
self._transition_to(RobotState.IDLE)
def _handle_charging(self):
"""Carregando bateria"""
print("🔋 Carregando... (aguardando bateria > 80%)")
# Navegar para estação de carga
# self.navigator.navigate_to(charging_station_location)
# Aguardar carga
while self._battery_low():
time.sleep(1.0)
print("✅ Bateria carregada")
self._transition_to(RobotState.IDLE)
def _battery_low(self) -> bool:
"""Verifica se bateria está baixa"""
state = self.robot.get_state()
return state.battery_percentage < self.battery_threshold
# Uso:
def main():
robot = G1Robot()
robot.wait_for_connection()
fsm = AutonomousDeliveryFSM(robot)
# Adicionar tarefas
fsm.add_task(DeliveryTask(
pickup_location=(5.0, 2.0),
delivery_location=(10.0, 8.0),
object_name="package_1"
))
fsm.add_task(DeliveryTask(
pickup_location=(3.0, 1.0),
delivery_location=(7.0, 5.0),
object_name="package_2"
))
# Rodar FSM
fsm.run()
if __name__ == "__main__":
main()
🌳 Behavior Trees
Vantagens sobre FSM:
- ✅ Modular (nodes reutilizáveis)
- ✅ Hierárquico (sub-trees)
- ✅ Fácil de visualizar e debugar
#!/usr/bin/env python3
"""
behavior_tree_delivery.py - Delivery usando Behavior Tree
"""
from py_trees import behaviour, composites, blackboard
from py_trees.common import Status
import time
class NavigateToLocation(behaviour.Behaviour):
"""Navega para localização"""
def __init__(self, name, target_key):
super().__init__(name)
self.target_key = target_key
self.blackboard = self.attach_blackboard_client()
self.blackboard.register_key(target_key, access=blackboard.Access.READ)
def update(self):
target = self.blackboard.get(self.target_key)
print(f"Navegando para: {target}")
# Simular navegação
time.sleep(0.5)
# Em produção: chamar Nav2 e verificar resultado
return Status.SUCCESS
class PickObject(behaviour.Behaviour):
"""Pega objeto"""
def __init__(self, name):
super().__init__(name)
def update(self):
print("Pegando objeto...")
time.sleep(0.5)
return Status.SUCCESS
class PlaceObject(behaviour.Behaviour):
"""Coloca objeto"""
def __init__(self, name):
super().__init__(name)
def update(self):
print("Colocando objeto...")
time.sleep(0.5)
return Status.SUCCESS
class CheckBattery(behaviour.Behaviour):
"""Verifica bateria"""
def __init__(self, name, threshold=20.0):
super().__init__(name)
self.threshold = threshold
def update(self):
# Simular leitura de bateria
battery = 85.0
if battery > self.threshold:
return Status.SUCCESS
else:
print(f"⚠️ Bateria baixa: {battery}%")
return Status.FAILURE
def create_delivery_behavior_tree():
"""Cria behavior tree para delivery"""
# Root: Sequence (executa filhos em ordem)
root = composites.Sequence(name="DeliveryTask", memory=False)
# 1. Verificar bateria
check_battery = CheckBattery(name="CheckBattery")
# 2. Navegar para pickup
nav_to_pickup = NavigateToLocation(name="NavToPickup", target_key="pickup_location")
# 3. Pegar objeto
pick = PickObject(name="PickObject")
# 4. Navegar para delivery
nav_to_delivery = NavigateToLocation(name="NavToDelivery", target_key="delivery_location")
# 5. Entregar objeto
place = PlaceObject(name="PlaceObject")
# 6. Retornar para home
nav_to_home = NavigateToLocation(name="NavToHome", target_key="home_location")
# Adicionar filhos ao root
root.add_children([
check_battery,
nav_to_pickup,
pick,
nav_to_delivery,
place,
nav_to_home
])
return root
def main():
# Criar behavior tree
tree = create_delivery_behavior_tree()
# Setup blackboard (memória compartilhada)
bb = blackboard.Client()
bb.register_key("pickup_location", access=blackboard.Access.WRITE)
bb.register_key("delivery_location", access=blackboard.Access.WRITE)
bb.register_key("home_location", access=blackboard.Access.WRITE)
bb.set("pickup_location", (5.0, 2.0))
bb.set("delivery_location", (10.0, 8.0))
bb.set("home_location", (0.0, 0.0))
# Executar tree
tree.setup_with_descendants()
while True:
tree.tick_once()
if tree.status == Status.SUCCESS:
print("✅ Tarefa concluída!")
break
elif tree.status == Status.FAILURE:
print("❌ Tarefa falhou")
break
time.sleep(0.1)
if __name__ == "__main__":
main()
🛡️ Error Recovery
Recovery Behaviors
class RecoveryController:
"""Gerencia recovery de erros"""
def __init__(self, robot):
self.robot = robot
# Histórico de erros
self.error_history = []
# Estratégias de recovery
self.recovery_strategies = {
"navigation_failed": self.recover_navigation,
"grasp_failed": self.recover_grasp,
"fall_detected": self.recover_fall,
"battery_critical": self.recover_battery,
"sensor_failure": self.recover_sensor
}
def handle_error(self, error_type: str) -> bool:
"""
Tenta recover de erro
Returns:
True se recovery bem-sucedido
"""
print(f"⚠️ Erro detectado: {error_type}")
self.error_history.append({
"type": error_type,
"timestamp": time.time()
})
# Verificar se erro recorrente (> 3x em 1 min)
recent_errors = [e for e in self.error_history
if time.time() - e["timestamp"] < 60]
if len(recent_errors) > 3:
print("❌ Erro recorrente - requer intervenção humana")
return False
# Executar estratégia de recovery
strategy = self.recovery_strategies.get(error_type)
if strategy is None:
print(f"❌ Sem estratégia de recovery para: {error_type}")
return False
return strategy()
def recover_navigation(self) -> bool:
"""Recovery de falha de navegação"""
print("Tentando recovery de navegação...")
# 1. Parar movimento
self.robot.emergency_stop()
time.sleep(0.5)
# 2. Limpar costmap
# self.navigator.clear_costmap()
# 3. Recomputar caminho
# success = self.navigator.replan()
# Simulação
success = True
if success:
print("✅ Navegação recuperada")
else:
print("❌ Recovery falhou - backing up")
self._back_up(distance=0.5)
return success
def recover_grasp(self) -> bool:
"""Recovery de falha de grasp"""
print("Tentando recovery de grasp...")
# 1. Abrir gripper
# self.manipulator.gripper.open()
# 2. Retrair braço
# self.manipulator.move_to_home()
# 3. Re-detectar objeto
# obj = self.perception.find_object(target_name)
# 4. Tentar novamente com posição ajustada
# success = self.manipulator.pick_object(obj.position_3d + offset)
success = True
return success
def recover_fall(self) -> bool:
"""Recovery de queda"""
print("⚠️ QUEDA DETECTADA - Tentando levantar")
# 1. Emergency stop
self.robot.emergency_stop()
# 2. Verificar orientação
state = self.robot.get_state()
roll, pitch, yaw = state.imu_euler
if abs(roll) > 1.0 or abs(pitch) > 1.0: # > 57°
print("❌ Robô caído - requer ajuda humana")
return False
# 3. Tentar levantar (stand up sequence)
# self._stand_up_sequence()
return True
def recover_battery(self) -> bool:
"""Recovery de bateria crítica"""
print("🔋 Bateria crítica - navegando para estação")
# Navegar para estação de carga mais próxima
# charging_station = self.find_nearest_charging_station()
# self.navigator.navigate_to(charging_station)
return True
def recover_sensor(self) -> bool:
"""Recovery de falha de sensor"""
print("Sensor failure - tentando reiniciar")
# Tentar reiniciar sensor
# self.sensor.restart()
time.sleep(2.0)
# Verificar se voltou
# if self.sensor.is_alive():
# return True
return False
def _back_up(self, distance: float):
"""Move robô para trás"""
print(f"Backing up {distance}m...")
# Comando de velocidade negativa
# cmd = MotionCommand()
# cmd.vx = -0.2
# duration = distance / 0.2
# ...
time.sleep(distance / 0.2)
🏢 Multi-Floor Navigation
Elevator Integration
class ElevatorController:
"""Controla integração com elevador"""
def __init__(self, robot):
self.robot = robot
def call_elevator(self, floor: int, direction: str) -> bool:
"""
Chama elevador
Args:
floor: Andar atual
direction: "up" ou "down"
Returns:
True se elevador chegou
"""
print(f"Chamando elevador no andar {floor} ({direction})")
# Integração via API do prédio ou button pressing
# Option 1: API
# elevator_api.call(floor, direction)
# Option 2: Pressionar botão fisicamente
button_pos = self._get_button_position(floor, direction)
if button_pos is not None:
self._press_button(button_pos)
# Aguardar elevador
return self._wait_for_elevator(timeout=60.0)
def enter_elevator(self) -> bool:
"""Entra no elevador"""
print("Entrando no elevador...")
# Detectar porta aberta (depth camera)
door_open = self._detect_open_door()
if not door_open:
print("❌ Porta do elevador fechada")
return False
# Navegar para dentro
# self.navigator.navigate_to(inside_elevator_position)
# Simulação
time.sleep(2.0)
print("✅ Dentro do elevador")
return True
def select_floor(self, target_floor: int) -> bool:
"""Seleciona andar no painel"""
print(f"Selecionando andar {target_floor}")
# Detectar painel
# panel = self.perception.detect_elevator_panel()
# Detectar botão do andar
# button = panel.get_button(target_floor)
# Pressionar botão
# self._press_button(button.position_3d)
return True
def exit_elevator(self) -> bool:
"""Sai do elevador"""
print("Saindo do elevador...")
# Aguardar porta abrir
self._wait_for_door_open()
# Navegar para fora
# self.navigator.navigate_to(outside_elevator_position)
time.sleep(2.0)
print("✅ Fora do elevador")
return True
def _press_button(self, position_3d: tuple):
"""Pressiona botão fisicamente"""
print(f"Pressionando botão em {position_3d}")
# Usar braço para pressionar
# self.manipulator.move_to_position(position_3d)
# self.manipulator.push(force=10.0)
time.sleep(1.0)
def _detect_open_door(self) -> bool:
"""Detecta porta aberta com depth camera"""
# Verificar se há espaço livre na frente (> 2m)
# depth = self.perception.get_depth_at_center()
# return depth > 2.0
return True
def _wait_for_elevator(self, timeout: float) -> bool:
"""Aguarda elevador chegar"""
start = time.time()
while (time.time() - start) < timeout:
if self._detect_open_door():
print("✅ Elevador chegou")
return True
time.sleep(0.5)
print("❌ Timeout aguardando elevador")
return False
def _wait_for_door_open(self):
"""Aguarda porta abrir"""
print("Aguardando porta abrir...")
while not self._detect_open_door():
time.sleep(0.5)
def _get_button_position(self, floor: int, direction: str) -> tuple:
"""Retorna posição 3D do botão"""
# Database de posições de botões (mapeado previamente)
button_db = {
(1, "up"): (0.5, -0.2, 1.2),
(1, "down"): (0.5, -0.2, 1.0),
(2, "up"): (0.5, -0.2, 1.2),
# ...
}
return button_db.get((floor, direction))
class MultiFloorNavigator:
"""Navegação multi-andar"""
def __init__(self, robot):
self.robot = robot
self.elevator_ctrl = ElevatorController(robot)
# Mapa de andares
self.floor_maps = {
1: "floor1_map.yaml",
2: "floor2_map.yaml",
3: "floor3_map.yaml"
}
self.current_floor = 1
def navigate_to_floor(self, target_floor: int, target_position: tuple) -> bool:
"""
Navega para posição em andar diferente
Args:
target_floor: Andar de destino
target_position: (x, y) no andar de destino
"""
print(f"Navegando: Andar {self.current_floor} → Andar {target_floor}")
if target_floor == self.current_floor:
# Mesma andar: navegação direta
return self._navigate_on_floor(target_position)
# 1. Navegar para elevador
elevator_pos = self._get_elevator_position(self.current_floor)
if not self._navigate_on_floor(elevator_pos):
return False
# 2. Chamar elevador
direction = "up" if target_floor > self.current_floor else "down"
if not self.elevator_ctrl.call_elevator(self.current_floor, direction):
return False
# 3. Entrar
if not self.elevator_ctrl.enter_elevator():
return False
# 4. Selecionar andar
if not self.elevator_ctrl.select_floor(target_floor):
return False
# 5. Aguardar movimento do elevador
time.sleep(abs(target_floor - self.current_floor) * 5.0)
# 6. Sair
if not self.elevator_ctrl.exit_elevator():
return False
# 7. Atualizar mapa
self.current_floor = target_floor
self._load_floor_map(target_floor)
# 8. Navegar para posição final
return self._navigate_on_floor(target_position)
def _navigate_on_floor(self, position: tuple) -> bool:
"""Navega no mesmo andar"""
print(f"Navegando para {position} no andar {self.current_floor}")
# self.navigator.navigate_to(position)
time.sleep(2.0)
return True
def _get_elevator_position(self, floor: int) -> tuple:
"""Retorna posição do elevador no andar"""
elevator_positions = {
1: (10.0, 5.0),
2: (10.0, 5.0),
3: (10.0, 5.0)
}
return elevator_positions.get(floor)
def _load_floor_map(self, floor: int):
"""Carrega mapa do andar"""
map_file = self.floor_maps.get(floor)
print(f"Carregando mapa: {map_file}")
# self.navigator.load_map(map_file)
✅ Checklist de Conclusão
- Implementei Finite State Machine (FSM) para delivery
- Criei Behavior Tree alternativo
- Implementei Recovery Controller com múltiplas estratégias
- Integrei controle de elevador
- Implementei Multi-Floor Navigator
- Testei navegação entre andares
🔗 Próximos Passos
Próximo Módulo
Aprenda a fazer deploy completo: systemd services, Docker, remote debugging e OTA updates.
⏱️ Tempo estimado: 80-100 min 🧠 Nível: Avançado 💻 Hands-on: 70% prático, 30% teórico