对象:ATI
语言:Python
方法:stock
目的与ATI通讯实时读取传感器信息并用将其可视化
直接上代码

import array
import pyqtgraph as pg
from ATI_APIpy import ATI_APIpy

if __name__=='__main__':
## test start
    app = pg.mkQApp()#建立app
    win = pg.GraphicsWindow()#建立窗口
    win.setWindowTitle(u'pyqtgraph逐点画波形图')
    win.resize(1600, 1200)#小窗口大小
# draw a dynamic picture
    t=[0]
    t_now=[0]
    Fx_data = array.array('d')
    fd = []
    td = []

# link the ATI
    ip='172.31.1.144'
    ATI=ATI_APIpy(ip)
    print('Connected ATI')
    historyLength = 100#横坐标长度
    p = win.addPlot()#把图p加入到窗口中
    p.showGrid(x=True, y=True)#把X和Y的表格打开
    p.setRange(xRange=[0,historyLength], yRange=[-1.2, 1.2], padding=0)
    p.setLabel(axis='left', text='Contact force')#靠左
    p.setLabel(axis='bottom', text='time')
    p.setTitle('Real time contact force')#表格的名字
    curve = p.plot()#绘制一个图形
    while True:
        def plotData():
            ATI.startStreaming()  # 告诉ATI开始读取
            ATI.recieve()  # 开始从ATI接受数据
            f = ATI.force()
            fd.append(f)
            bd=len(fd)
            tmp = fd[bd-1][0]
            if len(Fx_data) < historyLength:
                Fx_data.append(tmp)
            else:
                Fx_data[:-1] = Fx_data[1:]  # 前移
                Fx_data[-1] = tmp
            curve.setData(Fx_data)

        timer = pg.QtCore.QTimer()
        timer.timeout.connect(plotData)  # 定时调用plotData函数
        timer.start(50)  # 多少ms调用一次
        app.exec_()
        


读取三维信息,而且同时在一个图框显示的代码:

import array

import pyqtgraph as pg
from ATI_APIpy import ATI_APIpy
from pyqtgraph.Qt import QtGui

if __name__ == '__main__':
    ## test start
    app = pg.mkQApp()  # 建立app
    win = pg.GraphicsWindow()  # 建立窗口
    font = QtGui.QFont()
    font.setPixelSize(20)
    win.setWindowTitle(u'Realtime contact force')
    win.resize(1600, 1200)  # 小窗口大小
    # draw a dynamic picture
    t = [0]
    t_now = [0]
    Fx_data = array.array('d')  # 可变的类型
    Fy_data = array.array('d')  # 可变的类型
    Fz_data = array.array('d')  # 可变的类型
    fd = []
    td = []

    # link the ATI
    ip = '172.31.1.144'
    ATI = ATI_APIpy(ip)
    print('Connected ATI')
    labelStyle = {'color': '#FFF', 'font-size': '24pt'}
    historyLength = 100  # 横坐标长度
    p = win.addPlot()  # 把图p加入到窗口中
    p.showGrid(x=True, y=True)  # 把X和Y的表格打开
    p.setRange(xRange=[0, historyLength], yRange=[-20, 10], padding=0)
    p.setLabel(axis='left', text='Contact force',**labelStyle)  # 靠左
    p.setLabel(axis='bottom', text='time',**labelStyle)
    p.setTitle('Real time contact force',**labelStyle)  # 表格的名字
    p.addLegend() #给线加标注
    curve_fx = p.plot()  # 绘制一个图形
    curve_fy = p.plot()  # 绘制一个图形
    curve_fz = p.plot()  # 绘制一个图形

    while True:
        def plotData():
            ATI.startStreaming()  # 告诉ATI开始读取
            ATI.recieve()  # 开始从ATI接受数据
            f = ATI.force()
            fd.append(f)
            bd = len(fd)
            tmpx = fd[bd - 1][0]
            tmpy = fd[bd - 1][1]
            tmpz = fd[bd - 1][2]
            if len(Fx_data) < historyLength:
                Fx_data.append(tmpx)
                Fy_data.append(tmpy)
                Fz_data.append(tmpz)
            else:
                Fx_data[:-1] = Fx_data[1:]  # 前移
                Fx_data[-1] = tmpx
                Fy_data[:-1] = Fy_data[1:]  # 前移
                Fy_data[-1] = tmpy
                Fz_data[:-1] = Fz_data[1:]  # 前移
                Fz_data[-1] = tmpz
            curve_fx.setData(Fx_data, pen=pg.mkPen('r'))#红色Fx
            curve_fy.setData(Fy_data, pen=pg.mkPen('g'))#绿色Fy
            curve_fz.setData(Fz_data, pen=pg.mkPen('w'))#白色Fz



        timer = pg.QtCore.QTimer()
        timer.timeout.connect(plotData)  # 定时调用plotData函数
        timer.start(50)  # 多少ms调用一次
        app.exec_()


怎么显示这些线的legend还没搞定。。。
另外调用的ATI_APIpy 代码如下:

import socket
import struct
import numpy as ny
from threading import Thread
from time import sleep

LOG_LEVEL = 2 # Information level
ALWAYS_LOG_LEVEL = 1 # print always
RECEIVE_FORMAT = '3I6i' # Format from the ATI sensor
COMMAND_HEADER = 0x1234 # Uint16_command_header
#Calibration
FORCE_CALIBRATION = 1000000.0
TORQUE_CALIBRATION = 1000000.0
#Uint16 command:
STOP_STREAMING = 0X0000 # Stop mode
REALTIME_STREAMING = 0x0002 # Fast (up to 7000 Hz)
REALTIME_BUFFERED_STREAMING = 0x0003 # Buffer mode
MULTI_UNIT_STREAMING = 0X0004 # Multi mode
INFINITE_STREAMING = 0x0000 #Infinite mode for realtime
STARTUP_STREAMING = 'STOP' # Mode it will start up with



class ATI_APIpy:
	'''The class interface for an ATI Force/Torque sensor.

	This class contains all the functions necessary to communicate
	with an ATI Force/Torque sensor with a Net F/T interface
	using RDT.
	'''
	def __init__(self, ip):
		'''Start the sensor interface

		This function intializes the class and opens the socket for the
		sensor.

		Args:
			ip (str): The IP address of the Net F/T box.
		'''
		self.ip = ip
		self.port = 49152
		self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
		self.mean = [0] * 6
		self.stream = False

	def send(self, command, count = 0):
		'''Send a given command to the Net F/T box with specified sample count.

		This function sends the given RDT command to the Net F/T box, along with
		the specified sample count, if needed.

		Args:
			command (int): The RDT command.
			count (int, optional): The sample count to send. Defaults to 0.
		'''
		header = COMMAND_HEADER
		message = struct.pack('!HHI', header, command, count)
		self.sock.sendto(message, (self.ip, self.port))

	def recieve(self):
		'''Recieves and unpacks a response from the Net F/T box.

		This function recieves and unpacks an RDT response from the Net F/T
		box and saves it to the data class attribute.

		Returns:
			list of float: The force and torque values recieved. The first three
				values are the forces recorded, and the last three are the measured
				torques.
		'''
		rawdata = self.sock.recv(1024)
		data = struct.unpack('!IIIiiiiii', rawdata)[3:]
		self.data = [data[i] - self.mean[i]  for i in range(6)]
		return self.data

	def tare(self, n = 10):
		'''Tare the sensor.

		This function takes a given number of readings from the sensor
		and averages them. This mean is then stored and subtracted from
		all future measurements.

		Args:
			n (int, optional): The number of samples to use in the mean.
				Defaults to 10.

		Returns:
			list of float: The mean values calculated.
		'''
		self.mean = [0] * 6
		self.getMeasurements(n = n)
		mean = [0] * 6
		for i in range(n):
			self.recieve()
			for i in range(6):
				mean[i] += self.measurement()[i] / float(n)
		self.mean = mean
		return mean

	def zero(self):
		'''Remove the mean found with `tare` to start recieving raw sensor values.'''
		self.mean = [0] * 6

	def recieveHandler(self):
		'''A handler to recieve and store data.'''
		while self.stream:
			self.recieve()

	def getMeasurement(self):
		'''Get a single measurement from the sensor

		Request a single measurement from the sensor and return it. If
		The sensor is currently streaming, started by running `startStreaming`,
		then this function will simply return the most recently returned value.

		Returns:
			list of float: The force and torque values recieved. The first three
				values are the forces recorded, and the last three are the measured
				torques.
		'''
		self.getMeasurements(1)
		self.recieve()
		return self.data

	def measurement(self):
		'''Get the most recent force/torque measurement

		Returns:
			list of float: The force and torque values recieved. The first three
				values are the forces recorded, and the last three are the measured
				torques.
		'''
		return self.data

	def getForce(self):
		'''Get a single force measurement from the sensor

		Request a single measurement from the sensor and return it.

		Returns:
			list of float: The force values recieved.
		'''
		return self.getMeasurement()[:3]

	def force(self):
		'''Get the most recent force measurement

		Returns:
			list of float: The force values recieved.
		'''
		return ny.asarray(self.measurement()[:3])/ FORCE_CALIBRATION

	def getTorque(self):
		'''Get a single torque measurement from the sensor

		Request a single measurement from the sensor and return it.

		Returns:
			list of float: The torque values recieved.
		'''
		return self.getMeasurement()[3:]

	def torque(self):
		'''Get the most recent torque measurement

		Returns:
			list of float: The torque values recieved.
		'''

		return ny.asarray(self.measurement()[3:])/ TORQUE_CALIBRATION

	def startStreaming(self, handler = True):
		'''Start streaming data continuously

		This function commands the Net F/T box to start sending data continuouly.
		By default this also starts a new thread with a handler to save all data
		points coming in. These data points can still be accessed with `meansurement`,
		`force`, and `torque`. This handler can also be disabled and measurements
		can be recieved manually using the `recieve` function.

		Args:
			handler (bool, optional): If True start the handler which saves data to be
				used with `measurement`, `force`, and `torque`. If False the
				measurements must be recieved manually. Defaults to True.
		'''
		self.getMeasurements(0)
		if handler:
			self.stream = True
			self.thread = Thread(target = self.recieveHandler)
			self.thread.daemon = True
			self.thread.start()

	def getMeasurements(self, n):
		'''Request a given number of samples from the sensor

		This function requestes a given number of samples from the sensor. These
		measurements must be recieved manually using the `recieve` function.

		Args:
			n (int): The number of samples to request.
		'''
		self.send(2, count = n)

	def stopStreaming(self):
		'''Stop streaming data continuously

		This function stops the sensor from streaming continuously as started using
		`startStreaming`.
		'''
		self.stream = False
		sleep(0.1)
		self.send(0)

class Lib_b:


		def D_F(self,a,b):
				c =a+b
				return c

主要的参考资料是:
ATI的python官方库?
https://pypi/project/NetFT/
python如何实时读取数据
https://blog.csdn/RNG_uzi_/article/details/96736432###
https://blog.csdn/yy123xiang/article/details/79304023

更多推荐

Python实现传感器信息读取和实时显示