Pular para o conteúdo principal

📡 Comunicação DDS com Unitree G1

Objetivo do Módulo

Dominar o Cyclone DDS usado pelo G1, entender os topics publicados/subscritos, configurar QoS para performance ótima, e debugar problemas de rede em tempo real.


🌐 O Que é DDS?

Data Distribution Service Explained

DDS (Data Distribution Service) é um middleware de comunicação publish-subscribe usado para sistemas distribuídos em tempo real. Pense nele como "MQTT para robótica crítica".

Por que Unitree escolheu DDS?

  • Latência: < 1ms entre publisher e subscriber (mesma rede)
  • 🔄 Descoberta automática: Nodes se encontram sem servidor central
  • 📦 Type-safe: Dados estruturados com IDL (Interface Definition Language)
  • 🛡️ QoS configurável: Confiabilidade vs performance trade-off
  • 🌐 Multicast: Um publisher → múltiplos subscribers eficientemente

Comparação com alternativas:

FeatureDDS (Cyclone)ROS2 (usa DDS)gRPCWebSockets
Latência< 1ms1-5ms5-20ms10-50ms
DescobertaAutomáticaAutomáticaManual (IP:port)Manual
Multicast✅ Sim✅ Sim❌ Não❌ Não
Realtime✅ Hard RT🟡 Soft RT❌ Não❌ Não
OverheadBaixo (~100 bytes)Médio (~200 bytes)Alto (~1KB)Médio

🏗️ Arquitetura DDS do G1

Componentes

┌─────────────────────────────────────────────────────┐
│ Seu Aplicativo Python/C++ │
├─────────────────────────────────────────────────────┤
│ unitree_sdk2_python (wrapper) │
├─────────────────────────────────────────────────────┤
│ Cyclone DDS (implementação do padrão DDS) │
│ - Publishers (envia dados) │
│ - Subscribers (recebe dados) │
│ - Topics (canais de comunicação) │
├─────────────────────────────────────────────────────┤
│ Network Stack (UDP multicast) │
│ - Interface: wlan0 ou eth0 │
│ - Porta: 7400-7450 (DDS padrão) │
└─────────────────────────────────────────────────────┘

Fluxo de Dados

G1 Robot                           Your PC
┌──────────────┐ ┌──────────────┐
│ │ Topic: /state │ │
│ Publisher ├──────────────────>│ Subscriber │
│ (100 Hz) │ UDP Multicast │ (callback) │
│ │ │ │
└──────────────┘ └──────────────┘

┌──────────────┐ ┌──────────────┐
│ │ Topic: /cmd │ │
│ Subscriber │<──────────────────┤ Publisher │
│ (executa) │ UDP Unicast │ (envia cmd) │
│ │ │ │
└──────────────┘ └──────────────┘

Descoberta automática:

  1. Ambos os lados enviam mensagens SPDP (Simple Participant Discovery Protocol)
  2. Quando descobrem, trocam mensagens SEDP (Simple Endpoint Discovery Protocol)
  3. Estabelecem conexão direta para cada topic matchado
  4. Dados começam a fluir sem intervenção manual

📋 Topics do Unitree G1

Topics Publicados pelo G1 (você subscreve)

Descrição: Estado completo do robô (juntas, IMU, forças, bateria)

Frequência: 100 Hz (10ms)

Tipo de dados: unitree_go.msg.dds_.RobotState_

Estrutura:

# Definição IDL (simplificada)
struct RobotState {
# Header
int64 timestamp; # Unix epoch nanoseconds
uint32 sequence_id; # Incrementa a cada mensagem

# Sistema
float battery_voltage;
float battery_percentage;
float cpu_temperature;
string control_mode;

# Juntas (arrays de tamanho dof)
sequence<float> joint_positions; # [rad]
sequence<float> joint_velocities; # [rad/s]
sequence<float> joint_torques; # [Nm]
sequence<float> joint_temperatures; # [°C]

# IMU
Quaternion imu_quaternion;
Vector3 imu_angular_velocity;
Vector3 imu_linear_acceleration;

# Forças
float left_foot_force_z;
float right_foot_force_z;
Vector3 left_foot_torque;
Vector3 right_foot_torque;
};

Exemplo de subscrição:

from unitree_sdk2_python import Subscriber, RobotState

def state_callback(msg: RobotState):
"""Callback executado a cada mensagem (100 Hz)"""
print(f"Bateria: {msg.battery_percentage:.1f}%")
print(f"IMU Pitch: {msg.imu_euler[1] * 57.3:.1f}°")

# Criar subscriber
sub = Subscriber(topic="/robot_state", msg_type=RobotState, callback=state_callback)
sub.spin() # Bloqueia, processa callbacks

QoS padrão: RELIABLE, KEEP_LAST (depth=1)


Topics que o G1 Subscreve (você publica)

Descrição: Comandos para juntas individuais

Tipo de dados: unitree_go.msg.dds_.JointCommand_

Estrutura:

struct JointCommand {
int64 timestamp;
string joint_name; # "left_knee", etc.

# Modo POSITION
float target_position; # [rad]
float max_velocity; # [rad/s] limite
float kp; # Ganho proporcional (opcional)

# Modo VELOCITY
float target_velocity; # [rad/s]

# Modo TORQUE
float target_torque; # [Nm]
float kd; # Ganho derivativo (optional)

# Geral
string control_mode; # "POSITION", "VELOCITY", "TORQUE"
};

Exemplo de publicação:

from unitree_sdk2_python import Publisher, JointCommand
import time

# Criar publisher
pub = Publisher(topic="/joint_command")

# Criar mensagem
cmd = JointCommand()
cmd.timestamp = int(time.time() * 1e9) # Nanoseconds
cmd.joint_name = "left_knee"
cmd.control_mode = "POSITION"
cmd.target_position = 1.57 # 90 graus
cmd.max_velocity = 1.0 # Máximo 1 rad/s

# Publicar
pub.publish(cmd)

QoS: RELIABLE, KEEP_LAST (depth=10) - garante entrega


⚙️ Configuração do Cyclone DDS

Arquivo de Configuração XML

O Cyclone DDS é configurado via arquivo XML. Localização padrão: /etc/cyclonedds/config.xml

Configuração básica (performance):

<?xml version="1.0" encoding="UTF-8"?>
<CycloneDDS xmlns="https://cdds.io/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<Domain id="0">
<General>
<!-- Interface de rede (auto-detect ou específica) -->
<NetworkInterfaceAddress>auto</NetworkInterfaceAddress>

<!-- Permitir multicast (descoberta automática) -->
<AllowMulticast>true</AllowMulticast>

<!-- Fragmentação para mensagens grandes (imagens) -->
<FragmentSize>4096</FragmentSize>
</General>

<Internal>
<!-- Thread scheduling (realtime) -->
<Watermarks>
<WhcHigh>500kB</WhcHigh>
</Watermarks>
</Internal>

<Discovery>
<!-- Descoberta de participantes -->
<ParticipantIndex>auto</ParticipantIndex>

<!-- Intervalo de heartbeats -->
<SPDPInterval>1s</SPDPInterval>
</Discovery>

<Tracing>
<!-- Logging (desabilitar em produção) -->
<Verbosity>warning</Verbosity>
<OutputFile>/tmp/cyclonedds.log</OutputFile>
</Tracing>
</Domain>
</CycloneDDS>

Aplicar configuração:

# Temporário (apenas sessão atual)
export CYCLONEDDS_URI="file:///etc/cyclonedds/config.xml"

# Permanente (adicionar ao ~/.bashrc)
echo 'export CYCLONEDDS_URI="file:///etc/cyclonedds/config.xml"' >> ~/.bashrc
source ~/.bashrc

Otimizações de Performance

Cenário: Controle em tempo real (< 1ms)

<CycloneDDS>
<Domain id="0">
<General>
<NetworkInterfaceAddress>eth0</NetworkInterfaceAddress>
<AllowMulticast>true</AllowMulticast>
<FragmentSize>1472</FragmentSize> <!-- MTU ethernet - headers -->
</General>

<Internal>
<!-- Processar pacotes imediatamente -->
<DeliveryQueueMaxSamples>1</DeliveryQueueMaxSamples>

<!-- Prioridade máxima para threads -->
<SchedulingClass>Realtime</SchedulingClass>
<SchedulingPriority>80</SchedulingPriority>
</Internal>

<Discovery>
<SPDPInterval>100ms</SPDPInterval> <!-- Descoberta rápida -->
</Discovery>
</Domain>
</CycloneDDS>

Requer: Kernel RT-PREEMPT, permissões CAP_SYS_NICE

# Instalar kernel realtime
sudo apt install linux-lowlatency

# Permitir scheduling RT
sudo setcap cap_sys_nice+ep $(which python3)

🔍 Network Debugging

Ferramentas de Diagnóstico

Instalação:

sudo apt install cyclonedds-tools

Medir latência:

# Terminal 1: Subscriber (no seu PC)
ddsperf sub

# Terminal 2: Publisher (no G1 ou outro PC)
ddsperf pub -D 10 # 10 segundos

# Output:
# Latency (us): min=120, median=150, 99%=300, max=1200

Medir throughput:

# Publisher com payloads grandes
ddsperf pub -s 1MB -D 10

# Output:
# Throughput: 850 Mbps (average)

🚨 Troubleshooting Comum

Problema 1: Participantes não se descobrem

Sintoma:

cyclonedds ls
# Output: (vazio)

Checklist:

  1. Mesma rede?

    ping 192.168.123.10
    # Deve responder
  2. Firewall bloqueando?

    # Abrir portas DDS (7400-7450)
    sudo ufw allow 7400:7450/udp
  3. Interface correta?

    # Verificar qual interface está ativa
    ip route get 192.168.123.10
    # Output: ... dev wlan0 ...

    # Forçar interface no XML
    <NetworkInterfaceAddress>wlan0</NetworkInterfaceAddress>
  4. Multicast habilitado?

    # Testar multicast
    iperf3 -c 239.255.0.1 -u -T 32 -t 5 -b 10M

    # Se falhar, desabilitar multicast
    <AllowMulticast>false</AllowMulticast>

Problema 2: Latência alta (> 10ms)

Diagnóstico:

ddsperf pub -D 5 & ddsperf sub

Se latência > 10ms:

  1. WiFi congestionado?

    # Verificar sinal
    iwconfig wlan0
    # Link Quality: 60/70 (bom), 30/70 (ruim)

    # Trocar para Ethernet ou canal WiFi menos congestionado
  2. CPU sobrecarregado?

    top
    # Se python3 > 80% CPU, otimizar código (menos callbacks)
  3. QoS mal configurado?

    # Usar BEST_EFFORT para latência mínima (aceitável perder msgs)
    qos = QoSProfile(reliability="BEST_EFFORT")

Problema 3: Mensagens perdidas

Sintoma: dds_monitor.py mostra Lost: 150

Soluções:

  1. WiFi instável → Ethernet

  2. Buffer overflow:

    <Watermarks>
    <WhcHigh>10MB</WhcHigh> <!-- Aumentar buffer -->
    </Watermarks>
  3. QoS RELIABLE:

    qos = QoSProfile(reliability="RELIABLE")
    pub = Publisher("/joint_command", qos=qos)

📈 Exemplo Completo: Telemetria em Tempo Real

#!/usr/bin/env python3
"""
telemetry.py - Sistema completo de telemetria DDS
Subscreve múltiplos topics e consolida dados
"""

from unitree_sdk2_python import Subscriber, RobotState
from dataclasses import dataclass, field
from typing import Dict
import time
import threading

@dataclass
class TelemetryData:
"""Dados consolidados de todos os topics"""
timestamp: float = 0.0
battery_voltage: float = 0.0
battery_percentage: float = 0.0
cpu_temperature: float = 0.0
imu_pitch: float = 0.0
imu_roll: float = 0.0
left_foot_force: float = 0.0
right_foot_force: float = 0.0
joint_positions: Dict[str, float] = field(default_factory=dict)

class TelemetrySystem:
def __init__(self):
self.data = TelemetryData()
self.lock = threading.Lock()

# Subscribers
self.state_sub = Subscriber("/robot_state", callback=self.state_callback)

# Estatísticas
self.msg_count = 0
self.start_time = time.time()

def state_callback(self, msg: RobotState):
"""Processa mensagens de estado"""
with self.lock:
self.data.timestamp = time.time()
self.data.battery_voltage = msg.battery_voltage
self.data.battery_percentage = msg.battery_percentage
self.data.cpu_temperature = msg.cpu_temperature
self.data.imu_pitch = msg.imu_euler[1]
self.data.imu_roll = msg.imu_euler[0]
self.data.left_foot_force = msg.left_foot_force
self.data.right_foot_force = msg.right_foot_force

# Atualizar posições das juntas
for name, pos in zip(msg.joint_names, msg.joint_positions):
self.data.joint_positions[name] = pos

self.msg_count += 1

def get_data(self) -> TelemetryData:
"""Retorna cópia thread-safe dos dados"""
with self.lock:
return self.data

def get_statistics(self) -> Dict:
"""Retorna estatísticas de comunicação"""
elapsed = time.time() - self.start_time
freq = self.msg_count / elapsed if elapsed > 0 else 0

return {
"messages_received": self.msg_count,
"uptime_seconds": elapsed,
"average_frequency_hz": freq
}

def print_dashboard(self):
"""Imprime dashboard de telemetria"""
while True:
data = self.get_data()
stats = self.get_statistics()

# Clear screen (ANSI escape code)
print("\033[2J\033[H")

print("=" * 70)
print(" " * 20 + "UNITREE G1 TELEMETRY")
print("=" * 70)
print()

print(f"⏱️ Uptime: {stats['uptime_seconds']:.1f}s | "
f"Freq: {stats['average_frequency_hz']:.1f} Hz | "
f"Msgs: {stats['messages_received']}")
print()

print("🔋 BATERIA")
print(f" Voltage: {data.battery_voltage:>6.2f} V")
print(f" Percentage: {data.battery_percentage:>6.1f} %")
print()

print("🌡️ TEMPERATURA")
print(f" CPU: {data.cpu_temperature:>6.1f} °C")
print()

print("🧭 ORIENTAÇÃO")
print(f" Pitch: {data.imu_pitch * 57.3:>6.1f} °")
print(f" Roll: {data.imu_roll * 57.3:>6.1f} °")
print()

print("👣 FORÇAS NOS PÉS")
print(f" Esquerdo: {data.left_foot_force:>6.1f} N")
print(f" Direito: {data.right_foot_force:>6.1f} N")
print(f" Total: {data.left_foot_force + data.right_foot_force:>6.1f} N")
print()

print("🦿 JUNTAS (primeiras 6)")
for i, (name, pos) in enumerate(list(data.joint_positions.items())[:6]):
print(f" {name:<20} {pos:>7.3f} rad ({pos*57.3:>6.1f}°)")

print()
print("=" * 70)

time.sleep(0.1) # 10 Hz

def run(self):
"""Inicia sistema de telemetria"""
# Thread para dashboard
dashboard_thread = threading.Thread(target=self.print_dashboard, daemon=True)
dashboard_thread.start()

# Thread principal processa callbacks
print("🚀 Sistema de telemetria iniciado...")
self.state_sub.spin()

if __name__ == "__main__":
telemetry = TelemetrySystem()
try:
telemetry.run()
except KeyboardInterrupt:
print("\n\n⚠️ Telemetria interrompida")

Executar:

python3 telemetry.py

✅ Checklist de Conclusão

  • Entendi o que é DDS e por que é usado no G1
  • Conheço todos os topics principais (/robot_state, /joint_command, etc.)
  • Configurei Cyclone DDS via XML
  • Testei ddsperf (latência < 5ms)
  • Criei subscriber para /robot_state
  • Criei publisher para /joint_command
  • Executei telemetry.py com sucesso
  • Debuguei comunicação com cyclonedds CLI ou Wireshark

🔗 Próximos Passos

Próximo Módulo

⚡ Controle de Baixo Nível →

Aprenda controle direto de motores: torque mode, position mode, PID tuning e safety limits.

Recursos adicionais:


⏱️ Tempo estimado: 60-80 min 🧠 Nível: Intermediário-Avançado 💻 Hands-on: 70% prático, 30% teórico