Read Time:4 Minute, 52 Second
我們用Arduino Due ADC 抓了雷達迴波之後為了做後段的數位訊號處理, 必須先初步驗證Raw Data 的正確性。這裡會有兩個問題, 一是ADC 的取樣率, 二是Arduino Due 是透過USB(RS232)把data 往電腦送, 若要在PC 端實時看到ADC 取樣率data, ADC 取樣率愈高, USB 的傳輸率就需要愈高。
以下Arduino Due 程式碼 取至 https://gist.github.com/pklaus/5921022
Arduino Due: ADC → DMA → USB @ 1MSPS感謝
Philipp Klaus
#undef HID_ENABLED |
// Arduino Due ADC->DMA->USB 1MSPS |
// by stimmer |
// from http://forum.arduino.cc/index.php?topic=137635.msg1136315#msg1136315 |
// Input: Analog in A0 |
// Output: Raw stream of uint16_t in range 0-4095 on Native USB Serial/ACM |
// on linux, to stop the OS cooking your data: |
// stty -F /dev/ttyACM0 raw -iexten -echo -echoe -echok -echoctl -echoke -onlcr |
volatile int bufn,obufn; |
uint16_t buf[4][256]; // 4 buffers of 256 readings |
void ADC_Handler(){ // move DMA pointers to next buffer |
int f=ADC->ADC_ISR; |
if (f&(1<<27)){ |
bufn=(bufn+1)&3; |
ADC->ADC_RNPR=(uint32_t)buf[bufn]; |
ADC->ADC_RNCR=256; |
} |
} |
void setup(){ |
SerialUSB.begin(0); |
while(!SerialUSB); |
pmc_enable_periph_clk(ID_ADC); |
adc_init(ADC, SystemCoreClock, ADC_FREQ_MAX, ADC_STARTUP_FAST); |
ADC->ADC_MR |=0x80; // free running |
ADC->ADC_CHER=0x80; |
NVIC_EnableIRQ(ADC_IRQn); |
ADC->ADC_IDR=~(1<<27); |
ADC->ADC_IER=1<<27; |
ADC->ADC_RPR=(uint32_t)buf[0]; // DMA buffer |
ADC->ADC_RCR=256; |
ADC->ADC_RNPR=(uint32_t)buf[1]; // next DMA buffer |
ADC->ADC_RNCR=256; |
bufn=obufn=1; |
ADC->ADC_PTCR=1; |
ADC->ADC_CR=2; |
} |
void loop(){ |
while(obufn==bufn); // wait for buffer to be full |
SerialUSB.write((uint8_t *)buf[obufn],512); // send it – 512 bytes = 256 uint16_t |
obufn=(obufn+1)&3; |
} |
同樣在 PC 端顯示()ADC 實時Data
以下Python程式碼 取至 https://gist.github.com/pklaus/5921022
Arduino Due: ADC → DMA → USB @ 1MSPS感謝
Philipp Klaus
#!/usr/bin/env python |
# from http://forum.arduino.cc/index.php?topic=137635.msg1270996#msg1270996 |
import pyqtgraph as pg |
import time, threading, sys |
import serial |
import numpy as np |
class SerialReader(threading.Thread): |
“”” Defines a thread for reading and buffering serial data. |
By default, about 5MSamples are stored in the buffer. |
Data can be retrieved from the buffer by calling get(N)””” |
def __init__(self, port, chunkSize=1024, chunks=5000): |
threading.Thread.__init__(self) |
# circular buffer for storing serial data until it is |
# fetched by the GUI |
self.buffer = np.zeros(chunks*chunkSize, dtype=np.uint16) |
self.chunks = chunks # number of chunks to store in the buffer |
self.chunkSize = chunkSize # size of a single chunk (items, not bytes) |
self.ptr = 0 # pointer to most (recently collected buffer index) + 1 |
self.port = port # serial port handle |
self.sps = 0.0 # holds the average sample acquisition rate |
self.exitFlag = False |
self.exitMutex = threading.Lock() |
self.dataMutex = threading.Lock() |
def run(self): |
exitMutex = self.exitMutex |
dataMutex = self.dataMutex |
buffer = self.buffer |
port = self.port |
count = 0 |
sps = None |
lastUpdate = pg.ptime.time() |
while True: |
# see whether an exit was requested |
with exitMutex: |
if self.exitFlag: |
break |
# read one full chunk from the serial port |
data = port.read(self.chunkSize*2) |
# convert data to 16bit int numpy array |
data = np.fromstring(data, dtype=np.uint16) |
# keep track of the acquisition rate in samples-per-second |
count += self.chunkSize |
now = pg.ptime.time() |
dt = now-lastUpdate |
if dt > 1.0: |
# sps is an exponential average of the running sample rate measurement |
if sps is None: |
sps = count / dt |
else: |
sps = sps * 0.9 + (count / dt) * 0.1 |
count = 0 |
lastUpdate = now |
# write the new chunk into the circular buffer |
# and update the buffer pointer |
with dataMutex: |
buffer[self.ptr:self.ptr+self.chunkSize] = data |
self.ptr = (self.ptr + self.chunkSize) % buffer.shape[0] |
if sps is not None: |
self.sps = sps |
def get(self, num, downsample=1): |
“”” Return a tuple (time_values, voltage_values, rate) |
– voltage_values will contain the *num* most recently-collected samples |
as a 32bit float array. |
– time_values assumes samples are collected at 1MS/s |
– rate is the running average sample rate. |
If *downsample* is > 1, then the number of values returned will be |
reduced by averaging that number of consecutive samples together. In |
this case, the voltage array will be returned as 32bit float. |
“”” |
with self.dataMutex: # lock the buffer and copy the requested data out |
ptr = self.ptr |
if ptr-num < 0: |
data = np.empty(num, dtype=np.uint16) |
data[:num-ptr] = self.buffer[ptr-num:] |
data[num-ptr:] = self.buffer[:ptr] |
else: |
data = self.buffer[self.ptr-num:self.ptr].copy() |
rate = self.sps |
# Convert array to float and rescale to voltage. |
# Assume 3.3V / 12bits |
# (we need calibration data to do a better job on this) |
data = data.astype(np.float32) * (3.3 / 2**12) |
if downsample > 1: # if downsampling is requested, average N samples together |
data = data.reshape(num/downsample,downsample).mean(axis=1) |
num = data.shape[0] |
return np.linspace(0, (num-1)*1e-6*downsample, num), data, rate |
else: |
return np.linspace(0, (num-1)*1e-6, num), data, rate |
def exit(self): |
“”” Instruct the serial thread to exit.””” |
with self.exitMutex: |
self.exitFlag = True |
# Get handle to serial port |
# (your port string may vary; windows users need ‘COMn’) |
s = serial.Serial(‘/dev/ttyACM0’) |
# Create the GUI |
app = pg.mkQApp() |
plt = pg.plot() |
plt.setLabels(left=(‘ADC Signal’, ‘V’), bottom=(‘Time’, ‘s’)) |
plt.setYRange(0.0, 3.3) |
# Create thread to read and buffer serial data. |
thread = SerialReader(s) |
thread.start() |
# Calling update() will request a copy of the most recently-acquired |
# samples and plot them. |
def update(): |
global plt, thread |
t,v,r = thread.get(1000*1024, downsample=100) |
plt.plot(t, v, clear=True) |
plt.setTitle(‘Sample Rate: %0.2f’%r) |
if not plt.isVisible(): |
thread.exit() |
timer.stop() |
# Set up a timer with 0 interval so Qt will call update() |
# as rapidly as it can handle. |
timer = pg.QtCore.QTimer() |
timer.timeout.connect(update) |
timer.start(0) |
# Start Qt event loop. |
if sys.flags.interactive == 0: |
app.exec_() |