import asyncio
import json
import websockets
# Hyperliquid wallet address to monitor. Replace the placeholder below with
# your real address, e.g. "0x123abc...".
USER_ADDRESS: str = "0x你的钱包地址"
# Mainnet WebSocket endpoint (for testnet use
# wss://api.hyperliquid-testnet.xyz/ws instead).
WS_URL: str = "wss://api.hyperliquid.xyz/ws"
async def _send_heartbeats(ws, interval=30.0):
    """Send a JSON ``{"method": "ping"}`` on *ws* every *interval* seconds.

    Runs until the task is cancelled or the connection closes.
    """
    ping = json.dumps({"method": "ping"})
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send(ping)
    except websockets.ConnectionClosed:
        # The reader loop in the caller observes the same close; returning
        # here just avoids an unretrieved task exception.
        return


def _print_twap_slice_fill(slice_fill):
    """Print the core fields of one entry of the ``twapSliceFills`` list."""
    fill = slice_fill.get("fill") or {}
    twap_id = slice_fill.get("twapId", "unknown")
    # Hyperliquid's documented WsFill schema names these fields `crossed`
    # (order crossed the spread, i.e. was taker) and `liquidation`; the keys
    # the script previously read are kept as fallbacks.
    is_taker = fill.get("crossed", fill.get("isTaker"))
    liquidation = fill.get("liquidation", fill.get("liq", False))

    print(f" TWAP ID: {twap_id}")
    print(f" Coin: {fill.get('coin')}")
    # Hyperliquid encodes the side as "B" (bid) or "A" (ask), not "S".
    print(f" Side: {fill.get('side')} (B=Bid/Buy, A=Ask/Sell)")
    print(f" Size (sz): {fill.get('sz')}")
    print(f" Price (px): {fill.get('px')}")
    print(f" Time: {fill.get('time')} (ms timestamp)")
    print(f" Hash: {fill.get('hash')}")
    print(f" Fee: {fill.get('fee')} {fill.get('feeToken')}")
    print(f" Is Taker: {is_taker}")
    print(f" Liquidation: {liquidation}")
    print("---")  # separator between slice fills


async def monitor_user_twap_slice_fills():
    """Subscribe to ``userTwapSliceFills`` for ``USER_ADDRESS`` and print fills.

    Opens a WebSocket connection to ``WS_URL``, sends the subscription
    request, then prints every slice fill from each ``userTwapSliceFills``
    message, labelling the message as a snapshot or an incremental update.
    Messages on any other channel (including heartbeat replies) are ignored.

    Hyperliquid's documentation states that the server closes a connection
    it has not sent a message to in the last 60 seconds and recommends JSON
    ping messages as a heartbeat. This channel can be idle for long periods,
    so a background task sends those pings while the connection is open.

    The coroutine returns when the server closes the connection normally;
    an abnormal close propagates the exception raised by ``websockets``.
    """
    async with websockets.connect(WS_URL) as ws:
        # Subscribe to the userTwapSliceFills channel.
        sub_msg = {
            "method": "subscribe",
            "subscription": {
                "type": "userTwapSliceFills",
                "user": USER_ADDRESS,
            },
        }
        await ws.send(json.dumps(sub_msg))
        print(f"已订阅 userTwapSliceFills for user: {USER_ADDRESS}")

        heartbeat = asyncio.create_task(_send_heartbeats(ws))
        try:
            async for msg in ws:
                data = json.loads(msg)
                if data.get("channel") != "userTwapSliceFills":
                    continue

                # `or` also covers an explicit null payload, which a
                # `.get(key, default)` would pass through as None.
                payload = data.get("data") or {}
                is_snapshot = payload.get("isSnapshot", False)
                twap_slice_fills = payload.get("twapSliceFills") or []

                if is_snapshot:
                    print("收到初始快照 (Snapshot):")
                else:
                    print("收到增量更新:")

                for slice_fill in twap_slice_fills:
                    _print_twap_slice_fill(slice_fill)
        finally:
            # Stop pinging as soon as the reader loop ends for any reason.
            heartbeat.cancel()
# Run the monitor only when this file is executed as a script, so importing
# the module does not open a network connection and block.
if __name__ == "__main__":
    asyncio.run(monitor_user_twap_slice_fills())