Read Time:4 Minute, 52 Second
我們用Arduino Due ADC 抓了雷達迴波之後為了做後段的數位訊號處理, 必須先初步驗證Raw Data 的正確性。這裡會有兩個問題, 一是ADC 的取樣率, 二是Arduino Due 是透過USB(RS232)把data 往電腦送, 若要在PC 端實時看到ADC 取樣率data, ADC 取樣率愈高, USB 的傳輸率就需要愈高。
以下Arduino Due 程式碼 取至 https://gist.github.com/pklaus/5921022
Arduino Due: ADC → DMA → USB @ 1MSPS感謝
Philipp Klaus
| #undef HID_ENABLED |
| // Arduino Due ADC->DMA->USB 1MSPS |
| // by stimmer |
| // from http://forum.arduino.cc/index.php?topic=137635.msg1136315#msg1136315 |
| // Input: Analog in A0 |
| // Output: Raw stream of uint16_t in range 0-4095 on Native USB Serial/ACM |
| // on linux, to stop the OS cooking your data: |
| // stty -F /dev/ttyACM0 raw -iexten -echo -echoe -echok -echoctl -echoke -onlcr |
| volatile int bufn,obufn; |
| uint16_t buf[4][256]; // 4 buffers of 256 readings |
| void ADC_Handler(){ // move DMA pointers to next buffer |
| int f=ADC->ADC_ISR; |
| if (f&(1<<27)){ |
| bufn=(bufn+1)&3; |
| ADC->ADC_RNPR=(uint32_t)buf[bufn]; |
| ADC->ADC_RNCR=256; |
| } |
| } |
| void setup(){ |
| SerialUSB.begin(0); |
| while(!SerialUSB); |
| pmc_enable_periph_clk(ID_ADC); |
| adc_init(ADC, SystemCoreClock, ADC_FREQ_MAX, ADC_STARTUP_FAST); |
| ADC->ADC_MR |=0x80; // free running |
| ADC->ADC_CHER=0x80; |
| NVIC_EnableIRQ(ADC_IRQn); |
| ADC->ADC_IDR=~(1<<27); |
| ADC->ADC_IER=1<<27; |
| ADC->ADC_RPR=(uint32_t)buf[0]; // DMA buffer |
| ADC->ADC_RCR=256; |
| ADC->ADC_RNPR=(uint32_t)buf[1]; // next DMA buffer |
| ADC->ADC_RNCR=256; |
| bufn=obufn=1; |
| ADC->ADC_PTCR=1; |
| ADC->ADC_CR=2; |
| } |
| void loop(){ |
| while(obufn==bufn); // wait for buffer to be full |
| SerialUSB.write((uint8_t *)buf[obufn],512); // send it – 512 bytes = 256 uint16_t |
| obufn=(obufn+1)&3; |
| } |
同樣在 PC 端顯示()ADC 實時Data
以下Python程式碼 取至 https://gist.github.com/pklaus/5921022
Arduino Due: ADC → DMA → USB @ 1MSPS感謝
Philipp Klaus
| #!/usr/bin/env python |
| # from http://forum.arduino.cc/index.php?topic=137635.msg1270996#msg1270996 |
| import pyqtgraph as pg |
| import time, threading, sys |
| import serial |
| import numpy as np |
| class SerialReader(threading.Thread): |
| “”” Defines a thread for reading and buffering serial data. |
| By default, about 5MSamples are stored in the buffer. |
| Data can be retrieved from the buffer by calling get(N)””” |
| def __init__(self, port, chunkSize=1024, chunks=5000): |
| threading.Thread.__init__(self) |
| # circular buffer for storing serial data until it is |
| # fetched by the GUI |
| self.buffer = np.zeros(chunks*chunkSize, dtype=np.uint16) |
| self.chunks = chunks # number of chunks to store in the buffer |
| self.chunkSize = chunkSize # size of a single chunk (items, not bytes) |
| self.ptr = 0 # pointer to most (recently collected buffer index) + 1 |
| self.port = port # serial port handle |
| self.sps = 0.0 # holds the average sample acquisition rate |
| self.exitFlag = False |
| self.exitMutex = threading.Lock() |
| self.dataMutex = threading.Lock() |
| def run(self): |
| exitMutex = self.exitMutex |
| dataMutex = self.dataMutex |
| buffer = self.buffer |
| port = self.port |
| count = 0 |
| sps = None |
| lastUpdate = pg.ptime.time() |
| while True: |
| # see whether an exit was requested |
| with exitMutex: |
| if self.exitFlag: |
| break |
| # read one full chunk from the serial port |
| data = port.read(self.chunkSize*2) |
| # convert data to 16bit int numpy array |
| data = np.fromstring(data, dtype=np.uint16) |
| # keep track of the acquisition rate in samples-per-second |
| count += self.chunkSize |
| now = pg.ptime.time() |
| dt = now-lastUpdate |
| if dt > 1.0: |
| # sps is an exponential average of the running sample rate measurement |
| if sps is None: |
| sps = count / dt |
| else: |
| sps = sps * 0.9 + (count / dt) * 0.1 |
| count = 0 |
| lastUpdate = now |
| # write the new chunk into the circular buffer |
| # and update the buffer pointer |
| with dataMutex: |
| buffer[self.ptr:self.ptr+self.chunkSize] = data |
| self.ptr = (self.ptr + self.chunkSize) % buffer.shape[0] |
| if sps is not None: |
| self.sps = sps |
| def get(self, num, downsample=1): |
| “”” Return a tuple (time_values, voltage_values, rate) |
| – voltage_values will contain the *num* most recently-collected samples |
| as a 32bit float array. |
| – time_values assumes samples are collected at 1MS/s |
| – rate is the running average sample rate. |
| If *downsample* is > 1, then the number of values returned will be |
| reduced by averaging that number of consecutive samples together. In |
| this case, the voltage array will be returned as 32bit float. |
| “”” |
| with self.dataMutex: # lock the buffer and copy the requested data out |
| ptr = self.ptr |
| if ptr-num < 0: |
| data = np.empty(num, dtype=np.uint16) |
| data[:num-ptr] = self.buffer[ptr-num:] |
| data[num-ptr:] = self.buffer[:ptr] |
| else: |
| data = self.buffer[self.ptr-num:self.ptr].copy() |
| rate = self.sps |
| # Convert array to float and rescale to voltage. |
| # Assume 3.3V / 12bits |
| # (we need calibration data to do a better job on this) |
| data = data.astype(np.float32) * (3.3 / 2**12) |
| if downsample > 1: # if downsampling is requested, average N samples together |
| data = data.reshape(num/downsample,downsample).mean(axis=1) |
| num = data.shape[0] |
| return np.linspace(0, (num-1)*1e-6*downsample, num), data, rate |
| else: |
| return np.linspace(0, (num-1)*1e-6, num), data, rate |
| def exit(self): |
| “”” Instruct the serial thread to exit.””” |
| with self.exitMutex: |
| self.exitFlag = True |
| # Get handle to serial port |
| # (your port string may vary; windows users need ‘COMn’) |
| s = serial.Serial(‘/dev/ttyACM0’) |
| # Create the GUI |
| app = pg.mkQApp() |
| plt = pg.plot() |
| plt.setLabels(left=(‘ADC Signal’, ‘V’), bottom=(‘Time’, ‘s’)) |
| plt.setYRange(0.0, 3.3) |
| # Create thread to read and buffer serial data. |
| thread = SerialReader(s) |
| thread.start() |
| # Calling update() will request a copy of the most recently-acquired |
| # samples and plot them. |
| def update(): |
| global plt, thread |
| t,v,r = thread.get(1000*1024, downsample=100) |
| plt.plot(t, v, clear=True) |
| plt.setTitle(‘Sample Rate: %0.2f’%r) |
| if not plt.isVisible(): |
| thread.exit() |
| timer.stop() |
| # Set up a timer with 0 interval so Qt will call update() |
| # as rapidly as it can handle. |
| timer = pg.QtCore.QTimer() |
| timer.timeout.connect(update) |
| timer.start(0) |
| # Start Qt event loop. |
| if sys.flags.interactive == 0: |
| app.exec_() |
