KepServer是一款能够方便地对plc等工业设备进行数据读取的工具。Python可使用win32com模块实现通过kepserver读取plc数据的功能。

win32com模块的安装命令:pip install pypiwin32

一、运行环境

1.安装Python:python3各版本均可,但必须是32位。这里使用的是 python-3.7.9,windows下32位。下载:

python-3.7.9-32位安装程序.rar-互联网文档类资源-CSDN下载

2.安装KepServer:各版本均可。这里使用的是Kepware.KEPServerEX.V6,下载:

KEPServerEXV6.4.rar_python读写kepware数据-其它文档类资源-CSDN下载

在KepServer中新建Channel1 -> Device1,建立tag1、tag2、tag3三个变量,连接plc后,在QC下查看数据是否能够正常读取。

3.注册opcdaauto

将opcdaauto.dll文件置于C:\Windows\System32目录下(使用64位环境时,置于SysWOW64目录下),cmd执行命令:regsvr32 C:\Windows\System32\opcdaauto.dll。下载:

注册opcdaauto.rar-互联网文档类资源-CSDN下载

二、python读取plc数据

import logging
import re
import time
from typing import List

import win32com.client
from win32com.client import gencache

logging.basicConfig(level='DEBUG')
logger = logging.getLogger('dll_dispatch')

#加载dll对象
OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
opcServer = OPC_DA_DLL.OPCServer() #获取opcServer对象
group_name_map = {}  # 存放已添加的组名
item_name_map = {}
Groups = None

def connect_opc(progID: str, node: str = '127.0.0.1'):
    opcServer.Connect(progID, node)
    logger.info('已连接到opc - [{}:{}]'.format(node, progID))
    return opcServer

def disconnect_opc(opcServer):
    logger.info('opc断开连接')
    opcServer.Disconnect()

def get_servers(opcServer) -> List[str]:
    """查询可用opc服务"""
    return opcServer.GetOPCServers()

class GroupProperty:
    # DeadBand
    IsActive: bool = True
    IsSubscribed: bool = True
    UpdateRate: int = 1000
    DefaultGroupIsActive: bool = True
    DefaultGroupDeadband: int = 0

def get_groups(opcServer, groupProperty: GroupProperty):
    global Groups
    if not Groups:
        Groups = opcServer.OPCGroups
        Groups.DefaultGroupIsActive = groupProperty.IsActive
        Groups.DefaultGroupDeadband = groupProperty.DefaultGroupDeadband
        Groups.DefaultGroupUpdateRate = groupProperty.UpdateRate
    return Groups

def get_group(opcServer, opcGroupName: str, groupProperty: GroupProperty):
    opcGroups = get_groups(opcServer, GroupProperty())
    if opcGroupName not in group_name_map:
        opcGroup = opcGroups.Add(opcGroupName)
        opcGroup.IsActive = groupProperty.IsActive
        opcGroup.UpdateRate = groupProperty.UpdateRate
        opcGroup.IsSubscribed = groupProperty.IsSubscribed

        group_name_map[opcGroupName] = opcGroup
    else:
        logger.debug('重复添加 GroupName - {}'.format(opcGroupName))

    return group_name_map[opcGroupName]


def get_items(opcServer, opcGroupName: str, groupProperty: GroupProperty):
    group = get_group(opcServer, opcGroupName, groupProperty)
    return group.OPCItems


def add_item(opcServer, itemName: str):
    """添加一个点位"""
    if itemName not in item_name_map:
        try:
            groupName = re.split(r'[.$][^.$]*?$', itemName, 1)[0]
        except IndexError:
            logger.error('点位名-{}-不符合规则'.format(itemName))
            return
        items = get_items(opcServer, groupName, GroupProperty())
        try:
            item = items.AddItem(itemName, len(item_name_map) + 1)
            # 添加失败时 item 为 None
            if not items:
                raise RuntimeError('add item[{}] return None'.format(itemName))
        except Exception as e:
            logger.error('添加点位-{}-失败! [点位可能不存在]'.format(itemName), exc_info=True)
            item_name_map[itemName] = None
        else:
            item.IsActive = True
            time.sleep(0.1)
            item_name_map[itemName] = item
    else:
        logger.debug('重复添加 itemName - {}'.format(itemName))


def add_items(opcServer, item_list: List[str]):
    """批量添加点位"""
    for item in item_list:
        add_item(opcServer, item)


def _sync_read(item):
    data = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0)
    if not data:
        raise ValueError('sync_read return None.')
    return data

def sync_read_item(opcServer, itemName: str):
    """
    同步读取一个 opc 点位
    :param opcServer:
    :param itemName:
    :return:  (数据, 数据质量, 时间)
    """
    if itemName not in item_name_map:
        add_item(opcServer, itemName)
    logger.debug('读取 - {}'.format(itemName))
    item = item_name_map[itemName]
    try:
        if not item:
            raise ValueError('点位不存')
        return _sync_read(item)
    except Exception as e:
        logger.error('采集失败: {} - {}'.format(itemName, e), exc_info=True)
        return None, 0, 0


def sync_read_items(opcServer, itemNameList: List[str]):
    result = []
    for itemName in itemNameList:
        if itemName not in item_name_map:
            add_item(opcServer, itemName)

        result.append(sync_read_item(opcServer, itemName))
    return result

if __name__ == '__main__':
    opcServer = connect_opc('Kepware.KEPServerEX.V6', '127.0.0.1')
    print(get_servers(opcServer))
    # 定义点位列表
    tag_list = ['Channel1.Device1.tag1', 'Channel1.Device1.tag2', 'Channel1.Device1.tag3']
    try:
        while True:
            data = sync_read_items(opcServer, tag_list)
            for i in data:
                print(i[0])
            time.sleep(2)
    except Exception:
        logger.error('异常退出', exc_info=True)
    finally:
        opcServer.Disconnect()

主要参考

  1. https://blog.csdn/wanzheng_96/article/details/108431097?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-6.control&dist_request_id=1332042.24142.16193357577789097&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-6.control

更多推荐

python通过KepServer读取plc数据