mirror of
https://github.com/jonathanhogg/scopething
synced 2025-07-13 18:52:10 +01:00
Compare commits
3 Commits
ca9011011d
...
b62bffe631
Author | SHA1 | Date | |
---|---|---|---|
b62bffe631 | |||
529a96a31b | |||
22f7d1a81e |
48
analysis.py
48
analysis.py
@ -1,6 +1,8 @@
|
|||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from utils import DotDict
|
||||||
|
|
||||||
|
|
||||||
def interpolate_min_x(f, x):
|
def interpolate_min_x(f, x):
|
||||||
return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
|
return 0.5 * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x
|
||||||
@ -39,7 +41,7 @@ def moving_average(samples, width, mode='wrap'):
|
|||||||
|
|
||||||
|
|
||||||
def calculate_periodicity(series, window=0.1):
|
def calculate_periodicity(series, window=0.1):
|
||||||
samples = np.array(series.samples)
|
samples = np.array(series.samples, dtype='double')
|
||||||
window = int(len(samples) * window)
|
window = int(len(samples) * window)
|
||||||
errors = np.zeros(len(samples) - window)
|
errors = np.zeros(len(samples) - window)
|
||||||
for i in range(1, len(errors) + 1):
|
for i in range(1, len(errors) + 1):
|
||||||
@ -93,7 +95,7 @@ def normalize_waveform(samples, smooth=7):
|
|||||||
if first_falling is not None:
|
if first_falling is not None:
|
||||||
crossings.append((n + first_falling - last_rising, last_rising))
|
crossings.append((n + first_falling - last_rising, last_rising))
|
||||||
width, first = min(crossings)
|
width, first = min(crossings)
|
||||||
wave = np.hstack([smoothed[first:], smoothed[:first]]) / scale
|
wave = (np.hstack([samples[first:], samples[:first]]) - offset) / scale
|
||||||
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
|
return wave, offset, scale, first, sorted((i - first % n, w) for (w, i) in crossings)
|
||||||
|
|
||||||
|
|
||||||
@ -111,7 +113,7 @@ def characterize_waveform(samples, crossings):
|
|||||||
return possibles
|
return possibles
|
||||||
|
|
||||||
|
|
||||||
def analyze_series(series):
|
def annotate_series(series):
|
||||||
period = calculate_periodicity(series)
|
period = calculate_periodicity(series)
|
||||||
if period is not None:
|
if period is not None:
|
||||||
waveform = DotDict(period=period, frequency=1 / period)
|
waveform = DotDict(period=period, frequency=1 / period)
|
||||||
@ -122,6 +124,10 @@ def analyze_series(series):
|
|||||||
waveform.count = count
|
waveform.count = count
|
||||||
waveform.amplitude = scale
|
waveform.amplitude = scale
|
||||||
waveform.offset = underlying.mean() + offset
|
waveform.offset = underlying.mean() + offset
|
||||||
|
waveform.timestamps = np.arange(len(wave)) * series.sample_period
|
||||||
|
waveform.sample_period = series.sample_period
|
||||||
|
waveform.sample_rate = series.sample_rate
|
||||||
|
waveform.capture_start = series.capture_start + waveform.beginning * series.sample_period
|
||||||
possibles = characterize_waveform(wave, crossings)
|
possibles = characterize_waveform(wave, crossings)
|
||||||
if possibles:
|
if possibles:
|
||||||
error, shape, duty_cycle = possibles[0]
|
error, shape, duty_cycle = possibles[0]
|
||||||
@ -132,37 +138,5 @@ def analyze_series(series):
|
|||||||
else:
|
else:
|
||||||
waveform.shape = 'unknown'
|
waveform.shape = 'unknown'
|
||||||
series.waveform = waveform
|
series.waveform = waveform
|
||||||
|
return True
|
||||||
|
return False
|
||||||
# %%
|
|
||||||
|
|
||||||
from pylab import figure, plot, show
|
|
||||||
from utils import DotDict
|
|
||||||
|
|
||||||
o = 400
|
|
||||||
m = 5
|
|
||||||
n = o * m
|
|
||||||
samples = square_wave(o)
|
|
||||||
samples = np.hstack([samples] * m) * 2
|
|
||||||
samples = np.hstack([samples[100:], samples[:100]])
|
|
||||||
samples += np.random.normal(size=n) * 0.1
|
|
||||||
samples += np.linspace(4.5, 5.5, n)
|
|
||||||
series = DotDict(samples=samples, sample_rate=1000000)
|
|
||||||
|
|
||||||
analyze_series(series)
|
|
||||||
|
|
||||||
if 'waveform' in series:
|
|
||||||
waveform = series.waveform
|
|
||||||
if 'duty_cycle' in waveform:
|
|
||||||
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
|
|
||||||
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
|
|
||||||
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
|
|
||||||
else:
|
|
||||||
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
|
|
||||||
f"with amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
|
|
||||||
|
|
||||||
figure(1)
|
|
||||||
plot(series.samples)
|
|
||||||
wave = np.hstack([waveform.samples[-waveform.beginning:]] + [waveform.samples] * waveform.count + [waveform.samples[:-waveform.beginning]])
|
|
||||||
plot(wave * waveform.amplitude + waveform.offset)
|
|
||||||
show()
|
|
||||||
|
7
scope.py
7
scope.py
@ -302,7 +302,8 @@ class Scope(vm.VirtualMachine):
|
|||||||
address -= address % 2
|
address -= address % 2
|
||||||
|
|
||||||
traces = DotDict()
|
traces = DotDict()
|
||||||
timestamps = array.array('d', (t*self.master_clock_period for t in range(0, timestamp, ticks*clock_scale)))
|
|
||||||
|
timestamps = array.array('d', (i * sample_period for i in range(nsamples)))
|
||||||
for dump_channel, channel in enumerate(sorted(analog_channels)):
|
for dump_channel, channel in enumerate(sorted(analog_channels)):
|
||||||
asamples = nsamples // len(analog_channels)
|
asamples = nsamples // len(analog_channels)
|
||||||
async with self.transaction():
|
async with self.transaction():
|
||||||
@ -314,7 +315,7 @@ class Scope(vm.VirtualMachine):
|
|||||||
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
|
value_multiplier, value_offset = (1, 0) if raw else (high-low, low-analog_params.ab_offset/2*(1 if channel == 'A' else -1))
|
||||||
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
|
data = await self.read_analog_samples(asamples, capture_mode.sample_width)
|
||||||
series = DotDict({'channel': channel,
|
series = DotDict({'channel': channel,
|
||||||
'start_timestamp': start_timestamp,
|
'capture_start': start_timestamp * self.master_clock_period,
|
||||||
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
|
'timestamps': timestamps[dump_channel::len(analog_channels)] if len(analog_channels) > 1 else timestamps,
|
||||||
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
|
'samples': array.array('f', (value*value_multiplier+value_offset for value in data)),
|
||||||
'sample_period': sample_period*len(analog_channels),
|
'sample_period': sample_period*len(analog_channels),
|
||||||
@ -336,7 +337,7 @@ class Scope(vm.VirtualMachine):
|
|||||||
mask = 1 << i
|
mask = 1 << i
|
||||||
channel = f'L{i}'
|
channel = f'L{i}'
|
||||||
series = DotDict({'channel': channel,
|
series = DotDict({'channel': channel,
|
||||||
'start_timestamp': start_timestamp,
|
'capture_start': start_timestamp * self.master_clock_period,
|
||||||
'timestamps': timestamps,
|
'timestamps': timestamps,
|
||||||
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
|
'samples': array.array('B', (1 if value & mask else 0 for value in data)),
|
||||||
'sample_period': sample_period,
|
'sample_period': sample_period,
|
||||||
|
40
test.py
Normal file
40
test.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from pylab import figure, plot, show
|
||||||
|
|
||||||
|
from analysis import annotate_series
|
||||||
|
from scope import await_, capture, main
|
||||||
|
from utils import DotDict
|
||||||
|
|
||||||
|
|
||||||
|
await_(main())
|
||||||
|
|
||||||
|
# o = 400
|
||||||
|
# m = 5
|
||||||
|
# n = o * m
|
||||||
|
# samples = square_wave(o)
|
||||||
|
# samples = np.hstack([samples] * m) * 2
|
||||||
|
# samples = np.hstack([samples[100:], samples[:100]])
|
||||||
|
# samples += np.random.normal(size=n) * 0.1
|
||||||
|
# samples += np.linspace(4.5, 5.5, n)
|
||||||
|
# series = DotDict(samples=samples, sample_rate=1000000)
|
||||||
|
|
||||||
|
data = capture(['A'], period=20e-3, nsamples=2000)
|
||||||
|
series = data.A
|
||||||
|
|
||||||
|
figure(1)
|
||||||
|
plot(series.timestamps, series.samples)
|
||||||
|
|
||||||
|
if annotate_series(series):
|
||||||
|
waveform = series.waveform
|
||||||
|
if 'duty_cycle' in waveform:
|
||||||
|
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
|
||||||
|
f"with duty cycle {waveform.duty_cycle * 100:.0f}%, "
|
||||||
|
f"amplitude ±{waveform.amplitude:.1f}V and offset {waveform.offset:.1f}V")
|
||||||
|
else:
|
||||||
|
print(f"Found {waveform.frequency:.0f}Hz {waveform.shape} wave, "
|
||||||
|
f"with amplitude ±{waveform.amplitude:.2f}V and offset {waveform.offset:.2f}V")
|
||||||
|
|
||||||
|
plot(waveform.timestamps + waveform.capture_start - series.capture_start, waveform.samples * waveform.amplitude + waveform.offset)
|
||||||
|
|
||||||
|
show()
|
Reference in New Issue
Block a user