93 lines
2.9 KiB
Python
93 lines
2.9 KiB
Python
from collections import deque
|
|
|
|
import numpy as np
|
|
|
|
def filter_moves(moves):
    """Rescale move timestamps and smooth the accompanying angle vectors.

    Each entry of ``moves`` is indexed as ``move[0]`` (a time value) and
    ``move[1]`` (an iterable of angles in degrees that is handed to
    ``EMAFilter.update``).  The time value is divided by 1000 (presumably
    milliseconds to seconds -- confirm with the caller).

    For every entry except the last one, the emitted time is the difference
    between the scaled time and the scaled time of the previous non-final
    entry (starting from 0).  The last entry keeps its scaled absolute value.
    NOTE(review): confirm that leaving the final entry absolute is intended.

    Returns a list of ``(time, smoothed_angles)`` tuples, one per input move.
    """
    angle_filter = EMAFilter(0.2)
    filtered = []
    previous_stamp = 0

    for index, move in enumerate(moves):
        seconds = move[0] / 1000

        if index != len(moves) - 1:
            # Turn the absolute stamp into a delta and remember the absolute
            # value for the next iteration.
            absolute_stamp = seconds
            seconds = absolute_stamp - previous_stamp
            previous_stamp = absolute_stamp

        filtered.append((seconds, angle_filter.update(move[1])))

    return filtered
|
|
|
|
class MedianFilter:
    """Sliding-window median filter applied independently to each channel.

    One bounded ``deque`` of length ``window_size`` is kept per channel; the
    oldest sample drops out automatically once the window is full.  The
    median is taken over the raw numeric values, so it is not aware of angle
    wrap-around.
    """

    def __init__(self, n_channels=8, window_size=3):
        """Create ``n_channels`` empty windows holding up to ``window_size`` samples."""
        self.n = n_channels
        self.buffers = []
        for _ in range(n_channels):
            self.buffers.append(deque(maxlen=window_size))

    def update(self, angles_deg):
        """Push one sample per channel and return the per-channel medians.

        ``angles_deg[k]`` is appended to the window of channel ``k``; the
        returned list has one ``np.median`` result per supplied sample, in
        the same order.
        """

        def push_and_median(channel, sample):
            window = self.buffers[channel]
            window.append(sample)
            return np.median(window)

        return [
            push_and_median(channel, sample)
            for channel, sample in enumerate(angles_deg)
        ]
|
|
|
|
class HybridFilter:
    """Median pre-filter followed by an exponential moving average on angles.

    For every channel the incoming angle (degrees) first passes through a
    sliding-window median over the raw values.  The median is then converted
    to its cosine/sine components, each of which is smoothed with an
    exponential moving average of weight ``alpha``; the output angle is
    recovered with ``arctan2``.
    """

    def __init__(self, alpha=0.7, n_channels=8, median_window=3):
        """Set up per-channel median windows and empty EMA state."""
        self.alpha = alpha
        self.n = n_channels
        self.median_window = median_window

        # Median buffers, one per channel.
        self.buffers = [deque(maxlen=median_window) for _ in range(n_channels)]

        # EMA state (cos/sin); None until the channel has seen a sample.
        self.cos_state = [None] * n_channels
        self.sin_state = [None] * n_channels

    def update(self, angles_deg):
        """Feed one angle per channel and return the smoothed angles in degrees.

        Results come from ``np.arctan2`` converted to degrees, so they lie
        within [-180, 180] regardless of the input range.
        """
        outputs = []
        for channel, raw_angle in enumerate(angles_deg):
            # Push into the median buffer and take the window median.
            window = self.buffers[channel]
            window.append(raw_angle)
            theta = np.deg2rad(np.median(window))

            cos_part = np.cos(theta)
            sin_part = np.sin(theta)

            # The first sample seeds the state directly; later samples are
            # blended with the stored components.
            if self.cos_state[channel] is not None:
                cos_part = (
                    self.alpha * cos_part
                    + (1 - self.alpha) * self.cos_state[channel]
                )
                sin_part = (
                    self.alpha * sin_part
                    + (1 - self.alpha) * self.sin_state[channel]
                )

            self.cos_state[channel] = cos_part
            self.sin_state[channel] = sin_part

            outputs.append(np.rad2deg(np.arctan2(sin_part, cos_part)))
        return outputs
|
|
|
|
class EMAFilter:
    """Exponential moving average for angles, computed per channel.

    Each angle (degrees) is converted to its cosine and sine components and
    the components are averaged separately, so the smoothing does not depend
    on a numeric jump in the degree value.  The output angle is rebuilt with
    ``arctan2``.
    """

    def __init__(self, alpha=0.2, n_channels=8):
        """Store the smoothing weight and allocate empty per-channel state."""
        self.n = n_channels
        self.alpha = alpha
        # None marks a channel that has not received a sample yet.
        self.cos_state = [None] * n_channels
        self.sin_state = [None] * n_channels

    def update(self, angles_deg):
        """Feed one angle per channel and return the smoothed angles in degrees.

        The first sample of a channel is stored as-is; every later sample is
        mixed in as ``alpha * new + (1 - alpha) * previous`` on both
        components.  Results come from ``np.arctan2`` converted to degrees,
        so they lie within [-180, 180].
        """
        result = []
        for channel, angle in enumerate(angles_deg):
            theta = np.deg2rad(angle)
            x, y = np.cos(theta), np.sin(theta)

            if self.cos_state[channel] is not None:
                x = self.alpha * x + (1 - self.alpha) * self.cos_state[channel]
                y = self.alpha * y + (1 - self.alpha) * self.sin_state[channel]

            self.cos_state[channel] = x
            self.sin_state[channel] = y

            result.append(np.rad2deg(np.arctan2(y, x)))
        return result