ipad当x和z键玩osu #无声打osu#没磁轴怎么打osu
下载 .NET (Linux、macOS 和 Windows) | .NET
dotnet还行
构建:f5
运行:dotnet run --project osu.Desktop -c Debug
deepwiki就是nb
uinput是ubuntu的我现在没法调试,放着
import asyncio
import json
import time
import uinput
import websockets
from quart import Quart, websocket
app = Quart(__name__)
# 支持的按键映射 - 专注于 x 和 z
SUPPORTED_KEYS = {
'x': uinput.KEY_X,
'z': uinput.KEY_Z
}
# 存储按键状态
key_states = {
'x': False,
'z': False
}
# 初始化 uinput 设备
events = [uinput.KEY_X, uinput.KEY_Z]
device = uinput.Device(events)
print("Direct input device initialized")
# 存储活跃的 WebSocket 连接
active_connections = set()
# WebSocket 连接处理
@app.websocket("/ws")
async def ws():
connection = websockets.WebSocketServerProtocol
active_connections.add(connection)
try:
while True:
data = await websocket.receive()
await process_message(connection, data)
except asyncio.CancelledError:
pass
finally:
active_connections.remove(connection)
# 处理接收到的 WebSocket 消息
async def process_message(connection, message):
try:
data = json.loads(message)
action = data.get('action')
key = data.get('key', '').lower()
if key not in SUPPORTED_KEYS:
await connection.send(json.dumps({
"status": "error",
"message": f"Unsupported key: {key}"
}))
return
start_time = time.time()
if action == 'press_down':
if not key_states[key]:
key_states[key] = True
device.emit(SUPPORTED_KEYS[key], 1) # 1 = key down
print(f"[Direct Input] Key Down: {key}")
latency = (time.time() - start_time) * 1000
await connection.send(json.dumps({
"status": "success",
"action": "key_down",
"key": key,
"latency": latency
}))
else:
await connection.send(json.dumps({
"status": "ignored",
"message": f"Key {key} already pressed"
}))
elif action == 'press_up':
if key_states[key]:
key_states[key] = False
device.emit(SUPPORTED_KEYS[key], 0) # 0 = key up
print(f"[Direct Input] Key Up: {key}")
latency = (time.time() - start_time) * 1000
await connection.send(json.dumps({
"status": "success",
"action": "key_up",
"key": key,
"latency": latency
}))
else:
await connection.send(json.dumps({
"status": "ignored",
"message": f"Key {key} already released"
}))
elif action == 'press':
device.emit(SUPPORTED_KEYS[key], 1) # Key down
await asyncio.sleep(0.05) # Short delay
device.emit(SUPPORTED_KEYS[key], 0) # Key up
print(f"[Direct Input] Key Press: {key}")
latency = (time.time() - start_time) * 1000
await connection.send(json.dumps({
"status": "success",
"action": "key_press",
"key": key,
"latency": latency
}))
elif action == 'get_state':
await connection.send(json.dumps({
"status": "success",
"key_states": key_states
}))
elif action == 'reset_all':
for k in SUPPORTED_KEYS:
if key_states[k]:
key_states[k] = False
device.emit(SUPPORTED_KEYS[k], 0) # Key up
await connection.send(json.dumps({
"status": "success",
"message": "All keys reset"
}))
else:
await connection.send(json.dumps({
"status": "error",
"message": f"Unknown action: {action}"
}))
except json.JSONDecodeError:
await connection.send(json.dumps({
"status": "error",
"message": "Invalid JSON format"
}))
except Exception as e:
await connection.send(json.dumps({
"status": "error",
"message": str(e)
}))
# 广播按键状态给所有连接
async def broadcast_key_states():
if active_connections:
message = json.dumps({
"type": "state_update",
"key_states": key_states
})
await asyncio.gather(*[conn.send(message) for conn in active_connections])
# 定期广播按键状态
@app.before_serving
async def setup_periodic_tasks():
app.broadcast_task = asyncio.create_task(periodic_broadcast())
async def periodic_broadcast():
while True:
await broadcast_key_states()
await asyncio.sleep(1) # 每秒广播一次
@app.after_serving
async def cleanup():
app.broadcast_task.cancel()
await app.broadcast_task
# HTTP 路由用于健康检查
@app.route("/health")
async def health_check():
return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}
if __name__ == "__main__":
print("Starting WebSocket server at ws://0.0.0.0:5000/ws")
print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")
app.run(host="0.0.0.0", port=5000)
可以滑条了可是按下抬起有问题
import asyncio
import json
import time
from pynput.keyboard import Controller, Key
from quart import Quart, websocket
app = Quart(__name__)
keyboard = Controller()
# 支持的按键映射 - 专注于 x 和 z
SUPPORTED_KEYS = {
'x': 'x',
'z': 'z'
}
# 存储按键状态
key_states = {
'x': False,
'z': False
}
# 存储活跃的 WebSocket 连接
active_connections = set()
# WebSocket 连接处理
@app.websocket("/ws")
async def ws():
# 将当前的websocket对象添加到活跃连接中
active_connections.add(websocket._get_current_object())
try:
while True:
data = await websocket.receive()
await process_message(data)
except asyncio.CancelledError:
pass
finally:
active_connections.remove(websocket._get_current_object())
# 处理接收到的 WebSocket 消息
async def process_message(message):
try:
data = json.loads(message)
action = data.get('action')
key = data.get('key', '').lower()
if key not in SUPPORTED_KEYS:
await websocket.send(json.dumps({
"status": "error",
"message": f"Unsupported key: {key}"
}))
return
start_time = time.time()
if action == 'press_down':
if not key_states[key]:
key_states[key] = True
await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Down: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_down",
"key": key,
"latency": latency
}))
else:
await websocket.send(json.dumps({
"status": "ignored",
"message": f"Key {key} already pressed"
}))
elif action == 'press_up':
if key_states[key]:
key_states[key] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Up: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_up",
"key": key,
"latency": latency
}))
else:
await websocket.send(json.dumps({
"status": "ignored",
"message": f"Key {key} already released"
}))
elif action == 'press':
await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])
await asyncio.sleep(0.05) # 短暂延迟,模拟真实按键
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Press: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_press",
"key": key,
"latency": latency
}))
elif action == 'get_state':
await websocket.send(json.dumps({
"status": "success",
"key_states": key_states
}))
elif action == 'reset_all':
for k in SUPPORTED_KEYS:
if key_states[k]:
key_states[k] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[k])
await websocket.send(json.dumps({
"status": "success",
"message": "All keys reset"
}))
else:
await websocket.send(json.dumps({
"status": "error",
"message": f"Unknown action: {action}"
}))
except json.JSONDecodeError:
await websocket.send(json.dumps({
"status": "error",
"message": "Invalid JSON format"
}))
except Exception as e:
await websocket.send(json.dumps({
"status": "error",
"message": str(e)
}))
# 广播按键状态给所有连接
async def broadcast_key_states():
if active_connections:
message = json.dumps({
"type": "state_update",
"key_states": key_states
})
for conn in active_connections:
await conn.send(message)
# 定期广播按键状态
@app.before_serving
async def setup_periodic_tasks():
app.broadcast_task = asyncio.create_task(periodic_broadcast())
async def periodic_broadcast():
while True:
await broadcast_key_states()
await asyncio.sleep(1) # 每秒广播一次
@app.after_serving
async def cleanup():
app.broadcast_task.cancel()
await app.broadcast_task
# HTTP 路由用于健康检查
@app.route("/health")
async def health_check():
return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}
if __name__ == "__main__":
print("Starting WebSocket server at ws://0.0.0.0:5000/ws")
print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")
app.run(host="0.0.0.0", port=5000)
import SwiftUI
import Combine
class WebSocketManager: ObservableObject {
@Published var isConnected = false
@Published var keyStates = ["x": false, "z": false]
@Published var responseText = "等待连接..."
private var webSocketTask: URLSessionWebSocketTask?
private let serverURL = URL(string: "ws://192.168.1.3:5000/ws")!
func connect() {
let session = URLSession(configuration: .default)
webSocketTask = session.webSocketTask(with: serverURL)
webSocketTask?.resume()
responseText = "正在连接..."
receiveMessage()
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
self.isConnected = self.webSocketTask?.state == .running
if self.isConnected {
self.responseText = "已连接到服务器"
self.sendMessage(action: "get_state", key: "")
} else {
self.responseText = "连接失败"
}
}
}
func disconnect() {
webSocketTask?.cancel(with: .normalClosure, reason: nil)
isConnected = false
responseText = "已断开连接"
}
private func receiveMessage() {
webSocketTask?.receive { [weak self] result in
guard let self = self else { return }
switch result {
case .success(let message):
switch message {
case .string(let text):
self.handleMessage(text)
case .data(let data):
if let text = String(data: data, encoding: .utf8) {
self.handleMessage(text)
}
@unknown default:
break
}
// 继续接收下一条消息
self.receiveMessage()
case .failure(let error):
DispatchQueue.main.async {
self.isConnected = false
self.responseText = "连接错误: \(error.localizedDescription)"
}
}
}
}
private func handleMessage(_ text: String) {
guard let data = text.data(using: .utf8),
let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {
return
}
DispatchQueue.main.async {
// 处理状态更新消息
if let type = json["type"] as? String, type == "state_update",
let states = json["key_states"] as? [String: Bool] {
self.keyStates = states
return
}
// 处理响应消息
if let status = json["status"] as? String {
if status == "success" {
if let action = json["action"] as? String, let key = json["key"] as? String {
if let latency = json["latency"] as? Double {
self.responseText = "\(action) \(key.uppercased()): 延迟 \(String(format: "%.2f", latency)) ms"
} else {
self.responseText = "\(action) \(key.uppercased()) 成功"
}
} else if let message = json["message"] as? String {
self.responseText = message
}
// 更新按键状态
if let states = json["key_states"] as? [String: Bool] {
self.keyStates = states
}
} else if status == "error", let message = json["message"] as? String {
self.responseText = "错误: \(message)"
}
}
}
}
func sendMessage(action: String, key: String) {
guard isConnected else {
responseText = "未连接到服务器"
return
}
let message: [String: Any] = ["action": action, "key": key]
guard let data = try? JSONSerialization.data(withJSONObject: message),
let messageString = String(data: data, encoding: .utf8) else {
responseText = "消息格式化失败"
return
}
webSocketTask?.send(.string(messageString)) { [weak self] error in
if let error = error {
DispatchQueue.main.async {
self?.responseText = "发送失败: \(error.localizedDescription)"
}
}
}
}
func keyDown(key: String) {
sendMessage(action: "press_down", key: key)
}
func keyUp(key: String) {
sendMessage(action: "press_up", key: key)
}
}
struct ContentView: View {
@StateObject private var wsManager = WebSocketManager()
var body: some View {
VStack(spacing: 20) {
// 连接状态指示器
HStack {
Circle()
.fill(wsManager.isConnected ? Color.green : Color.red)
.frame(width: 12, height: 12)
Text(wsManager.isConnected ? "已连接" : "未连接")
Spacer()
Button(action: {
if wsManager.isConnected {
wsManager.disconnect()
} else {
wsManager.connect()
}
}) {
Text(wsManager.isConnected ? "断开" : "连接")
.padding(.horizontal, 20)
.padding(.vertical, 8)
.background(wsManager.isConnected ? Color.red : Color.blue)
.foregroundColor(.white)
.cornerRadius(8)
}
}
.padding(.horizontal)
HStack {
// Z 键按钮
Button(action: {}) {
Text("Z")
.font(.system(size: 40, weight: .bold))
.padding(.horizontal, 250)
.padding(.vertical, 200)
.background(wsManager.keyStates["z"] == true ? Color.blue.opacity(0.7) : Color.blue)
.foregroundColor(.white)
.cornerRadius(8)
}
.simultaneousGesture(
DragGesture(minimumDistance: 0)
.onChanged { _ in
if wsManager.keyStates["z"] != true {
wsManager.keyDown(key: "z")
}
}
.onEnded { _ in
if wsManager.keyStates["z"] == true {
wsManager.keyUp(key: "z")
}
}
)
// X 键按钮
Button(action: {}) {
Text("X")
.font(.system(size: 40, weight: .bold))
.padding(.horizontal, 250)
.padding(.vertical, 200)
.background(wsManager.keyStates["x"] == true ? Color.green.opacity(0.7) : Color.green)
.foregroundColor(.white)
.cornerRadius(8)
}
.simultaneousGesture(
DragGesture(minimumDistance: 0)
.onChanged { _ in
if wsManager.keyStates["x"] != true {
wsManager.keyDown(key: "x")
}
}
.onEnded { _ in
if wsManager.keyStates["x"] == true {
wsManager.keyUp(key: "x")
}
}
)
}
// 响应显示区域
Text("服务器响应:\(wsManager.responseText)")
.padding()
.frame(maxWidth: .infinity, alignment: .leading)
.background(Color.secondary.opacity(0.2))
.cornerRadius(8)
.padding(.horizontal)
}
.padding()
.onAppear {
// 启动时自动连接到服务器
wsManager.connect()
}
}
}
以上代码有按下不会弹起的问题
---------------------------------------------------------------------------------------------------------------
websocket端用之前的
import SwiftUI
import Combine
class WebSocketManager: ObservableObject {
@Published var isConnected = false
@Published var keyStates = ["x": false, "z": false]
@Published var responseText = "等待连接..."
private var webSocketTask: URLSessionWebSocketTask?
private let serverURL = URL(string: "ws://192.168.1.3:5000/ws")!
func connect() {
let session = URLSession(configuration: .default)
webSocketTask = session.webSocketTask(with: serverURL)
webSocketTask?.resume()
responseText = "正在连接..."
receiveMessage()
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
self.isConnected = self.webSocketTask?.state == .running
if self.isConnected {
self.responseText = "已连接到服务器"
self.sendMessage(action: "get_state", key: "")
} else {
self.responseText = "连接失败"
}
}
}
func disconnect() {
webSocketTask?.cancel(with: .normalClosure, reason: nil)
isConnected = false
responseText = "已断开连接"
}
private func receiveMessage() {
webSocketTask?.receive { [weak self] result in
guard let self = self else { return }
switch result {
case .success(let message):
switch message {
case .string(let text):
print("收到消息: \(text)")
self.handleMessage(text)
case .data(let data):
if let text = String(data: data, encoding: .utf8) {
print("收到数据消息: \(text)")
self.handleMessage(text)
}
@unknown default:
break
}
// 继续接收下一条消息
self.receiveMessage()
case .failure(let error):
print("WebSocket接收错误: \(error.localizedDescription)")
DispatchQueue.main.async {
self.isConnected = false
self.responseText = "连接错误: \(error.localizedDescription)"
}
}
}
}
private func handleMessage(_ text: String) {
guard let data = text.data(using: .utf8),
let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {
print("JSON解析失败: \(text)")
return
}
DispatchQueue.main.async {
// 处理状态更新消息
if let type = json["type"] as? String, type == "state_update",
let states = json["key_states"] as? [String: Bool] {
self.keyStates = states
return
}
// 处理响应消息
if let status = json["status"] as? String {
if status == "success" {
if let action = json["action"] as? String, let key = json["key"] as? String {
if let latency = json["latency"] as? Double {
self.responseText = "\(action) \(key.uppercased()): 延迟 \(String(format: "%.2f", latency)) ms"
} else {
self.responseText = "\(action) \(key.uppercased()) 成功"
}
} else if let message = json["message"] as? String {
self.responseText = message
}
// 更新按键状态
if let states = json["key_states"] as? [String: Bool] {
self.keyStates = states
}
} else if status == "error", let message = json["message"] as? String {
self.responseText = "错误: \(message)"
print("服务器错误: \(message)")
}
}
}
}
func sendMessage(action: String, key: String) {
print("发送消息: action=\(action), key='\(key)'")
guard isConnected else {
responseText = "未连接到服务器"
return
}
// 验证键值 (除了get_state操作)
if action != "get_state" && (key.isEmpty || key.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty) {
responseText = "错误: 键值不能为空"
return
}
let message: [String: Any] = ["action": action, "key": key]
guard let data = try? JSONSerialization.data(withJSONObject: message),
let messageString = String(data: data, encoding: .utf8) else {
responseText = "消息格式化失败"
return
}
webSocketTask?.send(.string(messageString)) { [weak self] error in
if let error = error {
print("发送失败: \(error.localizedDescription)")
DispatchQueue.main.async {
self?.responseText = "发送失败: \(error.localizedDescription)"
}
}
}
}
func keyDown(key: String) {
print("按下键: \(key)")
sendMessage(action: "press_down", key: key)
}
func keyUp(key: String) {
print("释放键: \(key)")
sendMessage(action: "press_up", key: key)
}
}
struct ContentView: View {
@StateObject private var wsManager = WebSocketManager()
var body: some View {
VStack(spacing: 20) {
// 连接状态指示器
HStack {
Circle()
.fill(wsManager.isConnected ? Color.green : Color.red)
.frame(width: 12, height: 12)
Text(wsManager.isConnected ? "已连接" : "未连接")
Spacer()
Button(action: {
if wsManager.isConnected {
wsManager.disconnect()
} else {
wsManager.connect()
}
}) {
Text(wsManager.isConnected ? "断开" : "连接")
.padding(.horizontal, 20)
.padding(.vertical, 8)
.background(wsManager.isConnected ? Color.red : Color.blue)
.foregroundColor(.white)
.cornerRadius(8)
}
}
.padding(.horizontal)
HStack {
// Z 键按钮
Button(action: {}) {
Text("Z")
.font(.system(size: 40, weight: .bold))
.padding(.horizontal, 250)
.padding(.vertical, 200)
.background(wsManager.keyStates["z"] == true ? Color.blue.opacity(0.7) : Color.blue)
.foregroundColor(.white)
.cornerRadius(8)
}
.simultaneousGesture(
DragGesture(minimumDistance: 0)
.onChanged { _ in
if wsManager.keyStates["z"] != true {
wsManager.keyDown(key: "z")
}
}
.onEnded { _ in
// Always send key up when touch ends
wsManager.keyUp(key: "z")
}
)
// X 键按钮
Button(action: {}) {
Text("X")
.font(.system(size: 40, weight: .bold))
.padding(.horizontal, 250)
.padding(.vertical, 200)
.background(wsManager.keyStates["x"] == true ? Color.green.opacity(0.7) : Color.green)
.foregroundColor(.white)
.cornerRadius(8)
}
.simultaneousGesture(
DragGesture(minimumDistance: 0)
.onChanged { _ in
if wsManager.keyStates["x"] != true {
wsManager.keyDown(key: "x")
}
}
.onEnded { _ in
// Always send key up when touch ends
wsManager.keyUp(key: "x")
}
)
}
// 响应显示区域
Text("服务器响应:\(wsManager.responseText)")
.padding()
.frame(maxWidth: .infinity, alignment: .leading)
.background(Color.secondary.opacity(0.2))
.cornerRadius(8)
.padding(.horizontal)
}
.padding()
.onAppear {
// 启动时自动连接到服务器
wsManager.connect()
}
}
}
这个版本按多了有锁键的情况
-------------------------
成功版本
import asyncio
import json
import time
from pynput.keyboard import Controller, Key
from quart import Quart, websocket
app = Quart(__name__)
keyboard = Controller()
# 支持的按键映射 - 专注于 x 和 z
SUPPORTED_KEYS = {
'x': 'x',
'z': 'z'
}
# 存储按键状态
key_states = {
'x': False,
'z': False
}
# 存储活跃的 WebSocket 连接
active_connections = set()
# WebSocket 连接处理
@app.websocket("/ws")
async def ws():
# 将当前的websocket对象添加到活跃连接中
active_connections.add(websocket._get_current_object())
try:
while True:
data = await websocket.receive()
await process_message(data)
except asyncio.CancelledError:
pass
finally:
active_connections.remove(websocket._get_current_object())
# 处理接收到的 WebSocket 消息
async def process_message(message):
try:
data = json.loads(message)
action = data.get('action')
key = data.get('key', '').lower()
if key not in SUPPORTED_KEYS and action != 'reset_all' and action != 'get_state':
await websocket.send(json.dumps({
"status": "error",
"message": f"Unsupported key: {key}"
}))
return
start_time = time.time()
if action == 'press_down':
# 先释放所有其他按下的键
for other_key in key_states:
if other_key != key and key_states[other_key]:
key_states[other_key] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[other_key])
print(f"[Direct Input] Auto-released key: {other_key}")
# 然后按下新键
key_states[key] = True
await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Down: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_down",
"key": key,
"latency": latency,
"key_states": key_states
}))
elif action == 'press_up':
# 总是释放按键,无论当前状态如何
key_states[key] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Up: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_up",
"key": key,
"latency": latency,
"key_states": key_states
}))
elif action == 'press':
# 先释放所有按下的键
for other_key in key_states:
if key_states[other_key]:
key_states[other_key] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[other_key])
print(f"[Direct Input] Auto-released key: {other_key}")
# 执行按下和释放操作
await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])
await asyncio.sleep(0.05) # 短暂延迟,模拟真实按键
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])
print(f"[Direct Input] Key Press: {key}")
latency = (time.time() - start_time) * 1000
await websocket.send(json.dumps({
"status": "success",
"action": "key_press",
"key": key,
"latency": latency,
"key_states": key_states
}))
elif action == 'get_state':
await websocket.send(json.dumps({
"status": "success",
"key_states": key_states
}))
elif action == 'reset_all':
for k in SUPPORTED_KEYS:
if key_states[k]:
key_states[k] = False
await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[k])
print(f"[Direct Input] Reset key: {k}")
await websocket.send(json.dumps({
"status": "success",
"message": "All keys reset",
"key_states": key_states
}))
else:
await websocket.send(json.dumps({
"status": "error",
"message": f"Unknown action: {action}"
}))
except json.JSONDecodeError:
await websocket.send(json.dumps({
"status": "error",
"message": "Invalid JSON format"
}))
except Exception as e:
print(f"Error processing message: {str(e)}")
await websocket.send(json.dumps({
"status": "error",
"message": str(e)
}))
# 广播按键状态给所有连接
async def broadcast_key_states():
if active_connections:
message = json.dumps({
"type": "state_update",
"key_states": key_states
})
for conn in active_connections:
try:
await conn.send(message)
except Exception as e:
print(f"Error broadcasting to connection: {str(e)}")
# 定期广播按键状态
@app.before_serving
async def setup_periodic_tasks():
app.broadcast_task = asyncio.create_task(periodic_broadcast())
async def periodic_broadcast():
while True:
try:
await broadcast_key_states()
except Exception as e:
print(f"Error in periodic broadcast: {str(e)}")
await asyncio.sleep(1) # 每秒广播一次
@app.after_serving
async def cleanup():
app.broadcast_task.cancel()
await app.broadcast_task
# HTTP 路由用于健康检查
@app.route("/health")
async def health_check():
return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}
if __name__ == "__main__":
print("Starting WebSocket server at ws://0.0.0.0:5000/ws")
print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")
app.run(host="0.0.0.0", port=5000)
评价是比键盘好用