Pular para o conteúdo principal

🤖 Autonomia Completa no G1

Objetivo do Módulo

Implementar autonomia completa no G1: state machines para gerenciar tarefas complexas, decision making com behavior trees, error recovery robusto, e navegação multi-floor (elevadores, escadas).


🔀 State Machine Architecture

Finite State Machine (FSM)

#!/usr/bin/env python3
"""
autonomous_delivery.py - Delivery robot usando FSM
"""

from enum import Enum, auto
from dataclasses import dataclass
import time

class RobotState(Enum):
"""Estados do robô"""
IDLE = auto()
NAVIGATING_TO_PICKUP = auto()
PICKING_UP = auto()
NAVIGATING_TO_DELIVERY = auto()
DELIVERING = auto()
RETURNING_HOME = auto()
ERROR = auto()
CHARGING = auto()

@dataclass
class DeliveryTask:
"""Tarefa de entrega"""
pickup_location: tuple # (x, y)
delivery_location: tuple # (x, y)
object_name: str
object_width: float = 0.05

class AutonomousDeliveryFSM:
def __init__(self, robot):
self.robot = robot
self.state = RobotState.IDLE

# Componentes
self.navigator = None # Nav2 navigator
self.manipulator = None # Arm controller
self.perception = None # Object detector

# Task queue
self.tasks = []
self.current_task = None

# Estado
self.home_location = (0.0, 0.0)
self.battery_threshold = 20.0 # %

def add_task(self, task: DeliveryTask):
"""Adiciona tarefa à fila"""
self.tasks.append(task)
print(f"✅ Tarefa adicionada: {task.object_name}")

def run(self):
"""Loop principal da FSM"""
while True:
# Verificar bateria
if self._battery_low() and self.state != RobotState.CHARGING:
self._transition_to(RobotState.CHARGING)

# Processar estado atual
if self.state == RobotState.IDLE:
self._handle_idle()
elif self.state == RobotState.NAVIGATING_TO_PICKUP:
self._handle_navigating_to_pickup()
elif self.state == RobotState.PICKING_UP:
self._handle_picking_up()
elif self.state == RobotState.NAVIGATING_TO_DELIVERY:
self._handle_navigating_to_delivery()
elif self.state == RobotState.DELIVERING:
self._handle_delivering()
elif self.state == RobotState.RETURNING_HOME:
self._handle_returning_home()
elif self.state == RobotState.ERROR:
self._handle_error()
elif self.state == RobotState.CHARGING:
self._handle_charging()

time.sleep(0.1) # 10 Hz

def _transition_to(self, new_state: RobotState):
"""Transição de estado com logging"""
print(f"State: {self.state.name}{new_state.name}")
self.state = new_state

def _handle_idle(self):
"""Estado IDLE: aguarda tarefas"""
if len(self.tasks) > 0:
self.current_task = self.tasks.pop(0)
print(f"📦 Iniciando tarefa: {self.current_task.object_name}")
self._transition_to(RobotState.NAVIGATING_TO_PICKUP)

def _handle_navigating_to_pickup(self):
"""Navega para local de coleta"""
target = self.current_task.pickup_location

# Enviar goal para Nav2
# success = self.navigator.navigate_to(target)

# Simulação
print(f"Navegando para pickup: {target}")
time.sleep(2.0)
success = True

if success:
self._transition_to(RobotState.PICKING_UP)
else:
print("❌ Falha na navegação")
self._transition_to(RobotState.ERROR)

def _handle_picking_up(self):
"""Pega objeto"""
# Detectar objeto
# obj = self.perception.find_object(self.current_task.object_name)

# Pick com manipulador
# success = self.manipulator.pick_object(obj.position_3d)

# Simulação
print(f"Pegando: {self.current_task.object_name}")
time.sleep(3.0)
success = True

if success:
print("✅ Objeto pegado")
self._transition_to(RobotState.NAVIGATING_TO_DELIVERY)
else:
print("❌ Falha ao pegar")
self._transition_to(RobotState.ERROR)

def _handle_navigating_to_delivery(self):
"""Navega para local de entrega"""
target = self.current_task.delivery_location

print(f"Navegando para delivery: {target}")
time.sleep(2.0)
success = True

if success:
self._transition_to(RobotState.DELIVERING)
else:
self._transition_to(RobotState.ERROR)

def _handle_delivering(self):
"""Entrega objeto"""
# Colocar objeto
# success = self.manipulator.place_object(target_position)

print(f"Entregando: {self.current_task.object_name}")
time.sleep(2.0)
success = True

if success:
print("✅ Objeto entregue")
self.current_task = None
self._transition_to(RobotState.RETURNING_HOME)
else:
self._transition_to(RobotState.ERROR)

def _handle_returning_home(self):
"""Retorna para home"""
print(f"Retornando para home: {self.home_location}")
time.sleep(2.0)

self._transition_to(RobotState.IDLE)

def _handle_error(self):
"""Estado de erro: tentar recovery"""
print("⚠️ Estado de ERRO - tentando recovery")

# Recovery strategies:
# 1. Retentar última operação
# 2. Voltar para IDLE
# 3. Chamar operador humano

time.sleep(5.0)

# Por enquanto: voltar para IDLE
self.current_task = None
self._transition_to(RobotState.IDLE)

def _handle_charging(self):
"""Carregando bateria"""
print("🔋 Carregando... (aguardando bateria > 80%)")

# Navegar para estação de carga
# self.navigator.navigate_to(charging_station_location)

# Aguardar carga
while self._battery_low():
time.sleep(1.0)

print("✅ Bateria carregada")
self._transition_to(RobotState.IDLE)

def _battery_low(self) -> bool:
"""Verifica se bateria está baixa"""
state = self.robot.get_state()
return state.battery_percentage < self.battery_threshold

# Uso:
def main():
robot = G1Robot()
robot.wait_for_connection()

fsm = AutonomousDeliveryFSM(robot)

# Adicionar tarefas
fsm.add_task(DeliveryTask(
pickup_location=(5.0, 2.0),
delivery_location=(10.0, 8.0),
object_name="package_1"
))
fsm.add_task(DeliveryTask(
pickup_location=(3.0, 1.0),
delivery_location=(7.0, 5.0),
object_name="package_2"
))

# Rodar FSM
fsm.run()

if __name__ == "__main__":
main()

🌳 Behavior Trees

Vantagens sobre FSM:

  • ✅ Modular (nodes reutilizáveis)
  • ✅ Hierárquico (sub-trees)
  • ✅ Fácil de visualizar e debugar
#!/usr/bin/env python3
"""
behavior_tree_delivery.py - Delivery usando Behavior Tree
"""

from py_trees import behaviour, composites, blackboard
from py_trees.common import Status
import time

class NavigateToLocation(behaviour.Behaviour):
"""Navega para localização"""
def __init__(self, name, target_key):
super().__init__(name)
self.target_key = target_key
self.blackboard = self.attach_blackboard_client()
self.blackboard.register_key(target_key, access=blackboard.Access.READ)

def update(self):
target = self.blackboard.get(self.target_key)
print(f"Navegando para: {target}")

# Simular navegação
time.sleep(0.5)

# Em produção: chamar Nav2 e verificar resultado
return Status.SUCCESS

class PickObject(behaviour.Behaviour):
"""Pega objeto"""
def __init__(self, name):
super().__init__(name)

def update(self):
print("Pegando objeto...")
time.sleep(0.5)
return Status.SUCCESS

class PlaceObject(behaviour.Behaviour):
"""Coloca objeto"""
def __init__(self, name):
super().__init__(name)

def update(self):
print("Colocando objeto...")
time.sleep(0.5)
return Status.SUCCESS

class CheckBattery(behaviour.Behaviour):
"""Verifica bateria"""
def __init__(self, name, threshold=20.0):
super().__init__(name)
self.threshold = threshold

def update(self):
# Simular leitura de bateria
battery = 85.0

if battery > self.threshold:
return Status.SUCCESS
else:
print(f"⚠️ Bateria baixa: {battery}%")
return Status.FAILURE

def create_delivery_behavior_tree():
"""Cria behavior tree para delivery"""
# Root: Sequence (executa filhos em ordem)
root = composites.Sequence(name="DeliveryTask", memory=False)

# 1. Verificar bateria
check_battery = CheckBattery(name="CheckBattery")

# 2. Navegar para pickup
nav_to_pickup = NavigateToLocation(name="NavToPickup", target_key="pickup_location")

# 3. Pegar objeto
pick = PickObject(name="PickObject")

# 4. Navegar para delivery
nav_to_delivery = NavigateToLocation(name="NavToDelivery", target_key="delivery_location")

# 5. Entregar objeto
place = PlaceObject(name="PlaceObject")

# 6. Retornar para home
nav_to_home = NavigateToLocation(name="NavToHome", target_key="home_location")

# Adicionar filhos ao root
root.add_children([
check_battery,
nav_to_pickup,
pick,
nav_to_delivery,
place,
nav_to_home
])

return root

def main():
# Criar behavior tree
tree = create_delivery_behavior_tree()

# Setup blackboard (memória compartilhada)
bb = blackboard.Client()
bb.register_key("pickup_location", access=blackboard.Access.WRITE)
bb.register_key("delivery_location", access=blackboard.Access.WRITE)
bb.register_key("home_location", access=blackboard.Access.WRITE)

bb.set("pickup_location", (5.0, 2.0))
bb.set("delivery_location", (10.0, 8.0))
bb.set("home_location", (0.0, 0.0))

# Executar tree
tree.setup_with_descendants()

while True:
tree.tick_once()

if tree.status == Status.SUCCESS:
print("✅ Tarefa concluída!")
break
elif tree.status == Status.FAILURE:
print("❌ Tarefa falhou")
break

time.sleep(0.1)

if __name__ == "__main__":
main()

🛡️ Error Recovery

Recovery Behaviors

class RecoveryController:
"""Gerencia recovery de erros"""
def __init__(self, robot):
self.robot = robot

# Histórico de erros
self.error_history = []

# Estratégias de recovery
self.recovery_strategies = {
"navigation_failed": self.recover_navigation,
"grasp_failed": self.recover_grasp,
"fall_detected": self.recover_fall,
"battery_critical": self.recover_battery,
"sensor_failure": self.recover_sensor
}

def handle_error(self, error_type: str) -> bool:
"""
Tenta recover de erro

Returns:
True se recovery bem-sucedido
"""
print(f"⚠️ Erro detectado: {error_type}")

self.error_history.append({
"type": error_type,
"timestamp": time.time()
})

# Verificar se erro recorrente (> 3x em 1 min)
recent_errors = [e for e in self.error_history
if time.time() - e["timestamp"] < 60]
if len(recent_errors) > 3:
print("❌ Erro recorrente - requer intervenção humana")
return False

# Executar estratégia de recovery
strategy = self.recovery_strategies.get(error_type)

if strategy is None:
print(f"❌ Sem estratégia de recovery para: {error_type}")
return False

return strategy()

def recover_navigation(self) -> bool:
"""Recovery de falha de navegação"""
print("Tentando recovery de navegação...")

# 1. Parar movimento
self.robot.emergency_stop()
time.sleep(0.5)

# 2. Limpar costmap
# self.navigator.clear_costmap()

# 3. Recomputar caminho
# success = self.navigator.replan()

# Simulação
success = True

if success:
print("✅ Navegação recuperada")
else:
print("❌ Recovery falhou - backing up")
self._back_up(distance=0.5)

return success

def recover_grasp(self) -> bool:
"""Recovery de falha de grasp"""
print("Tentando recovery de grasp...")

# 1. Abrir gripper
# self.manipulator.gripper.open()

# 2. Retrair braço
# self.manipulator.move_to_home()

# 3. Re-detectar objeto
# obj = self.perception.find_object(target_name)

# 4. Tentar novamente com posição ajustada
# success = self.manipulator.pick_object(obj.position_3d + offset)

success = True
return success

def recover_fall(self) -> bool:
"""Recovery de queda"""
print("⚠️ QUEDA DETECTADA - Tentando levantar")

# 1. Emergency stop
self.robot.emergency_stop()

# 2. Verificar orientação
state = self.robot.get_state()
roll, pitch, yaw = state.imu_euler

if abs(roll) > 1.0 or abs(pitch) > 1.0: # > 57°
print("❌ Robô caído - requer ajuda humana")
return False

# 3. Tentar levantar (stand up sequence)
# self._stand_up_sequence()

return True

def recover_battery(self) -> bool:
"""Recovery de bateria crítica"""
print("🔋 Bateria crítica - navegando para estação")

# Navegar para estação de carga mais próxima
# charging_station = self.find_nearest_charging_station()
# self.navigator.navigate_to(charging_station)

return True

def recover_sensor(self) -> bool:
"""Recovery de falha de sensor"""
print("Sensor failure - tentando reiniciar")

# Tentar reiniciar sensor
# self.sensor.restart()

time.sleep(2.0)

# Verificar se voltou
# if self.sensor.is_alive():
# return True

return False

def _back_up(self, distance: float):
"""Move robô para trás"""
print(f"Backing up {distance}m...")

# Comando de velocidade negativa
# cmd = MotionCommand()
# cmd.vx = -0.2
# duration = distance / 0.2
# ...

time.sleep(distance / 0.2)

🏢 Multi-Floor Navigation

Elevator Integration

class ElevatorController:
"""Controla integração com elevador"""
def __init__(self, robot):
self.robot = robot

def call_elevator(self, floor: int, direction: str) -> bool:
"""
Chama elevador

Args:
floor: Andar atual
direction: "up" ou "down"

Returns:
True se elevador chegou
"""
print(f"Chamando elevador no andar {floor} ({direction})")

# Integração via API do prédio ou button pressing
# Option 1: API
# elevator_api.call(floor, direction)

# Option 2: Pressionar botão fisicamente
button_pos = self._get_button_position(floor, direction)
if button_pos is not None:
self._press_button(button_pos)

# Aguardar elevador
return self._wait_for_elevator(timeout=60.0)

def enter_elevator(self) -> bool:
"""Entra no elevador"""
print("Entrando no elevador...")

# Detectar porta aberta (depth camera)
door_open = self._detect_open_door()

if not door_open:
print("❌ Porta do elevador fechada")
return False

# Navegar para dentro
# self.navigator.navigate_to(inside_elevator_position)

# Simulação
time.sleep(2.0)

print("✅ Dentro do elevador")
return True

def select_floor(self, target_floor: int) -> bool:
"""Seleciona andar no painel"""
print(f"Selecionando andar {target_floor}")

# Detectar painel
# panel = self.perception.detect_elevator_panel()

# Detectar botão do andar
# button = panel.get_button(target_floor)

# Pressionar botão
# self._press_button(button.position_3d)

return True

def exit_elevator(self) -> bool:
"""Sai do elevador"""
print("Saindo do elevador...")

# Aguardar porta abrir
self._wait_for_door_open()

# Navegar para fora
# self.navigator.navigate_to(outside_elevator_position)

time.sleep(2.0)

print("✅ Fora do elevador")
return True

def _press_button(self, position_3d: tuple):
"""Pressiona botão fisicamente"""
print(f"Pressionando botão em {position_3d}")

# Usar braço para pressionar
# self.manipulator.move_to_position(position_3d)
# self.manipulator.push(force=10.0)

time.sleep(1.0)

def _detect_open_door(self) -> bool:
"""Detecta porta aberta com depth camera"""
# Verificar se há espaço livre na frente (> 2m)
# depth = self.perception.get_depth_at_center()
# return depth > 2.0

return True

def _wait_for_elevator(self, timeout: float) -> bool:
"""Aguarda elevador chegar"""
start = time.time()

while (time.time() - start) < timeout:
if self._detect_open_door():
print("✅ Elevador chegou")
return True

time.sleep(0.5)

print("❌ Timeout aguardando elevador")
return False

def _wait_for_door_open(self):
"""Aguarda porta abrir"""
print("Aguardando porta abrir...")

while not self._detect_open_door():
time.sleep(0.5)

def _get_button_position(self, floor: int, direction: str) -> tuple:
"""Retorna posição 3D do botão"""
# Database de posições de botões (mapeado previamente)
button_db = {
(1, "up"): (0.5, -0.2, 1.2),
(1, "down"): (0.5, -0.2, 1.0),
(2, "up"): (0.5, -0.2, 1.2),
# ...
}

return button_db.get((floor, direction))

class MultiFloorNavigator:
"""Navegação multi-andar"""
def __init__(self, robot):
self.robot = robot
self.elevator_ctrl = ElevatorController(robot)

# Mapa de andares
self.floor_maps = {
1: "floor1_map.yaml",
2: "floor2_map.yaml",
3: "floor3_map.yaml"
}

self.current_floor = 1

def navigate_to_floor(self, target_floor: int, target_position: tuple) -> bool:
"""
Navega para posição em andar diferente

Args:
target_floor: Andar de destino
target_position: (x, y) no andar de destino
"""
print(f"Navegando: Andar {self.current_floor} → Andar {target_floor}")

if target_floor == self.current_floor:
# Mesma andar: navegação direta
return self._navigate_on_floor(target_position)

# 1. Navegar para elevador
elevator_pos = self._get_elevator_position(self.current_floor)
if not self._navigate_on_floor(elevator_pos):
return False

# 2. Chamar elevador
direction = "up" if target_floor > self.current_floor else "down"
if not self.elevator_ctrl.call_elevator(self.current_floor, direction):
return False

# 3. Entrar
if not self.elevator_ctrl.enter_elevator():
return False

# 4. Selecionar andar
if not self.elevator_ctrl.select_floor(target_floor):
return False

# 5. Aguardar movimento do elevador
time.sleep(abs(target_floor - self.current_floor) * 5.0)

# 6. Sair
if not self.elevator_ctrl.exit_elevator():
return False

# 7. Atualizar mapa
self.current_floor = target_floor
self._load_floor_map(target_floor)

# 8. Navegar para posição final
return self._navigate_on_floor(target_position)

def _navigate_on_floor(self, position: tuple) -> bool:
"""Navega no mesmo andar"""
print(f"Navegando para {position} no andar {self.current_floor}")
# self.navigator.navigate_to(position)
time.sleep(2.0)
return True

def _get_elevator_position(self, floor: int) -> tuple:
"""Retorna posição do elevador no andar"""
elevator_positions = {
1: (10.0, 5.0),
2: (10.0, 5.0),
3: (10.0, 5.0)
}
return elevator_positions.get(floor)

def _load_floor_map(self, floor: int):
"""Carrega mapa do andar"""
map_file = self.floor_maps.get(floor)
print(f"Carregando mapa: {map_file}")
# self.navigator.load_map(map_file)

✅ Checklist de Conclusão

  • Implementei Finite State Machine (FSM) para delivery
  • Criei Behavior Tree alternativo
  • Implementei Recovery Controller com múltiplas estratégias
  • Integrei controle de elevador
  • Implementei Multi-Floor Navigator
  • Testei navegação entre andares

🔗 Próximos Passos

Próximo Módulo

🚀 Deploy de Projetos no G1 →

Aprenda a fazer deploy completo: systemd services, Docker, remote debugging e OTA updates.


⏱️ Tempo estimado: 80-100 min 🧠 Nível: Avançado 💻 Hands-on: 70% prático, 30% teórico