import json import os import pickle import socket import sys import torch from ultralytics import YOLO import cv2 import time import poses import utils from calculate import normalize_pose, compare_poses_boolean from draw import draw_new from utils import find_closest from video_methods import initialize_method model = YOLO("yolo11s-pose.pt") model.to(torch.device('cuda:0')) if len(sys.argv) == 2: method_type = sys.argv[1] else: print("Podaj argument 'cam', albo 'net'.") exit(1) method = initialize_method(method_type) do_pose_shot = False def click_event(event, x, y, flags, param): global do_pose_shot if event == cv2.EVENT_LBUTTONDOWN: # lewy przycisk myszy do_pose_shot = not do_pose_shot def main(): last_time = time.time() currTimeIndex = 0 currIndex = None currMove = None currStatus = "Zacznij tanczyc" mehCount = 0 goodCount = 0 failCount = 0 failRate = 10 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(("0.0.0.0", 13425)) server_socket.listen() print("czekam na clienta...") client_socket, _ = server_socket.accept() print("mam clienta!") data = client_socket.recv(1024).decode() if not data.startswith("id_"): client_socket.close() server_socket.close() return map = data.replace("id_", "").replace("\n", "").strip() if not os.path.isfile(f'moves_{map}.pkl'): print(map) print("moves_" + map + ".pkl") client_socket.sendall("not_exists".encode()) client_socket.close() server_socket.close() return moves = [] with open(f'moves_{map}.pkl', 'rb') as f: # 'rb' = read binary moves = pickle.load(f) startValue = moves[0][0] totalCount = len(moves) for i, move in enumerate(moves): moves[i] = ((move[0] - startValue) / 1000, move[1], move[2]) currIndex = 1 currTimeIndex = time.time() deltaTime = time.time() currStatus = f"Zaczoles tanczyc {currIndex}" currMove = moves[0] doStreak = False streak = 0 doing = 0 actuallyDoing = False while True: doing += 1 frame = method.receive_frame() frame = cv2.flip(frame, 1) results = model(frame, verbose=False) if not actuallyDoing: client_socket.sendall("start".encode()) actuallyDoing = True current_time = time.time() delta = current_time - last_time last_time = current_time if doing % 30 == 0: if doStreak: streak += 5 client_socket.sendall(f"streak_{streak}".encode()) else: streak = 0 client_socket.sendall(f"streak_0".encode()) fps = 1 / delta if delta > 0 else float('inf') # print(f"\rDelta: {delta:.4f}s, FPS: {fps:.2f}", end="") if len(results) != 0: result = results[0] kpts = result.keypoints.data[0] if len(result.keypoints.data) else None if kpts is None: continue img = frame normalized = normalize_pose(result.keypoints.xy.cpu().numpy()[0]) draw = utils.normalize(result.keypoints.xy.cpu().numpy()[0]) cv2.imshow('you', draw_new(draw * 100 + 100)) if currTimeIndex != 0 and moves.index(find_closest(moves, time.time() - currTimeIndex)) == len(moves) - 1: mehCount = abs(totalCount - (failCount + goodCount)) stats = { "failCount": failCount, "goodCount": goodCount, "mehCount": mehCount, "percentage": (goodCount + (0.85 * mehCount)) / totalCount * 100 } client_socket.sendall(f"finish_{json.dumps(stats)}".encode()) print( f"PODSUMOWANIE: FAIL {failCount} MEH: {mehCount} PERFECT: {goodCount} PERCENTAGE: {(goodCount + (0.85 * mehCount)) / totalCount * 100}%") cv2.destroyAllWindows() break # thread = Thread(target=print_animation, args=(moves, False)) # thread.start() else: changed = False closest = find_closest(moves, time.time() - currTimeIndex) cv2.imshow('Dots', draw_new(utils.normalize(closest[2]) * 250 + 250)) if abs((time.time() - currTimeIndex) - moves[currIndex][0]) > failRate: currStatus = f"FAIL!" failCount += 1 doStreak = False if compare_poses_boolean(closest[1], normalized): # delays += (time.time() - deltaTime - moves[0][0]) * 1000 # delaysCount += 1 # currStatus = f"SUPER! {currIndex} Zostalo {len(moves)} Delay {(time.time() - currTimeIndex - closest[0]) / 1000}ms" deltaTime = time.time() currIndex = moves.index(closest) + 1 goodCount += 1 changed = True doStreak = True if not changed and compare_poses_boolean(moves[currIndex][1], normalized): # delays += (time.time() - deltaTime - moves[0][0]) * 1000 # delaysCount += 1 # currStatus = f"SUPER! {currIndex} Zostalo {len(moves)} Delay {(time.time() - currTimeIndex - closest[0]) / 1000}ms" deltaTime = time.time() changed = True currIndex += 1 goodCount += 1 doStreak = True # if do_pose_shot: # moves.append((time.time() - startTime, normalize_pose(result.keypoints.xy.cpu().numpy()[0]), result.keypoints.xy.cpu()[0])) # elif len(moves) != 0: # with open('moves.pkl', 'wb') as f: # 'wb' = write binary # pickle.dump # (moves, f) # # exit(1) cv2.putText( img, # obraz currStatus, # tekst (50, 100), # pozycja (x, y) lewego dolnego rogu tekstu cv2.FONT_HERSHEY_SIMPLEX, # czcionka 1, # rozmiar (skalowanie) (0, 0, 255), # kolor (BGR) - tutaj czerwony 2, # grubość linii cv2.LINE_AA # typ antyaliasingu ) cv2.imshow('Klatka z kamerki', img) cv2.setMouseCallback('Klatka z kamerki', click_event) cv2.waitKey(1) # Czekaj na naciśnięcie klawisza try: while True: main() except KeyboardInterrupt: pass