Arduino Due – 高速 ADC

0 0
Read Time:4 Minute, 52 Second

我們用 Arduino Due 的 ADC 擷取雷達迴波之後,為了進行後段的數位訊號處理,必須先初步驗證 Raw Data 的正確性。這裡會有兩個問題:一是 ADC 的取樣率;二是 Arduino Due 是透過 USB(虛擬序列埠,CDC/ACM)把資料往電腦送,若要在 PC 端即時看到 ADC 的取樣資料,ADC 取樣率愈高,USB 的傳輸率就需要愈高。

以下 Arduino Due 程式碼取自 https://gist.github.com/pklaus/5921022 (Arduino Due: ADC → DMA → USB @ 1MSPS),感謝 Philipp Klaus。

#undef HID_ENABLED
// Arduino Due ADC->DMA->USB 1MSPS
// by stimmer
// from http://forum.arduino.cc/index.php?topic=137635.msg1136315#msg1136315
// Input: Analog in A0
// Output: Raw stream of uint16_t in range 0-4095 on Native USB Serial/ACM
// on linux, to stop the OS cooking your data:
// stty -F /dev/ttyACM0 raw -iexten -echo -echoe -echok -echoctl -echoke -onlcr
// bufn:  index of the buffer most recently queued as the "next" DMA target;
//        it is advanced inside ADC_Handler(), hence volatile.
// obufn: index of the next buffer that loop() will send over USB.
volatile int bufn,obufn;
uint16_t buf[4][256]; // 4 buffers of 256 readings
// ADC interrupt handler: when bit 27 of the ADC status register is set
// (ENDRX, "end of receive buffer", per the SAM3X datasheet -- confirm there),
// advance bufn to the following buffer (modulo 4) and program it as the
// next DMA receive buffer of 256 readings.
void ADC_Handler() {
  const int status = ADC->ADC_ISR;  // status register is read exactly once
  const int end_of_buffer = 1 << 27;

  if ((status & end_of_buffer) == 0) {
    return;  // interrupt was not raised for a completed buffer
  }

  bufn = (bufn + 1) & 3;                 // wrap around the 4 buffers
  ADC->ADC_RNPR = (uint32_t)buf[bufn];   // next DMA receive pointer
  ADC->ADC_RNCR = 256;                   // next DMA receive count
}
// One-time initialisation: open the native USB serial port, configure the
// ADC, enable its interrupt for bit 27 only, hand the first two buffers to
// the DMA receive registers and start conversions.
// NOTE(review): the register bit names quoted below (FREERUN, CH7, ENDRX,
// RXTEN, START) come from the SAM3X datasheet, not from this file -- confirm
// against the datasheet before relying on them.
void setup(){
SerialUSB.begin(0);
while(!SerialUSB); // spin until SerialUSB evaluates to true
pmc_enable_periph_clk(ID_ADC);
adc_init(ADC, SystemCoreClock, ADC_FREQ_MAX, ADC_STARTUP_FAST);
ADC->ADC_MR |=0x80; // free running
ADC->ADC_CHER=0x80; // enable the channel selected by bit 7 (presumably CH7; the header comment names the input as A0)
NVIC_EnableIRQ(ADC_IRQn);
ADC->ADC_IDR=~(1<<27); // write every bit except bit 27 to the interrupt-disable register
ADC->ADC_IER=1<<27; // enable only the bit-27 interrupt that ADC_Handler() tests for
ADC->ADC_RPR=(uint32_t)buf[0]; // DMA buffer
ADC->ADC_RCR=256;
ADC->ADC_RNPR=(uint32_t)buf[1]; // next DMA buffer
ADC->ADC_RNCR=256;
bufn=obufn=1; // both indices start at 1, so loop() waits until ADC_Handler() advances bufn
ADC->ADC_PTCR=1; // presumably RXTEN: enable the DMA receiver
ADC->ADC_CR=2; // presumably START: begin conversions
}
void loop(){
while(obufn==bufn); // wait for buffer to be full
SerialUSB.write((uint8_t *)buf[obufn],512); // send it – 512 bytes = 256 uint16_t
obufn=(obufn+1)&3;
}

同樣地,在 PC 端顯示 ADC 的即時資料:

以下 Python 程式碼取自 https://gist.github.com/pklaus/5921022 (Arduino Due: ADC → DMA → USB @ 1MSPS),感謝 Philipp Klaus。

#!/usr/bin/env python
# from http://forum.arduino.cc/index.php?topic=137635.msg1270996#msg1270996
import pyqtgraph as pg
import time, threading, sys
import serial
import numpy as np
class SerialReader(threading.Thread):
    """Thread that reads and buffers 16-bit samples from a serial port.

    Samples are stored in a circular buffer of ``chunks * chunkSize`` items
    (about 5 MSamples with the defaults).  The most recently collected
    samples can be retrieved from another thread by calling ``get(N)``.
    """

    def __init__(self, port, chunkSize=1024, chunks=5000):
        """Set up the buffer and bookkeeping; does not start the thread.

        Args:
            port: Object with a ``read(nbytes)`` method returning bytes
                (the script passes an open ``serial.Serial``).
            chunkSize: Size of a single chunk in items (not bytes).
            chunks: Number of chunks held by the circular buffer.
        """
        super().__init__()
        # circular buffer for storing serial data until it is
        # fetched by the GUI
        self.buffer = np.zeros(chunks * chunkSize, dtype=np.uint16)
        self.chunks = chunks  # number of chunks to store in the buffer
        self.chunkSize = chunkSize  # size of a single chunk (items, not bytes)
        self.ptr = 0  # (index of the most recently collected sample) + 1
        self.port = port  # serial port handle
        self.sps = 0.0  # holds the average sample acquisition rate
        self.exitFlag = False
        self.exitMutex = threading.Lock()
        self.dataMutex = threading.Lock()

    def run(self):
        """Read chunks from the port into the circular buffer until exit().

        Each iteration checks the exit flag, reads up to one chunk of bytes,
        and once a full chunk is available stores it and updates the running
        samples-per-second estimate.
        """
        port = self.port
        ring = self.buffer
        chunk_items = self.chunkSize
        chunk_bytes = chunk_items * 2  # two bytes per uint16 sample
        pending = b""  # bytes of a chunk that has not been completed yet
        count = 0
        sps = None
        last_update = time.perf_counter()
        while True:
            # see whether an exit was requested
            with self.exitMutex:
                if self.exitFlag:
                    break
            # Read the remainder of the current chunk.  A read may return
            # fewer bytes than requested (e.g. a port opened with a timeout);
            # keep what arrived so the 16-bit sample alignment is preserved.
            pending += port.read(chunk_bytes - len(pending))
            if len(pending) < chunk_bytes:
                continue
            # convert data to a 16-bit unsigned numpy array
            data = np.frombuffer(pending, dtype=np.uint16)
            pending = b""
            # keep track of the acquisition rate in samples-per-second
            count += chunk_items
            now = time.perf_counter()
            dt = now - last_update
            if dt > 1.0:
                # sps is an exponential average of the running
                # sample rate measurement
                if sps is None:
                    sps = count / dt
                else:
                    sps = sps * 0.9 + (count / dt) * 0.1
                count = 0
                last_update = now
            # write the new chunk into the circular buffer
            # and update the buffer pointer
            with self.dataMutex:
                ring[self.ptr:self.ptr + chunk_items] = data
                self.ptr = (self.ptr + chunk_items) % ring.shape[0]
                if sps is not None:
                    self.sps = sps

    def get(self, num, downsample=1):
        """Return a tuple ``(time_values, voltage_values, rate)``.

        - voltage_values contains the *num* most recently collected samples
          as a 32-bit float array, scaled assuming 3.3 V / 12 bits.
        - time_values assumes samples are collected at 1 MS/s.
        - rate is the running average sample rate.

        If *downsample* is > 1, the number of values returned is reduced by
        averaging that many consecutive samples together; *num* must then be
        a multiple of *downsample* (otherwise the reshape raises ValueError).
        """
        # lock the buffer and copy the requested data out
        with self.dataMutex:
            ptr = self.ptr
            if ptr - num < 0:
                # requested window wraps around the start of the buffer
                data = np.empty(num, dtype=np.uint16)
                data[:num - ptr] = self.buffer[ptr - num:]
                data[num - ptr:] = self.buffer[:ptr]
            else:
                data = self.buffer[ptr - num:ptr].copy()
            rate = self.sps
        # Convert array to float and rescale to voltage.
        # Assume 3.3V / 12bits
        # (we need calibration data to do a better job on this)
        data = data.astype(np.float32) * (3.3 / 2**12)
        if downsample > 1:
            # average N samples together; integer division keeps the
            # reshape argument an int on Python 3
            data = data.reshape(num // downsample, downsample).mean(axis=1)
            num = data.shape[0]
            return np.linspace(0, (num - 1) * 1e-6 * downsample, num), data, rate
        return np.linspace(0, (num - 1) * 1e-6, num), data, rate

    def exit(self):
        """Instruct the serial thread to exit."""
        with self.exitMutex:
            self.exitFlag = True
# Get handle to serial port
# (your port string may vary; Windows users need 'COMn')
s = serial.Serial('/dev/ttyACM0')

# Create the GUI: a single plot window with labelled axes and a fixed
# 0 - 3.3 V vertical range.
app = pg.mkQApp()
plt = pg.plot()
plt.setLabels(left=('ADC Signal', 'V'), bottom=('Time', 's'))
plt.setYRange(0.0, 3.3)

# Create thread to read and buffer serial data.
thread = SerialReader(s)
thread.start()
# Calling update() will request a copy of the most recently-acquired
# samples and plot them.
def update():
    """Plot the most recently acquired samples and show the sample rate.

    Requests the latest 1000*1024 samples from the reader thread, averaged
    in groups of 100, and redraws the plot.  When the plot window is no
    longer visible, the reader thread is told to exit and the timer that
    drives this function is stopped.
    """
    t, v, r = thread.get(1000 * 1024, downsample=100)
    plt.plot(t, v, clear=True)
    plt.setTitle('Sample Rate: %0.2f' % r)
    if not plt.isVisible():
        thread.exit()
        timer.stop()
# Set up a timer with 0 interval so Qt will call update()
# as rapidly as it can handle.
timer = pg.QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)

# Start Qt event loop (skipped when running in an interactive interpreter).
if sys.flags.interactive == 0:
    # Some Qt bindings (e.g. PyQt6) only provide exec(); older ones only
    # provide exec_().  Use whichever the application object offers.
    run_event_loop = getattr(app, 'exec', None) or app.exec_
    run_event_loop()
Happy
Happy
0 %
Sad
Sad
0 %
Excited
Excited
0 %
Sleepy
Sleepy
0 %
Angry
Angry
0 %
Surprise
Surprise
0 %