对象:ATI
语言:Python
方法:stock
目的与ATI通讯实时读取传感器信息并用将其可视化
直接上代码
import array
import pyqtgraph as pg
from ATI_APIpy import ATI_APIpy
if __name__=='__main__':
## test start
app = pg.mkQApp()#建立app
win = pg.GraphicsWindow()#建立窗口
win.setWindowTitle(u'pyqtgraph逐点画波形图')
win.resize(1600, 1200)#小窗口大小
# draw a dynamic picture
t=[0]
t_now=[0]
Fx_data = array.array('d')
fd = []
td = []
# link the ATI
ip='172.31.1.144'
ATI=ATI_APIpy(ip)
print('Connected ATI')
historyLength = 100#横坐标长度
p = win.addPlot()#把图p加入到窗口中
p.showGrid(x=True, y=True)#把X和Y的表格打开
p.setRange(xRange=[0,historyLength], yRange=[-1.2, 1.2], padding=0)
p.setLabel(axis='left', text='Contact force')#靠左
p.setLabel(axis='bottom', text='time')
p.setTitle('Real time contact force')#表格的名字
curve = p.plot()#绘制一个图形
while True:
def plotData():
ATI.startStreaming() # 告诉ATI开始读取
ATI.recieve() # 开始从ATI接受数据
f = ATI.force()
fd.append(f)
bd=len(fd)
tmp = fd[bd-1][0]
if len(Fx_data) < historyLength:
Fx_data.append(tmp)
else:
Fx_data[:-1] = Fx_data[1:] # 前移
Fx_data[-1] = tmp
curve.setData(Fx_data)
timer = pg.QtCore.QTimer()
timer.timeout.connect(plotData) # 定时调用plotData函数
timer.start(50) # 多少ms调用一次
app.exec_()
读取三维信息,而且同时在一个图框显示的代码:
import array
import pyqtgraph as pg
from ATI_APIpy import ATI_APIpy
from pyqtgraph.Qt import QtGui
if __name__ == '__main__':
## test start
app = pg.mkQApp() # 建立app
win = pg.GraphicsWindow() # 建立窗口
font = QtGui.QFont()
font.setPixelSize(20)
win.setWindowTitle(u'Realtime contact force')
win.resize(1600, 1200) # 小窗口大小
# draw a dynamic picture
t = [0]
t_now = [0]
Fx_data = array.array('d') # 可变的类型
Fy_data = array.array('d') # 可变的类型
Fz_data = array.array('d') # 可变的类型
fd = []
td = []
# link the ATI
ip = '172.31.1.144'
ATI = ATI_APIpy(ip)
print('Connected ATI')
labelStyle = {'color': '#FFF', 'font-size': '24pt'}
historyLength = 100 # 横坐标长度
p = win.addPlot() # 把图p加入到窗口中
p.showGrid(x=True, y=True) # 把X和Y的表格打开
p.setRange(xRange=[0, historyLength], yRange=[-20, 10], padding=0)
p.setLabel(axis='left', text='Contact force',**labelStyle) # 靠左
p.setLabel(axis='bottom', text='time',**labelStyle)
p.setTitle('Real time contact force',**labelStyle) # 表格的名字
p.addLegend() #给线加标注
curve_fx = p.plot() # 绘制一个图形
curve_fy = p.plot() # 绘制一个图形
curve_fz = p.plot() # 绘制一个图形
while True:
def plotData():
ATI.startStreaming() # 告诉ATI开始读取
ATI.recieve() # 开始从ATI接受数据
f = ATI.force()
fd.append(f)
bd = len(fd)
tmpx = fd[bd - 1][0]
tmpy = fd[bd - 1][1]
tmpz = fd[bd - 1][2]
if len(Fx_data) < historyLength:
Fx_data.append(tmpx)
Fy_data.append(tmpy)
Fz_data.append(tmpz)
else:
Fx_data[:-1] = Fx_data[1:] # 前移
Fx_data[-1] = tmpx
Fy_data[:-1] = Fy_data[1:] # 前移
Fy_data[-1] = tmpy
Fz_data[:-1] = Fz_data[1:] # 前移
Fz_data[-1] = tmpz
curve_fx.setData(Fx_data, pen=pg.mkPen('r'))#红色Fx
curve_fy.setData(Fy_data, pen=pg.mkPen('g'))#绿色Fy
curve_fz.setData(Fz_data, pen=pg.mkPen('w'))#白色Fz
timer = pg.QtCore.QTimer()
timer.timeout.connect(plotData) # 定时调用plotData函数
timer.start(50) # 多少ms调用一次
app.exec_()
怎么显示这些线的legend还没搞定。。。
另外调用的ATI_APIpy 代码如下:
import socket
import struct
import numpy as ny
from threading import Thread
from time import sleep
LOG_LEVEL = 2 # Information level
ALWAYS_LOG_LEVEL = 1 # print always
RECEIVE_FORMAT = '3I6i' # Format from the ATI sensor
COMMAND_HEADER = 0x1234 # Uint16_command_header
#Calibration
FORCE_CALIBRATION = 1000000.0
TORQUE_CALIBRATION = 1000000.0
#Uint16 command:
STOP_STREAMING = 0X0000 # Stop mode
REALTIME_STREAMING = 0x0002 # Fast (up to 7000 Hz)
REALTIME_BUFFERED_STREAMING = 0x0003 # Buffer mode
MULTI_UNIT_STREAMING = 0X0004 # Multi mode
INFINITE_STREAMING = 0x0000 #Infinite mode for realtime
STARTUP_STREAMING = 'STOP' # Mode it will start up with
class ATI_APIpy:
'''The class interface for an ATI Force/Torque sensor.
This class contains all the functions necessary to communicate
with an ATI Force/Torque sensor with a Net F/T interface
using RDT.
'''
def __init__(self, ip):
'''Start the sensor interface
This function intializes the class and opens the socket for the
sensor.
Args:
ip (str): The IP address of the Net F/T box.
'''
self.ip = ip
self.port = 49152
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.mean = [0] * 6
self.stream = False
def send(self, command, count = 0):
'''Send a given command to the Net F/T box with specified sample count.
This function sends the given RDT command to the Net F/T box, along with
the specified sample count, if needed.
Args:
command (int): The RDT command.
count (int, optional): The sample count to send. Defaults to 0.
'''
header = COMMAND_HEADER
message = struct.pack('!HHI', header, command, count)
self.sock.sendto(message, (self.ip, self.port))
def recieve(self):
'''Recieves and unpacks a response from the Net F/T box.
This function recieves and unpacks an RDT response from the Net F/T
box and saves it to the data class attribute.
Returns:
list of float: The force and torque values recieved. The first three
values are the forces recorded, and the last three are the measured
torques.
'''
rawdata = self.sock.recv(1024)
data = struct.unpack('!IIIiiiiii', rawdata)[3:]
self.data = [data[i] - self.mean[i] for i in range(6)]
return self.data
def tare(self, n = 10):
'''Tare the sensor.
This function takes a given number of readings from the sensor
and averages them. This mean is then stored and subtracted from
all future measurements.
Args:
n (int, optional): The number of samples to use in the mean.
Defaults to 10.
Returns:
list of float: The mean values calculated.
'''
self.mean = [0] * 6
self.getMeasurements(n = n)
mean = [0] * 6
for i in range(n):
self.recieve()
for i in range(6):
mean[i] += self.measurement()[i] / float(n)
self.mean = mean
return mean
def zero(self):
'''Remove the mean found with `tare` to start recieving raw sensor values.'''
self.mean = [0] * 6
def recieveHandler(self):
'''A handler to recieve and store data.'''
while self.stream:
self.recieve()
def getMeasurement(self):
'''Get a single measurement from the sensor
Request a single measurement from the sensor and return it. If
The sensor is currently streaming, started by running `startStreaming`,
then this function will simply return the most recently returned value.
Returns:
list of float: The force and torque values recieved. The first three
values are the forces recorded, and the last three are the measured
torques.
'''
self.getMeasurements(1)
self.recieve()
return self.data
def measurement(self):
'''Get the most recent force/torque measurement
Returns:
list of float: The force and torque values recieved. The first three
values are the forces recorded, and the last three are the measured
torques.
'''
return self.data
def getForce(self):
'''Get a single force measurement from the sensor
Request a single measurement from the sensor and return it.
Returns:
list of float: The force values recieved.
'''
return self.getMeasurement()[:3]
def force(self):
'''Get the most recent force measurement
Returns:
list of float: The force values recieved.
'''
return ny.asarray(self.measurement()[:3])/ FORCE_CALIBRATION
def getTorque(self):
'''Get a single torque measurement from the sensor
Request a single measurement from the sensor and return it.
Returns:
list of float: The torque values recieved.
'''
return self.getMeasurement()[3:]
def torque(self):
'''Get the most recent torque measurement
Returns:
list of float: The torque values recieved.
'''
return ny.asarray(self.measurement()[3:])/ TORQUE_CALIBRATION
def startStreaming(self, handler = True):
'''Start streaming data continuously
This function commands the Net F/T box to start sending data continuouly.
By default this also starts a new thread with a handler to save all data
points coming in. These data points can still be accessed with `meansurement`,
`force`, and `torque`. This handler can also be disabled and measurements
can be recieved manually using the `recieve` function.
Args:
handler (bool, optional): If True start the handler which saves data to be
used with `measurement`, `force`, and `torque`. If False the
measurements must be recieved manually. Defaults to True.
'''
self.getMeasurements(0)
if handler:
self.stream = True
self.thread = Thread(target = self.recieveHandler)
self.thread.daemon = True
self.thread.start()
def getMeasurements(self, n):
'''Request a given number of samples from the sensor
This function requestes a given number of samples from the sensor. These
measurements must be recieved manually using the `recieve` function.
Args:
n (int): The number of samples to request.
'''
self.send(2, count = n)
def stopStreaming(self):
'''Stop streaming data continuously
This function stops the sensor from streaming continuously as started using
`startStreaming`.
'''
self.stream = False
sleep(0.1)
self.send(0)
class Lib_b:
def D_F(self,a,b):
c =a+b
return c
主要的参考资料是:
ATI的python官方库?
https://pypi/project/NetFT/
python如何实时读取数据
https://blog.csdn/RNG_uzi_/article/details/96736432###
https://blog.csdn/yy123xiang/article/details/79304023
更多推荐
Python实现传感器信息读取和实时显示
发布评论