Files
JustTwirk/main.py
2025-12-08 20:25:20 +01:00

219 lines
6.7 KiB
Python

import json
import os
import pickle
import socket
import sys
import torch
from ultralytics import YOLO
import cv2
import time
import poses
import utils
from calculate import normalize_pose, compare_poses_boolean
from draw import draw_new
from utils import find_closest
from video_methods import initialize_method
# Pose-estimation model shared by every session handled by main().
model = YOLO("yolo11s-pose.pt")
# Prefer the first CUDA GPU, but fall back to the CPU so the script still
# starts on machines without CUDA instead of raising at import time.
model.to(torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'))

# The single command-line argument selects the frame source and is passed
# unchanged to initialize_method.
if len(sys.argv) == 2:
    method_type = sys.argv[1]
else:
    print("Podaj argument 'cam', albo 'net'.")
    # sys.exit instead of the site-provided exit(): the latter is not
    # guaranteed to exist (e.g. under `python -S` or in frozen builds).
    sys.exit(1)

# Frame source; main() pulls frames from it via method.receive_frame().
method = initialize_method(method_type)

# Toggled by click_event on every left mouse click.
do_pose_shot = False
def click_event(event, x, y, flags, param):
    """OpenCV mouse callback that flips ``do_pose_shot`` on a left click.

    The signature follows what ``cv2.setMouseCallback`` expects; only
    ``event`` is inspected, the remaining arguments are ignored.
    """
    global do_pose_shot
    # Anything other than a left-button press leaves the flag untouched.
    if event != cv2.EVENT_LBUTTONDOWN:
        return
    do_pose_shot = not do_pose_shot
def _load_moves(path):
    """Load the recorded moves from *path* and rebase their timestamps.

    Every entry is indexed as ``(timestamp, pose, keypoints)``. The returned
    list holds the same entries with the timestamp shifted so the first move
    starts at 0 and then divided by 1000 (presumably milliseconds -> seconds;
    confirm against the recorder that writes these files).
    """
    # NOTE(security): pickle.load can execute arbitrary code while loading.
    # Only keep move files from trusted sources next to this script.
    with open(path, 'rb') as f:
        moves = pickle.load(f)
    startValue = moves[0][0]
    return [((move[0] - startValue) / 1000, move[1], move[2]) for move in moves]


def _run_session(client_socket):
    """Score one dance session and stream progress to *client_socket*.

    Protocol, as implemented here:
      * the client first sends ``id_<name>``; anything else ends the session,
      * ``not_exists`` is sent when ``moves_<name>.pkl`` is missing,
      * ``start`` is sent after the first frame has been processed,
      * ``streak_<n>`` is sent every 30th frame,
      * ``finish_<json>`` with the final statistics ends the session.

    Frames come from the module-level ``method`` and are analysed with the
    module-level ``model``.
    """
    data = client_socket.recv(1024).decode()
    if not data.startswith("id_"):
        return

    map_name = data.replace("id_", "").replace("\n", "").strip()
    # NOTE(security): map_name comes straight from the network and is used
    # to build a file path (and that file is unpickled). Consider
    # restricting it to a known set of names.
    moves_path = f'moves_{map_name}.pkl'
    if not os.path.isfile(moves_path):
        print(map_name)
        print(moves_path)
        client_socket.sendall("not_exists".encode())
        return

    moves = _load_moves(moves_path)
    totalCount = len(moves)
    lastIndex = totalCount - 1

    failRate = 10  # max tolerated gap between elapsed time and the expected move's timestamp
    goodCount = 0
    failCount = 0
    currIndex = 1  # index of the move the player is expected to hit next
    currTimeIndex = time.time()  # wall-clock start of the session
    currStatus = f"Zaczoles tanczyc {currIndex}"
    doStreak = False
    streak = 0
    doing = 0  # number of frames processed so far
    actuallyDoing = False  # becomes True once "start" has been sent

    while True:
        doing += 1
        frame = method.receive_frame()
        frame = cv2.flip(frame, 1)
        results = model(frame, verbose=False)

        if not actuallyDoing:
            client_socket.sendall("start".encode())
            actuallyDoing = True

        # Every 30th frame report the streak: it grows by 5 while the last
        # scored event was a hit and resets to 0 otherwise.
        if doing % 30 == 0:
            if doStreak:
                streak += 5
            else:
                streak = 0
            client_socket.sendall(f"streak_{streak}".encode())

        if len(results) == 0:
            continue
        result = results[0]
        # No detection in this frame: nothing to score or draw.
        if not len(result.keypoints.data):
            continue

        # The keypoints are extracted separately for scoring and drawing so
        # that neither helper can affect the other's input array.
        normalized = normalize_pose(result.keypoints.xy.cpu().numpy()[0])
        draw = utils.normalize(result.keypoints.xy.cpu().numpy()[0])
        cv2.imshow('you', draw_new(draw * 100 + 100))

        # Sample the clock once per frame so the finish check and the
        # scoring below always agree on which reference move is closest.
        elapsed = time.time() - currTimeIndex
        closest = find_closest(moves, elapsed)
        closestIndex = moves.index(closest)

        if closestIndex == lastIndex:
            # The last recorded move has been reached: report and stop.
            mehCount = abs(totalCount - (failCount + goodCount))
            percentage = (goodCount + (0.85 * mehCount)) / totalCount * 100
            stats = {
                "failCount": failCount,
                "goodCount": goodCount,
                "mehCount": mehCount,
                "percentage": percentage
            }
            client_socket.sendall(f"finish_{json.dumps(stats)}".encode())
            print(
                f"PODSUMOWANIE: FAIL {failCount} MEH: {mehCount} PERFECT: {goodCount} PERCENTAGE: {percentage}%")
            cv2.destroyAllWindows()
            break

        cv2.imshow('Dots', draw_new(utils.normalize(closest[2]) * 250 + 250))

        if abs(elapsed - moves[currIndex][0]) > failRate:
            currStatus = "FAIL!"
            failCount += 1
            doStreak = False

        if compare_poses_boolean(closest[1], normalized):
            # closestIndex < lastIndex here, so the next index is valid.
            currIndex = closestIndex + 1
            goodCount += 1
            doStreak = True
        elif compare_poses_boolean(moves[currIndex][1], normalized):
            # Clamp so repeated hits cannot push currIndex past the end of
            # moves, which would raise IndexError on the next frame.
            currIndex = min(currIndex + 1, lastIndex)
            goodCount += 1
            doStreak = True

        cv2.putText(
            frame,  # image
            currStatus,  # text
            (50, 100),  # position (x, y) of the text's bottom-left corner
            cv2.FONT_HERSHEY_SIMPLEX,  # font
            1,  # font scale
            (0, 0, 255),  # colour (BGR) - red
            2,  # line thickness
            cv2.LINE_AA  # anti-aliased line type
        )
        cv2.imshow('Klatka z kamerki', frame)
        cv2.setMouseCallback('Klatka z kamerki', click_event)
        cv2.waitKey(1)  # lets OpenCV process window events and redraw


def main():
    """Serve exactly one client on TCP port 13425 and run its dance session.

    Blocks until a client connects, delegates the session to
    ``_run_session`` and always closes both sockets afterwards, including
    when the session ends with an exception.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # main() is called in a loop; SO_REUSEADDR lets the next call bind
        # the same port while old connections are still in TIME_WAIT.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(("0.0.0.0", 13425))
        server_socket.listen()
        print("czekam na clienta...")
        client_socket, _ = server_socket.accept()
        with client_socket:
            print("mam clienta!")
            _run_session(client_socket)
# Script entry: run sessions back-to-back. Each main() call handles a single
# client connection and returns when that session ends.
try:
    while True:
        main()
except KeyboardInterrupt:
    # KeyboardInterrupt (e.g. Ctrl+C) ends the loop without a traceback.
    pass