ppy/osu构建 ipad作为osu按键xz笔记2 deepwiki websokect

news2025/7/18 9:01:34

 

ipad当x和z键玩osu #无声打osu#没磁轴怎么打osu

下载 .NET (Linux、macOS 和 Windows) | .NET

dotnet还行

构建:f5 

运行:dotnet run --project osu.Desktop -c Debug 

deepwiki就是nb

uinput是ubuntu的我现在没法调试,放着 

import asyncio  
import json  
import time  
import uinput  
import websockets  
from quart import Quart, websocket  
  
app = Quart(__name__)  
  
# 支持的按键映射 - 专注于 x 和 z  
SUPPORTED_KEYS = {  
    'x': uinput.KEY_X,  
    'z': uinput.KEY_Z  
}  
  
# 存储按键状态  
key_states = {  
    'x': False,  
    'z': False  
}  
  
# 初始化 uinput 设备  
events = [uinput.KEY_X, uinput.KEY_Z]  
device = uinput.Device(events)  
print("Direct input device initialized")  
  
# 存储活跃的 WebSocket 连接  
active_connections = set()  
  
# WebSocket 连接处理  
@app.websocket("/ws")  
async def ws():  
    connection = websockets.WebSocketServerProtocol  
    active_connections.add(connection)  
      
    try:  
        while True:  
            data = await websocket.receive()  
            await process_message(connection, data)  
    except asyncio.CancelledError:  
        pass  
    finally:  
        active_connections.remove(connection)  
  
# 处理接收到的 WebSocket 消息  
async def process_message(connection, message):  
    try:  
        data = json.loads(message)  
        action = data.get('action')  
        key = data.get('key', '').lower()  
          
        if key not in SUPPORTED_KEYS:  
            await connection.send(json.dumps({  
                "status": "error",  
                "message": f"Unsupported key: {key}"  
            }))  
            return  
              
        start_time = time.time()  
          
        if action == 'press_down':  
            if not key_states[key]:  
                key_states[key] = True  
                device.emit(SUPPORTED_KEYS[key], 1)  # 1 = key down  
                print(f"[Direct Input] Key Down: {key}")  
                  
                latency = (time.time() - start_time) * 1000  
                await connection.send(json.dumps({  
                    "status": "success",  
                    "action": "key_down",  
                    "key": key,  
                    "latency": latency  
                }))  
            else:  
                await connection.send(json.dumps({  
                    "status": "ignored",  
                    "message": f"Key {key} already pressed"  
                }))  
                  
        elif action == 'press_up':  
            if key_states[key]:  
                key_states[key] = False  
                device.emit(SUPPORTED_KEYS[key], 0)  # 0 = key up  
                print(f"[Direct Input] Key Up: {key}")  
                  
                latency = (time.time() - start_time) * 1000  
                await connection.send(json.dumps({  
                    "status": "success",  
                    "action": "key_up",  
                    "key": key,  
                    "latency": latency  
                }))  
            else:  
                await connection.send(json.dumps({  
                    "status": "ignored",  
                    "message": f"Key {key} already released"  
                }))  
                  
        elif action == 'press':  
            device.emit(SUPPORTED_KEYS[key], 1)  # Key down  
            await asyncio.sleep(0.05)  # Short delay  
            device.emit(SUPPORTED_KEYS[key], 0)  # Key up  
            print(f"[Direct Input] Key Press: {key}")  
              
            latency = (time.time() - start_time) * 1000  
            await connection.send(json.dumps({  
                "status": "success",  
                "action": "key_press",  
                "key": key,  
                "latency": latency  
            }))  
              
        elif action == 'get_state':  
            await connection.send(json.dumps({  
                "status": "success",  
                "key_states": key_states  
            }))  
              
        elif action == 'reset_all':  
            for k in SUPPORTED_KEYS:  
                if key_states[k]:  
                    key_states[k] = False  
                    device.emit(SUPPORTED_KEYS[k], 0)  # Key up  
              
            await connection.send(json.dumps({  
                "status": "success",  
                "message": "All keys reset"  
            }))  
              
        else:  
            await connection.send(json.dumps({  
                "status": "error",  
                "message": f"Unknown action: {action}"  
            }))  
              
    except json.JSONDecodeError:  
        await connection.send(json.dumps({  
            "status": "error",  
            "message": "Invalid JSON format"  
        }))  
    except Exception as e:  
        await connection.send(json.dumps({  
            "status": "error",  
            "message": str(e)  
        }))  
  
# 广播按键状态给所有连接  
async def broadcast_key_states():  
    if active_connections:  
        message = json.dumps({  
            "type": "state_update",  
            "key_states": key_states  
        })  
        await asyncio.gather(*[conn.send(message) for conn in active_connections])  
  
# 定期广播按键状态  
@app.before_serving  
async def setup_periodic_tasks():  
    app.broadcast_task = asyncio.create_task(periodic_broadcast())  
  
async def periodic_broadcast():  
    while True:  
        await broadcast_key_states()  
        await asyncio.sleep(1)  # 每秒广播一次  
  
@app.after_serving  
async def cleanup():  
    app.broadcast_task.cancel()  
    await app.broadcast_task  
  
# HTTP 路由用于健康检查  
@app.route("/health")  
async def health_check():  
    return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}  
  
if __name__ == "__main__":  
    print("Starting WebSocket server at ws://0.0.0.0:5000/ws")  
    print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")  
    app.run(host="0.0.0.0", port=5000)

可以滑条了可是按下抬起有问题 

 

import asyncio  
import json  
import time  
from pynput.keyboard import Controller, Key  
from quart import Quart, websocket  
  
app = Quart(__name__)  
keyboard = Controller()  
  
# 支持的按键映射 - 专注于 x 和 z  
SUPPORTED_KEYS = {  
    'x': 'x',  
    'z': 'z'  
}  
  
# 存储按键状态  
key_states = {  
    'x': False,  
    'z': False  
}  
  
# 存储活跃的 WebSocket 连接  
active_connections = set()  
  
# WebSocket 连接处理  
@app.websocket("/ws")  
async def ws():  
    # 将当前的websocket对象添加到活跃连接中  
    active_connections.add(websocket._get_current_object())  
      
    try:  
        while True:  
            data = await websocket.receive()  
            await process_message(data)  
    except asyncio.CancelledError:  
        pass  
    finally:  
        active_connections.remove(websocket._get_current_object())  
  
# 处理接收到的 WebSocket 消息  
async def process_message(message):  
    try:  
        data = json.loads(message)  
        action = data.get('action')  
        key = data.get('key', '').lower()  
          
        if key not in SUPPORTED_KEYS:  
            await websocket.send(json.dumps({  
                "status": "error",  
                "message": f"Unsupported key: {key}"  
            }))  
            return  
              
        start_time = time.time()  
          
        if action == 'press_down':  
            if not key_states[key]:  
                key_states[key] = True  
                await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])  
                print(f"[Direct Input] Key Down: {key}")  
                  
                latency = (time.time() - start_time) * 1000  
                await websocket.send(json.dumps({  
                    "status": "success",  
                    "action": "key_down",  
                    "key": key,  
                    "latency": latency  
                }))  
            else:  
                await websocket.send(json.dumps({  
                    "status": "ignored",  
                    "message": f"Key {key} already pressed"  
                }))  
                  
        elif action == 'press_up':  
            if key_states[key]:  
                key_states[key] = False  
                await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])  
                print(f"[Direct Input] Key Up: {key}")  
                  
                latency = (time.time() - start_time) * 1000  
                await websocket.send(json.dumps({  
                    "status": "success",  
                    "action": "key_up",  
                    "key": key,  
                    "latency": latency  
                }))  
            else:  
                await websocket.send(json.dumps({  
                    "status": "ignored",  
                    "message": f"Key {key} already released"  
                }))  
                  
        elif action == 'press':  
            await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])  
            await asyncio.sleep(0.05)  # 短暂延迟,模拟真实按键  
            await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])  
            print(f"[Direct Input] Key Press: {key}")  
              
            latency = (time.time() - start_time) * 1000  
            await websocket.send(json.dumps({  
                "status": "success",  
                "action": "key_press",  
                "key": key,  
                "latency": latency  
            }))  
              
        elif action == 'get_state':  
            await websocket.send(json.dumps({  
                "status": "success",  
                "key_states": key_states  
            }))  
              
        elif action == 'reset_all':  
            for k in SUPPORTED_KEYS:  
                if key_states[k]:  
                    key_states[k] = False  
                    await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[k])  
              
            await websocket.send(json.dumps({  
                "status": "success",  
                "message": "All keys reset"  
            }))  
              
        else:  
            await websocket.send(json.dumps({  
                "status": "error",  
                "message": f"Unknown action: {action}"  
            }))  
              
    except json.JSONDecodeError:  
        await websocket.send(json.dumps({  
            "status": "error",  
            "message": "Invalid JSON format"  
        }))  
    except Exception as e:  
        await websocket.send(json.dumps({  
            "status": "error",  
            "message": str(e)  
        }))  
  
# 广播按键状态给所有连接  
async def broadcast_key_states():  
    if active_connections:  
        message = json.dumps({  
            "type": "state_update",  
            "key_states": key_states  
        })  
        for conn in active_connections:  
            await conn.send(message)  
  
# 定期广播按键状态  
@app.before_serving  
async def setup_periodic_tasks():  
    app.broadcast_task = asyncio.create_task(periodic_broadcast())  
  
async def periodic_broadcast():  
    while True:  
        await broadcast_key_states()  
        await asyncio.sleep(1)  # 每秒广播一次  
  
@app.after_serving  
async def cleanup():  
    app.broadcast_task.cancel()  
    await app.broadcast_task  
  
# HTTP 路由用于健康检查  
@app.route("/health")  
async def health_check():  
    return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}  
  
if __name__ == "__main__":  
    print("Starting WebSocket server at ws://0.0.0.0:5000/ws")  
    print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")  
    app.run(host="0.0.0.0", port=5000)

 

import SwiftUI  
import Combine  
  
class WebSocketManager: ObservableObject {  
    @Published var isConnected = false  
    @Published var keyStates = ["x": false, "z": false]  
    @Published var responseText = "等待连接..."  
      
    private var webSocketTask: URLSessionWebSocketTask?  
    private let serverURL = URL(string: "ws://192.168.1.3:5000/ws")!  
      
    func connect() {  
        let session = URLSession(configuration: .default)  
        webSocketTask = session.webSocketTask(with: serverURL)  
        webSocketTask?.resume()  
          
        responseText = "正在连接..."  
        receiveMessage()  
          
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {  
            self.isConnected = self.webSocketTask?.state == .running  
            if self.isConnected {  
                self.responseText = "已连接到服务器"  
                self.sendMessage(action: "get_state", key: "")  
            } else {  
                self.responseText = "连接失败"  
            }  
        }  
    }  
      
    func disconnect() {  
        webSocketTask?.cancel(with: .normalClosure, reason: nil)  
        isConnected = false  
        responseText = "已断开连接"  
    }  
      
    private func receiveMessage() {  
        webSocketTask?.receive { [weak self] result in  
            guard let self = self else { return }  
              
            switch result {  
            case .success(let message):  
                switch message {  
                case .string(let text):  
                    self.handleMessage(text)  
                case .data(let data):  
                    if let text = String(data: data, encoding: .utf8) {  
                        self.handleMessage(text)  
                    }  
                @unknown default:  
                    break  
                }  
                  
                // 继续接收下一条消息  
                self.receiveMessage()  
                  
            case .failure(let error):  
                DispatchQueue.main.async {  
                    self.isConnected = false  
                    self.responseText = "连接错误: \(error.localizedDescription)"  
                }  
            }  
        }  
    }  
      
    private func handleMessage(_ text: String) {  
        guard let data = text.data(using: .utf8),  
              let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {  
            return  
        }  
          
        DispatchQueue.main.async {  
            // 处理状态更新消息  
            if let type = json["type"] as? String, type == "state_update",  
               let states = json["key_states"] as? [String: Bool] {  
                self.keyStates = states  
                return  
            }  
              
            // 处理响应消息  
            if let status = json["status"] as? String {  
                if status == "success" {  
                    if let action = json["action"] as? String, let key = json["key"] as? String {  
                        if let latency = json["latency"] as? Double {  
                            self.responseText = "\(action) \(key.uppercased()): 延迟 \(String(format: "%.2f", latency)) ms"  
                        } else {  
                            self.responseText = "\(action) \(key.uppercased()) 成功"  
                        }  
                    } else if let message = json["message"] as? String {  
                        self.responseText = message  
                    }  
                      
                    // 更新按键状态  
                    if let states = json["key_states"] as? [String: Bool] {  
                        self.keyStates = states  
                    }  
                } else if status == "error", let message = json["message"] as? String {  
                    self.responseText = "错误: \(message)"  
                }  
            }  
        }  
    }  
      
    func sendMessage(action: String, key: String) {  
        guard isConnected else {  
            responseText = "未连接到服务器"  
            return  
        }  
          
        let message: [String: Any] = ["action": action, "key": key]  
          
        guard let data = try? JSONSerialization.data(withJSONObject: message),  
              let messageString = String(data: data, encoding: .utf8) else {  
            responseText = "消息格式化失败"  
            return  
        }  
          
        webSocketTask?.send(.string(messageString)) { [weak self] error in  
            if let error = error {  
                DispatchQueue.main.async {  
                    self?.responseText = "发送失败: \(error.localizedDescription)"  
                }  
            }  
        }  
    }  
      
    func keyDown(key: String) {  
        sendMessage(action: "press_down", key: key)  
    }  
      
    func keyUp(key: String) {  
        sendMessage(action: "press_up", key: key)  
    }  
}  
  
struct ContentView: View {  
    @StateObject private var wsManager = WebSocketManager()  
      
    var body: some View {  
        VStack(spacing: 20) {  
            // 连接状态指示器  
            HStack {  
                Circle()  
                    .fill(wsManager.isConnected ? Color.green : Color.red)  
                    .frame(width: 12, height: 12)  
                  
                Text(wsManager.isConnected ? "已连接" : "未连接")  
                  
                Spacer()  
                  
                Button(action: {  
                    if wsManager.isConnected {  
                        wsManager.disconnect()  
                    } else {  
                        wsManager.connect()  
                    }  
                }) {  
                    Text(wsManager.isConnected ? "断开" : "连接")  
                        .padding(.horizontal, 20)  
                        .padding(.vertical, 8)  
                        .background(wsManager.isConnected ? Color.red : Color.blue)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
            }  
            .padding(.horizontal)  
              
            HStack {  
                // Z 键按钮  
                Button(action: {}) {  
                    Text("Z")  
                        .font(.system(size: 40, weight: .bold))  
                        .padding(.horizontal, 250)  
                        .padding(.vertical, 200)  
                        .background(wsManager.keyStates["z"] == true ? Color.blue.opacity(0.7) : Color.blue)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
                .simultaneousGesture(  
                    DragGesture(minimumDistance: 0)  
                        .onChanged { _ in  
                            if wsManager.keyStates["z"] != true {  
                                wsManager.keyDown(key: "z")  
                            }  
                        }  
                        .onEnded { _ in  
                            if wsManager.keyStates["z"] == true {  
                                wsManager.keyUp(key: "z")  
                            }  
                        }  
                )  
                  
                // X 键按钮  
                Button(action: {}) {  
                    Text("X")  
                        .font(.system(size: 40, weight: .bold))  
                        .padding(.horizontal, 250)  
                        .padding(.vertical, 200)  
                        .background(wsManager.keyStates["x"] == true ? Color.green.opacity(0.7) : Color.green)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
                .simultaneousGesture(  
                    DragGesture(minimumDistance: 0)  
                        .onChanged { _ in  
                            if wsManager.keyStates["x"] != true {  
                                wsManager.keyDown(key: "x")  
                            }  
                        }  
                        .onEnded { _ in  
                            if wsManager.keyStates["x"] == true {  
                                wsManager.keyUp(key: "x")  
                            }  
                        }  
                )  
            }  
              
            // 响应显示区域  
            Text("服务器响应:\(wsManager.responseText)")  
                .padding()  
                .frame(maxWidth: .infinity, alignment: .leading)  
                .background(Color.secondary.opacity(0.2))  
                .cornerRadius(8)  
                .padding(.horizontal)  
        }  
        .padding()  
        .onAppear {  
            // 启动时自动连接到服务器  
            wsManager.connect()  
        }  
    }  
}

以上代码有按下不会弹起的问题

---------------------------------------------------------------------------------------------------------------

 

websocket端用之前的

import SwiftUI  
import Combine  
  
class WebSocketManager: ObservableObject {  
    @Published var isConnected = false  
    @Published var keyStates = ["x": false, "z": false]  
    @Published var responseText = "等待连接..."  
      
    private var webSocketTask: URLSessionWebSocketTask?  
    private let serverURL = URL(string: "ws://192.168.1.3:5000/ws")!  
      
    func connect() {  
        let session = URLSession(configuration: .default)  
        webSocketTask = session.webSocketTask(with: serverURL)  
        webSocketTask?.resume()  
          
        responseText = "正在连接..."  
        receiveMessage()  
          
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {  
            self.isConnected = self.webSocketTask?.state == .running  
            if self.isConnected {  
                self.responseText = "已连接到服务器"  
                self.sendMessage(action: "get_state", key: "")  
            } else {  
                self.responseText = "连接失败"  
            }  
        }  
    }  
      
    func disconnect() {  
        webSocketTask?.cancel(with: .normalClosure, reason: nil)  
        isConnected = false  
        responseText = "已断开连接"  
    }  
      
    private func receiveMessage() {  
        webSocketTask?.receive { [weak self] result in  
            guard let self = self else { return }  
              
            switch result {  
            case .success(let message):  
                switch message {  
                case .string(let text):  
                    print("收到消息: \(text)")  
                    self.handleMessage(text)  
                case .data(let data):  
                    if let text = String(data: data, encoding: .utf8) {  
                        print("收到数据消息: \(text)")  
                        self.handleMessage(text)  
                    }  
                @unknown default:  
                    break  
                }  
                  
                // 继续接收下一条消息  
                self.receiveMessage()  
                  
            case .failure(let error):  
                print("WebSocket接收错误: \(error.localizedDescription)")  
                DispatchQueue.main.async {  
                    self.isConnected = false  
                    self.responseText = "连接错误: \(error.localizedDescription)"  
                }  
            }  
        }  
    }  
      
    private func handleMessage(_ text: String) {  
        guard let data = text.data(using: .utf8),  
              let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else {  
            print("JSON解析失败: \(text)")  
            return  
        }  
          
        DispatchQueue.main.async {  
            // 处理状态更新消息  
            if let type = json["type"] as? String, type == "state_update",  
               let states = json["key_states"] as? [String: Bool] {  
                self.keyStates = states  
                return  
            }  
              
            // 处理响应消息  
            if let status = json["status"] as? String {  
                if status == "success" {  
                    if let action = json["action"] as? String, let key = json["key"] as? String {  
                        if let latency = json["latency"] as? Double {  
                            self.responseText = "\(action) \(key.uppercased()): 延迟 \(String(format: "%.2f", latency)) ms"  
                        } else {  
                            self.responseText = "\(action) \(key.uppercased()) 成功"  
                        }  
                    } else if let message = json["message"] as? String {  
                        self.responseText = message  
                    }  
                      
                    // 更新按键状态  
                    if let states = json["key_states"] as? [String: Bool] {  
                        self.keyStates = states  
                    }  
                } else if status == "error", let message = json["message"] as? String {  
                    self.responseText = "错误: \(message)"  
                    print("服务器错误: \(message)")  
                }  
            }  
        }  
    }  
      
    func sendMessage(action: String, key: String) {  
        print("发送消息: action=\(action), key='\(key)'")  
          
        guard isConnected else {  
            responseText = "未连接到服务器"  
            return  
        }  
          
        // 验证键值 (除了get_state操作)  
        if action != "get_state" && (key.isEmpty || key.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty) {  
            responseText = "错误: 键值不能为空"  
            return  
        }  
          
        let message: [String: Any] = ["action": action, "key": key]  
          
        guard let data = try? JSONSerialization.data(withJSONObject: message),  
              let messageString = String(data: data, encoding: .utf8) else {  
            responseText = "消息格式化失败"  
            return  
        }  
          
        webSocketTask?.send(.string(messageString)) { [weak self] error in  
            if let error = error {  
                print("发送失败: \(error.localizedDescription)")  
                DispatchQueue.main.async {  
                    self?.responseText = "发送失败: \(error.localizedDescription)"  
                }  
            }  
        }  
    }  
      
    func keyDown(key: String) {  
        print("按下键: \(key)")  
        sendMessage(action: "press_down", key: key)  
    }  
      
    func keyUp(key: String) {  
        print("释放键: \(key)")  
        sendMessage(action: "press_up", key: key)  
    }  
}  
  
struct ContentView: View {  
    @StateObject private var wsManager = WebSocketManager()  
      
    var body: some View {  
        VStack(spacing: 20) {  
            // 连接状态指示器  
            HStack {  
                Circle()  
                    .fill(wsManager.isConnected ? Color.green : Color.red)  
                    .frame(width: 12, height: 12)  
                  
                Text(wsManager.isConnected ? "已连接" : "未连接")  
                  
                Spacer()  
                  
                Button(action: {  
                    if wsManager.isConnected {  
                        wsManager.disconnect()  
                    } else {  
                        wsManager.connect()  
                    }  
                }) {  
                    Text(wsManager.isConnected ? "断开" : "连接")  
                        .padding(.horizontal, 20)  
                        .padding(.vertical, 8)  
                        .background(wsManager.isConnected ? Color.red : Color.blue)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
            }  
            .padding(.horizontal)  
              
            HStack {  
                // Z 键按钮  
                Button(action: {}) {  
                    Text("Z")  
                        .font(.system(size: 40, weight: .bold))  
                        .padding(.horizontal, 250)  
                        .padding(.vertical, 200)  
                        .background(wsManager.keyStates["z"] == true ? Color.blue.opacity(0.7) : Color.blue)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
                .simultaneousGesture(  
                    DragGesture(minimumDistance: 0)  
                        .onChanged { _ in  
                            if wsManager.keyStates["z"] != true {  
                                wsManager.keyDown(key: "z")  
                            }  
                        }  
                        .onEnded { _ in  
                            // Always send key up when touch ends  
                            wsManager.keyUp(key: "z")  
                        }  
                )  
                  
                // X 键按钮  
                Button(action: {}) {  
                    Text("X")  
                        .font(.system(size: 40, weight: .bold))  
                        .padding(.horizontal, 250)  
                        .padding(.vertical, 200)  
                        .background(wsManager.keyStates["x"] == true ? Color.green.opacity(0.7) : Color.green)  
                        .foregroundColor(.white)  
                        .cornerRadius(8)  
                }  
                .simultaneousGesture(  
                    DragGesture(minimumDistance: 0)  
                        .onChanged { _ in  
                            if wsManager.keyStates["x"] != true {  
                                wsManager.keyDown(key: "x")  
                            }  
                        }  
                        .onEnded { _ in  
                            // Always send key up when touch ends  
                            wsManager.keyUp(key: "x")  
                        }  
                )  
            }  
              
            // 响应显示区域  
            Text("服务器响应:\(wsManager.responseText)")  
                .padding()  
                .frame(maxWidth: .infinity, alignment: .leading)  
                .background(Color.secondary.opacity(0.2))  
                .cornerRadius(8)  
                .padding(.horizontal)  
        }  
        .padding()  
        .onAppear {  
            // 启动时自动连接到服务器  
            wsManager.connect()  
        }  
    }  
}

这个版本按多了有锁键的情况 

-------------------------

成功版本

import asyncio  
import json  
import time  
from pynput.keyboard import Controller, Key  
from quart import Quart, websocket  
  
app = Quart(__name__)  
keyboard = Controller()  
  
# 支持的按键映射 - 专注于 x 和 z  
SUPPORTED_KEYS = {  
    'x': 'x',  
    'z': 'z'  
}  
  
# 存储按键状态  
key_states = {  
    'x': False,  
    'z': False  
}  
  
# 存储活跃的 WebSocket 连接  
active_connections = set()  
  
# WebSocket 连接处理  
@app.websocket("/ws")  
async def ws():  
    # 将当前的websocket对象添加到活跃连接中  
    active_connections.add(websocket._get_current_object())  
      
    try:  
        while True:  
            data = await websocket.receive()  
            await process_message(data)  
    except asyncio.CancelledError:  
        pass  
    finally:  
        active_connections.remove(websocket._get_current_object())  
  
# 处理接收到的 WebSocket 消息  
async def process_message(message):  
    try:  
        data = json.loads(message)  
        action = data.get('action')  
        key = data.get('key', '').lower()  
          
        if key not in SUPPORTED_KEYS and action != 'reset_all' and action != 'get_state':  
            await websocket.send(json.dumps({  
                "status": "error",  
                "message": f"Unsupported key: {key}"  
            }))  
            return  
              
        start_time = time.time()  
          
        if action == 'press_down':  
            # 先释放所有其他按下的键  
            for other_key in key_states:  
                if other_key != key and key_states[other_key]:  
                    key_states[other_key] = False  
                    await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[other_key])  
                    print(f"[Direct Input] Auto-released key: {other_key}")  
              
            # 然后按下新键  
            key_states[key] = True  
            await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])  
            print(f"[Direct Input] Key Down: {key}")  
              
            latency = (time.time() - start_time) * 1000  
            await websocket.send(json.dumps({  
                "status": "success",  
                "action": "key_down",  
                "key": key,  
                "latency": latency,  
                "key_states": key_states  
            }))  
                  
        elif action == 'press_up':  
            # 总是释放按键,无论当前状态如何  
            key_states[key] = False  
            await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])  
            print(f"[Direct Input] Key Up: {key}")  
              
            latency = (time.time() - start_time) * 1000  
            await websocket.send(json.dumps({  
                "status": "success",  
                "action": "key_up",  
                "key": key,  
                "latency": latency,  
                "key_states": key_states  
            }))  
                  
        elif action == 'press':  
            # 先释放所有按下的键  
            for other_key in key_states:  
                if key_states[other_key]:  
                    key_states[other_key] = False  
                    await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[other_key])  
                    print(f"[Direct Input] Auto-released key: {other_key}")  
              
            # 执行按下和释放操作  
            await asyncio.to_thread(keyboard.press, SUPPORTED_KEYS[key])  
            await asyncio.sleep(0.05)  # 短暂延迟,模拟真实按键  
            await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[key])  
            print(f"[Direct Input] Key Press: {key}")  
              
            latency = (time.time() - start_time) * 1000  
            await websocket.send(json.dumps({  
                "status": "success",  
                "action": "key_press",  
                "key": key,  
                "latency": latency,  
                "key_states": key_states  
            }))  
              
        elif action == 'get_state':  
            await websocket.send(json.dumps({  
                "status": "success",  
                "key_states": key_states  
            }))  
              
        elif action == 'reset_all':  
            for k in SUPPORTED_KEYS:  
                if key_states[k]:  
                    key_states[k] = False  
                    await asyncio.to_thread(keyboard.release, SUPPORTED_KEYS[k])  
                    print(f"[Direct Input] Reset key: {k}")  
              
            await websocket.send(json.dumps({  
                "status": "success",  
                "message": "All keys reset",  
                "key_states": key_states  
            }))  
              
        else:  
            await websocket.send(json.dumps({  
                "status": "error",  
                "message": f"Unknown action: {action}"  
            }))  
              
    except json.JSONDecodeError:  
        await websocket.send(json.dumps({  
            "status": "error",  
            "message": "Invalid JSON format"  
        }))  
    except Exception as e:  
        print(f"Error processing message: {str(e)}")  
        await websocket.send(json.dumps({  
            "status": "error",  
            "message": str(e)  
        }))  
  
# 广播按键状态给所有连接  
async def broadcast_key_states():  
    if active_connections:  
        message = json.dumps({  
            "type": "state_update",  
            "key_states": key_states  
        })  
        for conn in active_connections:  
            try:  
                await conn.send(message)  
            except Exception as e:  
                print(f"Error broadcasting to connection: {str(e)}")  
  
# 定期广播按键状态  
@app.before_serving  
async def setup_periodic_tasks():  
    app.broadcast_task = asyncio.create_task(periodic_broadcast())  
  
async def periodic_broadcast():  
    while True:  
        try:  
            await broadcast_key_states()  
        except Exception as e:  
            print(f"Error in periodic broadcast: {str(e)}")  
        await asyncio.sleep(1)  # 每秒广播一次  
  
@app.after_serving  
async def cleanup():  
    app.broadcast_task.cancel()  
    await app.broadcast_task  
  
# HTTP 路由用于健康检查  
@app.route("/health")  
async def health_check():  
    return {"status": "ok", "supported_keys": list(SUPPORTED_KEYS.keys())}  
  
if __name__ == "__main__":  
    print("Starting WebSocket server at ws://0.0.0.0:5000/ws")  
    print(f"Supported keys: {list(SUPPORTED_KEYS.keys())}")  
    app.run(host="0.0.0.0", port=5000)

评价是比键盘好用 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2376584.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.NET程序启动就报错,如何截获初期化时的问题json

一:背景 1. 讲故事 前几天训练营里的一位朋友在复习课件的时候,程序一跑就报错,截图如下: 从给出的错误信息看大概是因为json格式无效导致的,在早期的训练营里曾经也有一例这样的报错,最后定位下来是公司…

nacos:服务注册原理

目录 NaCos服务注册原理1、AbstractAutoServiceRegistration功能和作用onApplicationEvent()方法start()方法 2、NacosAutoServiceRegistration功能和作用NacosAutoServiceRegistration.register()方法AbstractAutoServiceRegistration.register()方法 3、NacosServiceRegistry…

基于开源AI大模型与S2B2C生态的个人品牌优势挖掘与标签重构研究

摘要:在数字文明时代,个人品牌塑造已从传统经验驱动转向数据智能驱动。本文以开源AI大模型、AI智能名片与S2B2C商城小程序源码为技术载体,提出"社会评价-数据验证-标签重构"的三维分析框架。通过实证研究发现,结合第三方…

polarctf-web-[简单rce]

考点&#xff1a; (1)RCE(eval函数) (2)执行函数(passthru函数) (3)/顶级(根)目录查看 (4)sort排序查看函数 题目来源&#xff1a;Polarctf-web-[简单rce] 解题&#xff1a; 代码审计 <?php/*​PolarD&N CTF​*/highlight_file(__FILE__);function no($txt){ # …

深入理解 Cortex-M3 特殊寄存器

在上一篇文章中分享了 Cortex-M3 内核寄存器组的相关知识&#xff0c;实际上除了内核寄存器组外&#xff0c;CM3 处理器中还存在多个特殊寄存器&#xff0c;它们分别为 程序状态寄存器&#xff0c;中断/异常屏蔽寄存器 和 控制寄存器。 需要注意的是&#xff0c;特殊寄存器未经…

[Java实战]Spring Boot 3 整合 Ehcache 3(十九)

[Java实战]Spring Boot 3 整合 Ehcache 3&#xff08;十九&#xff09; 引言 在微服务和高并发场景下&#xff0c;缓存是提升系统性能的关键技术之一。Ehcache 作为 Java 生态中成熟的内存缓存框架&#xff0c;其 3.x 版本在性能、功能和易用性上均有显著提升。本文将详细介绍…

建筑物渗水漏水痕迹发霉潮湿分割数据集labelme格式1357张1类别

数据集中有增强图片详情看图片 数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;1357 标注数量(json文件个数)&#xff1a;1357 标注类别数&#xff1a;1 标注类别名称:["water&qu…

第二十二天打卡

数据预处理 import pandas as pd from sklearn.model_selection import train_test_splitdef data_preprocessing(file_path):"""泰坦尼克号生存预测数据预处理函数参数:file_path: 原始数据文件路径返回:preprocessed_data: 预处理后的数据集""&quo…

解锁性能密码:Linux 环境下 Oracle 大页配置全攻略​

在 Oracle 数据库运行过程中&#xff0c;内存管理是影响其性能的关键因素之一。大页内存&#xff08;Large Pages&#xff09;作为一种优化内存使用的技术&#xff0c;能够显著提升 Oracle 数据库的运行效率。本文将深入介绍大页内存的相关概念&#xff0c;并详细阐述 Oracle 在…

Spark,在shell中运行RDD程序

在hdfs中/wcinput中创建一个文件&#xff1a;word2.txt在里面写几个单词 启动hdfs集群 [roothadoop100 ~]# myhadoop start [roothadoop100 ~]# cd /opt/module/spark-yarn/bin [roothadoop100 ~]# ./spark-shell 写个11测试一下 按住ctrlD退出 进入环境&#xff1a;spa…

SAP学习笔记 - 开发11 - RAP(RESTful Application Programming)简介

上一章学习了BTP架构图&#xff0c;实操创建Directory/Subaccount&#xff0c;BTP的内部组成&#xff0c;BTP Cockpit。 SAP学习笔记 - 开发10 - BTP架构图&#xff0c;实操创建Directory/Subaccount&#xff0c;BTP的内部组成&#xff0c;BTP Cockpit-CSDN博客 本章继续学习S…

数据防泄密安全:企业稳健发展的守护盾

在数字化时代&#xff0c;数据已成为企业最核心的资产之一。无论是客户信息、财务数据&#xff0c;还是商业机密&#xff0c;一旦泄露&#xff0c;都可能给企业带来不可估量的损失。近年来&#xff0c;数据泄露事件频发&#xff0c;如Facebook用户数据泄露、Equifax信用数据外泄…

MySQL之基础索引

目录 引言 1、创建索引 2、索引的原理 2、索引的类型 3、索引的使用 1.添加索引 2.删除索引 3.删除主键索引 4.修改索引 5.查询索引 引言 当一个数据库里面的数据特别多&#xff0c;比如800万&#xff0c;光是创建插入数据就要十几分钟&#xff0c;我们查询一条信息也…

拉丁方分析

本文是实验设计与分析&#xff08;第6版&#xff0c;Montgomery著傅珏生译)第4章随机化区组&#xff0c;拉丁方&#xff0c;以及有关的设计第4.2节的python解决方案。本文尽量避免重复书中的理论&#xff0c;着于提供python解决方案&#xff0c;并与原书的运算结果进行对比。您…

软考软件设计师中级——软件工程笔记

1.软件过程 1.1能力成熟度模型&#xff08;CMM&#xff09; 软件能力成熟度模型&#xff08;CMM&#xff09;将软件过程改进分为以下五个成熟度级别&#xff0c;每个级别都定义了特定的过程特征和目标&#xff1a; 初始级 (Initial)&#xff1a; 软件开发过程杂乱无章&#xf…

5.5.1 WPF中的动画2-基于路径的动画

何为动画?一般只会动。但所谓会动,还不仅包括位置移动,还包括角度旋转,颜色变化,透明度增减。动画本质上是一个时间段内某个属性值(位置、颜色等)的变化。因为属性有很多数据类型,它们变化也需要多种动画类比如: BooleanAnimationBase\ ByteAnimationBase\DoubleAnima…

Andorid之TabLayout+ViewPager

文章目录 前言一、效果图二、使用步骤1.主xml布局2.activity代码3.MyTaskFragment代码4.MyTaskFragment的xml布局5.Adapter代码6.item布局 总结 前言 TabLayoutViewPager功能需求已经是常见功能了&#xff0c;我就不多解释了&#xff0c;需要的自取。 一、效果图 二、使用步骤…

26考研——中央处理器_指令流水线_流水线的冒险与处理 流水线的性能指标 高级流水线技术(5)

408答疑 文章目录 六、指令流水线流水线的冒险与处理结构冒险数据冒险延迟执行相关指令采用转发&#xff08;旁路&#xff09;技术load-use 数据冒险的处理 控制冒险 流水线的性能指标流水线的吞吐率流水线的加速比 高级流水线技术超标量流水线技术超长指令字技术超流水线技术 …

酒店旅游类数据采集API接口之携程数据获取地方美食品列表 获取地方美餐馆列表 景点评论

携程 API 接入指南 API 地址&#xff1a; 调用示例&#xff1a; 美食列表 景点列表 景点详情 酒店详情 参数说明 通用参数说明 请谨慎传递参数&#xff0c;避免不必要的费用扣除。 URL 说明&#xff1a;https://api-gw.cn/平台/API类型/ 平台&#xff1a;淘宝&#xff0c;京…

Lora原理及实现浅析

Lora 什么是Lora Lora的原始论文为《LoRA: Low-Rank Adaptation of Large Language Models》&#xff0c;翻译为中文为“大语言模型的低秩自适应”。最初是为了解决大型语言模在进行任务特定微调时消耗大量资源的问题&#xff1b;随后也用在了Diffusion等领域&#xff0c;用于…