import numpy as np def interpolate_min_x(f, x): return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x def rms(f): return np.sqrt((f ** 2).mean()) def sine_wave(n): return np.sin(np.linspace(0, 2*np.pi, n, endpoint=False)) def triangle_wave(n): x = np.linspace(0, 4, n, endpoint=False) x2 = x % 2 y = np.where(x2 < 1, x2, 2 - x2) y = np.where(x // 2 < 1, y, -y) return y def square_wave(n, duty=0.5): w = int(n * duty) return np.hstack([np.ones(w), -np.ones(n - w)]) def sawtooth_wave(n): return 2 * (np.linspace(0.5, 1.5, n, endpoint=False) % 1) - 1 def moving_average(samples, width, mode='wrap'): hwidth = width // 2 samples = np.take(samples, np.arange(-hwidth, len(samples)+width-hwidth), mode=mode) cumulative = samples.cumsum() return (cumulative[width:] - cumulative[:-width]) / width def calculate_periodicity(series, window=0.1): samples = np.array(series.samples) window = int(len(samples) * window) errors = np.zeros(len(samples) - window) for i in range(1, len(errors) + 1): errors[i-1] = rms(samples[i:] - samples[:-i]) threshold = errors.max() / 2 minima = [] for i in range(window, len(errors) - window): p = errors[i-window:i+window].argmin() if p == window and errors[p + i - window] < threshold: minima.append(interpolate_min_x(errors, i)) if len(minima) <= 1: return None ks = np.polyfit(np.arange(0, len(minima)), minima, 1) return ks[0] / series.sample_rate def extract_waveform(series, period): p = int(round(series.sample_rate * period)) n = len(series.samples) // p if n <= 2: return None, None samples = np.array(series.samples)[:p*n] cumsum = samples.cumsum() underlying = (cumsum[p:] - cumsum[:-p]) / p n -= 1 samples = samples[p//2:p*n + p//2] - underlying wave = np.zeros(p) for i in range(n): o = i * p wave += samples[o:o+p] wave /= n return wave, p//2, n, underlying def normalize_waveform(samples, smooth=7): n = len(samples) smoothed = moving_average(samples, smooth) scale = (smoothed.max() - smoothed.min()) / 2 offset = (smoothed.max() + smoothed.min()) / 2 smoothed -= offset last_rising = first_falling = None crossings = [] for i in range(n): if smoothed[i-1] < 0 and smoothed[i] > 0: last_rising = i elif smoothed[i-1] > 0 and smoothed[i] < 0: if last_rising is None: first_falling = i else: crossings.append((i - last_rising, last_rising)) if first_falling is not None: crossings.append((n + first_falling - last_rising, last_rising)) width, first = min(crossings) wave = np.hstack([smoothed[first:], smoothed[:first]]) / scale return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings) def characterize_waveform(samples, crossings): n = len(samples) possibles = [] if len(crossings) == 1: duty_cycle = crossings[0][1] / n if duty_cycle > 0.45 and duty_cycle < 0.55: possibles.append((rms(samples - sine_wave(n)), 'sine', None)) possibles.append((rms(samples - triangle_wave(n)), 'triangle', None)) possibles.append((rms(samples - sawtooth_wave(n)), 'sawtooth', None)) possibles.append((rms(samples - square_wave(n, duty_cycle)), 'square', duty_cycle)) possibles.sort() return possibles def analyze_series(series): period = calculate_periodicity(series) if period is not None: waveform = DotDict(period=period, frequency=1 / period) wave, start, count, underlying = extract_waveform(series, period) wave, offset, scale, first, crossings = normalize_waveform(wave) waveform.samples = wave waveform.beginning = start + first waveform.count = count waveform.amplitude = scale waveform.offset = underlying.mean() + offset possibles = characterize_waveform(wave, crossings) if possibles: error, shape, duty_cycle = possibles[0] waveform.error = error waveform.shape = shape if duty_cycle is not None: waveform.duty_cycle = duty_cycle else: waveform.shape = 'unknown' series.waveform = waveform # %% from pylab import figure, plot, show from utils import DotDict o = 400 m = 5 n = o * m samples = square_wave(o) samples = np.hstack([samples] * m) * 2 samples = np.hstack([samples[100:], samples[:100]]) samples += np.random.normal(size=n) * 0.1 samples += np.linspace(4.5, 5.5, n) series = DotDict(samples=samples, sample_rate=1000000) analyze_series(series) if 'waveform' in series: waveform = series.waveform if 'duty_cycle' in waveform: print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, " f"with duty cycle {waveform.duty_cycle * 100:.0f}%, " f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V") else: print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, " f"with amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V") figure(1) plot(series.samples) wave = np.hstack([waveform.samples[-waveform.beginning:]] + [waveform.samples] * waveform.count + [waveform.samples[:-waveform.beginning]]) plot(wave * waveform.amplitude + waveform.offset) show()