TechBlog
首页分类标签搜索关于

© 2025 TechBlog. All rights reserved.

python使用opcua的订阅和mqtt

11/23/2025

python使用opcua的订阅和mqtt

Python实现OPC UA订阅与MQTT集成

1:写法一,最简单的写法

# python
from datetime import datetime
import json
import os
import string
import time
import logging
import threading
from logging.handlers import TimedRotatingFileHandler
import random

from opcua import Client
import paho.mqtt.client as mqtt


num = string.ascii_letters + string.digits
client_id = "".join(random.sample(num, 10))  # 随机生成一串数字字母,防止mqtt登录id冲突
# mqtt服务器地址
HOST = "127.0.0.1"
PORT = 1883
username = "admin"
password = "public"
pub_topic = "hdx/pub123_fk"
heartbeat_topic = "hdx/heartbeat_QX"
state_mqtt = False
state_opcua = False

# 全局共享结构与锁
out_data = {}
out_lock = threading.Lock()

SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]


class MyLogFilter(logging.Filter):
    def filter(self, record):
        # 只允许特定名称的日志记录器通过
        return record.name.startswith(('main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt'))


def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)

    root = logging.getLogger()
    root.setLevel(level)
    # root.setLevel(logging.WARNING)
    # logging.getLogger('mqtt').setLevel(logging.INFO)

    # logging.getLogger('opcua').setLevel(logging.WARNING)
    # logging.getLogger('paho').setLevel(logging.WARNING)
    # logging.getLogger('urllib3').setLevel(logging.WARNING)

    abs_log_path = os.path.abspath(log_path)
    if not any(isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path for h in
               root.handlers):
        root.addHandler(file_handler)
        # # 只为新添加的文件处理器添加过滤器
        # file_handler.addFilter(MyLogFilter())

    console_level = console_level if console_level is not None else level
    existing_console = None
    for h in root.handlers:
        if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr):
            existing_console = h
            break

    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
        # # 只为新添加的控制台处理器添加过滤器
        # console.addFilter(MyLogFilter())
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
        # # 只为现有控制台处理器添加过滤器
        # existing_console.addFilter(MyLogFilter())

# 连接mqtt服务器成功回调
def mqtt_on_connect(client, userdata, flags, rc):
    global state_mqtt
    logger = logging.getLogger("mqtt")
    if rc == 0:
        state_mqtt = True
        logger.info("连接 MQTT 成功")
       	# 在这里添加订阅主题的代码
        '''
        try:
            # 订阅单个主题
            client.subscribe("your/topic/name", qos=0)
            # 或者订阅多个主题
            # client.subscribe([("topic1", 0), ("topic2", 1)])
            logger.info("订阅主题成功")
        except Exception as e:
            logger.exception("订阅主题失败: %s", e)
        '''
    else:
        state_mqtt = False
        logger.warning("MQTT 连接返回码 %s", rc)


# 订阅成功回调
def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    # print("订阅成功: qos = %d" % granted_qos)
    print("消息订阅成功!")

def mqtt_on_message(client, userdata, msg):
    logger = logging.getLogger("mqtt")
    try:
        logger.info("收到消息: topic=%s, qos=%s, payload=%s", 
                   msg.topic, msg.qos, msg.payload.decode('utf-8'))
        # 处理接收到的消息,这是业务自己写方法
        #handle_received_message(msg.topic, msg.payload)
    except Exception as e:
        logger.exception("处理消息异常: %s", e)

# 服务器断开回调
def mqtt_on_disconnect(client, userdata, rc):
    global state_mqtt
    state_mqtt = False
    logging.getLogger("mqtt").warning("MQTT 断开 rc=%s", rc)



def connect_timer():
    try:
        if state_mqtt:
            client.publish(heartbeat_topic, payload=json.dumps({
                "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                "name": str(heartbeat_topic.split('_')[-1]),
                "heartbeat_plc": state_opcua
            }), qos=0)  # 发布消息
    except Exception as e:
        print("心跳发送异常。", e)

    timer = threading.Timer(5, connect_timer)  # 设置一个5s 的定时器,循环发送通讯心跳。
    timer.start()  # 启动线程


class SubHandler(object):
    def __init__(self, lock):
        self.lock = lock
        self.logger = logging.getLogger("SubHandler")

    def datachange_notification(self, node, value, event_data):
        try:
            node_id = node.nodeid.to_string() if hasattr(node, "nodeid") else str(node)
        except Exception:
            node_id = str(node)
        self.logger.warning("节点【%s】, 值【%s】, 时间【%s】", node_id, value, time.strftime('%Y-%m-%d %H:%M:%S'))
        with self.lock:
            out_data[node_id] = value


def opcua_client(server_url=SERVER_URL, node_list=NODE_LIST):
    logger = logging.getLogger("opcua_client")
    handler = SubHandler(out_lock)
    subscription = None
    subscription_handle_list = []
    client = Client(url=server_url)
    global state_opcua
    try:
        while True:
            # 连接
            try:
                logger.info("connecting to %s...", server_url)
                client.connect()
                client.load_type_definitions()
                state_opcua = True
                logger.info("connected")
            except Exception as e:
                logger.exception("connection error, retry in 5s: %s", e)
                time.sleep(5)
                state_opcua = False
                continue

            # 构建订阅节点列表
            try:
                root = client.get_root_node()
                project_main = root.get_child(["0:Objects"])
                nodes_groups = []
                for node in node_list:
                    try:
                        children = project_main.get_child([node]).get_children()
                        nodes_groups.append(children)
                        # 初始化输出字典(可根据需求调整)
                        with out_lock:
                            out_data[str(node.split(':')[-1])] = {}
                    except Exception as e:
                        logger.exception("获取节点 %s 失败: %s", node, e)

                # 扁平化节点列表并订阅
                nodes_to_subscribe = [n for grp in nodes_groups for n in grp]
                if nodes_to_subscribe:
                    subscription = client.create_subscription(200, handler)
                    subscription_handle_list = []
                    for group in nodes_groups:
                        # subscribe_data_change 接受节点列表
                        handle = subscription.subscribe_data_change(group)
                        subscription_handle_list.append(handle)
                    logger.info("订阅句柄: %s", subscription_handle_list)
                else:
                    logger.warning("没有可订阅的节点")
            except Exception as e:
                logger.exception("subscription error: %s", e)
                # 出错时短暂等待并重试连接循环
                time.sleep(1)
                # 清理并继续循环,使客户端重新连接
                try:
                    if subscription:
                        subscription.delete()
                except Exception:
                    pass
                client.disconnect()
                time.sleep(1)
                continue

            # 运行监控 service level
            try:
                while True:
                    try:
                        service_level = client.get_node("ns=0;i=2267").get_value()
                        logger.debug("service level: %s", service_level)
                        if service_level < 200:
                            logger.warning("service level low (%s), reconnecting", service_level)
                            break  # 跳出到外层重连逻辑
                    except Exception as e:
                        state_opcua = False
                        logger.exception("读取 service level 失败: %s", e)
                        break
                    time.sleep(5)
            finally:
                # 取消订阅并断开连接(确保执行)
                try:
                    if subscription and subscription_handle_list:
                        for h in subscription_handle_list:
                            try:
                                subscription.unsubscribe(h)
                            except Exception:
                                logger.exception("unsubscribe handle %s 失败", h)
                        subscription.delete()
                        logger.info("unsubscribed")
                except Exception:
                    logger.exception("删除 subscription 失败")
                try:
                    client.disconnect()
                    logger.info("disconnected")
                except Exception:
                    logger.exception("disconnect error")
                # 清理本次状态,短暂等待后重试
                subscription = None
                subscription_handle_list = []
                time.sleep(5)
    except KeyboardInterrupt:
        logger.info("用户中断,退出")
    finally:
        try:
            client.disconnect()
        except Exception:
            pass





class PublisherWorker(threading.Thread):
    def __init__(self, threadID, name, counter):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.counter = counter
        self.out_data = {}

    def run(self):
        while True:
            if state_mqtt:
                if len(out_data):
                    for _ in list(out_data.keys()):
                        msg_1 = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(_),
                            "data": out_data[_]
                        })
                        try:
                            client.publish(pub_topic, payload=msg_1, qos=0)  # 发布消息
                            # print("发布", msg_1)
                        except KeyboardInterrupt:
                            print("EXIT")
                            # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。
                            client.disconnect()
                        except Exception as e1:
                            print("mqtt推送异常", e1)
            time.sleep(0.5)  # 间隔0.5秒推送


if __name__ == "__main__":
    setup_logging(log_dir='logs', filename='Run_CX.log')

    # 连接的id(key)
    client = mqtt.Client(
        client_id=client_id,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION1  # 或 VERSION2
    )
    # 连接用的用户名密码
    client.username_pw_set(username, password)
    # 回调函数
    client.on_connect = mqtt_on_connect
    # client.on_message = on_message
    # client.on_subscribe = on_subscribe
    client.on_disconnect = mqtt_on_disconnect
    # client.reconnect_delay_set(min_delay=1, max_delay=120)  设置掉线重连次数 默认max_delay=120
    # 开始连接mqtt服务器
    client.connect(host=HOST, port=PORT, keepalive=60)
    client.loop_start()  # 相对于client.loop_forever() ,它不会阻塞进程
    connect_timer()  # 通讯心跳
    # 创建新线程
    publisherWorker = PublisherWorker(1, "publisher_worker-Thread", 1)
    publisherWorker.start()
    opcua_client()


2:写法二,线程管理

# python
import os
import sys
import time
import json
import random
import string
import logging
import threading
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
import queue

from opcua import Client
import paho.mqtt.client as mqtt

# 配置
HOST = "127.0.0.1"
PORT = 1883
USERNAME = "admin"
PASSWORD = "public"
PUB_TOPIC = "hdx/pub123_fk"
HEARTBEAT_TOPIC = "hdx/heartbeat_QX"

SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]

CLIENT_ID = "".join(random.sample(string.ascii_letters + string.digits, 10))

# 共享状态
out_data = {}  # nodeid -> latest value
out_lock = threading.Lock()
state_opcua = False
state_mqtt = False

stop_event = threading.Event()


class MyLogFilter(logging.Filter):
    def filter(self, record):
        # 只允许特定名称的日志记录器通过
        return record.name.startswith(('main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt'))


def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)

    root = logging.getLogger()
    root.setLevel(level)
    # root.setLevel(logging.WARNING)
    # logging.getLogger('mqtt').setLevel(logging.INFO)

    # logging.getLogger('opcua').setLevel(logging.WARNING)
    # logging.getLogger('paho').setLevel(logging.WARNING)
    # logging.getLogger('urllib3').setLevel(logging.WARNING)

    abs_log_path = os.path.abspath(log_path)
    if not any(isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path for h in
               root.handlers):
        root.addHandler(file_handler)
        # # 只为新添加的文件处理器添加过滤器
        # file_handler.addFilter(MyLogFilter())

    console_level = console_level if console_level is not None else level
    existing_console = None
    for h in root.handlers:
        if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr):
            existing_console = h
            break

    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
        # # 只为新添加的控制台处理器添加过滤器
        # console.addFilter(MyLogFilter())
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
        # # 只为现有控制台处理器添加过滤器
        # existing_console.addFilter(MyLogFilter())


class SubHandler(object):
    def __init__(self, lock):
        self.lock = lock
        self.logger = logging.getLogger("SubHandler")

    def datachange_notification(self, node, value, event_data):
        try:
            node_id = node.nodeid.to_string() if hasattr(node, "nodeid") else str(node)
        except Exception:
            node_id = str(node)
        self.logger.debug("datachange %s -> %s", node_id, value)
        with self.lock:
            out_data[node_id] = value


def opcua_worker(server_url, node_list, stop_evt):
    logger = logging.getLogger("opcua_worker")
    handler = SubHandler(out_lock)
    client = None
    global state_opcua

    while not stop_evt.is_set():
        try:
            client = Client(url=server_url)
            logger.info("Connecting to OPC UA %s", server_url)
            client.connect()
            client.load_type_definitions()
            state_opcua = True
            logger.info("OPC UA connected")

            # 找到节点并订阅
            try:
                root = client.get_root_node()
                project_main = root.get_child(["0:Objects"])
            except Exception:
                project_main = None
                logger.exception("获取根节点失败")

            nodes_groups = []
            for n in node_list:
                try:
                    if project_main is None:
                        raise RuntimeError("project_main None")
                    children = project_main.get_child([n]).get_children()
                    nodes_groups.append(children)
                    with out_lock:
                        out_data[str(n.split(':')[-1])] = {}  # 初始化子结构(可选)
                except Exception:
                    logger.exception("无法获取节点 %s 的子项", n)

            if not any(nodes_groups):
                logger.warning("没有找到可订阅的节点, 等待后重试")
                client.disconnect()
                state_opcua = False
                time.sleep(5)
                continue

            subscription = client.create_subscription(200, handler)
            handles = []
            try:
                for group in nodes_groups:
                    if group:
                        h = subscription.subscribe_data_change(group)
                        handles.append(h)
                logger.info("Subscribed handles: %s", handles)
            except Exception:
                logger.exception("订阅节点失败")

            # 运行检查 loop
            while not stop_evt.is_set():
                try:
                    service_level = client.get_node("ns=0;i=2267").get_value()
                    logger.debug("service level: %s", service_level)
                    if service_level < 200:
                        logger.warning("Service level low: %s, 将重连", service_level)
                        break
                except Exception:
                    state_opcua = False
                    logger.exception("读取 service level 失败,准备重连")
                    break
                time.sleep(5)

        except Exception:
            logger.exception("OPC UA 主循环异常,短暂等待后重试")
            state_opcua = False
            time.sleep(5)
        finally:
            # 清理订阅和连接
            try:
                if 'subscription' in locals() and subscription:
                    if handles:
                        for h in handles:
                            try:
                                subscription.unsubscribe(h)
                            except Exception:
                                logger.exception("取消订阅句柄 %s 失败", h)
                    try:
                        subscription.delete()
                    except Exception:
                        logger.exception("删除 subscription 失败")
                if client:
                    client.disconnect()
                    logger.info("OPC UA disconnected")
            except Exception:
                logger.exception("断开或清理失败")
            state_opcua = False
            # 小等候以免 tight loop
            time.sleep(1)

    logger.info("opcua_worker 退出")


def mqtt_on_connect(client, userdata, flags, rc):
    global state_mqtt
    logger = logging.getLogger("mqtt")
    if rc == 0:
        state_mqtt = True
        logger.info("连接 MQTT 成功")
        # 在这里添加订阅主题的代码
        '''
        try:
            # 订阅单个主题
            client.subscribe("your/topic/name", qos=0)
            # 或者订阅多个主题
            # client.subscribe([("topic1", 0), ("topic2", 1)])
            logger.info("订阅主题成功")
        except Exception as e:
            logger.exception("订阅主题失败: %s", e)
        '''
    else:
        state_mqtt = False
        logger.warning("MQTT 连接返回码 %s", rc)

def mqtt_on_message(client, userdata, msg):
    logger = logging.getLogger("mqtt")
    try:
        logger.info("收到消息: topic=%s, qos=%s, payload=%s", 
                   msg.topic, msg.qos, msg.payload.decode('utf-8'))
        # 处理接收到的消息,这是业务自己写方法
        #handle_received_message(msg.topic, msg.payload)
    except Exception as e:
        logger.exception("处理消息异常: %s", e)

def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    logger = logging.getLogger("mqtt")
    logger.info("订阅成功: mid=%s, qos=%s", mid, granted_qos)

def mqtt_on_disconnect(client, userdata, rc):
    global state_mqtt
    state_mqtt = False
    logging.getLogger("mqtt").warning("MQTT 断开 rc=%s", rc)


def heartbeat_worker(mqtt_client, stop_evt, interval=5):
    logger = logging.getLogger("heartbeat")
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                payload = json.dumps({
                    "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                    "name": HEARTBEAT_TOPIC.split('_')[-1],
                    "heartbeat_plc": state_opcua
                })
                mqtt_client.publish(HEARTBEAT_TOPIC, payload=payload, qos=0)
                logger.debug("Heartbeat published")
        except Exception:
            logger.exception("心跳发送异常")
        stop_evt.wait(interval)
    logger.info("heartbeat_worker 退出")


def publisher_worker(mqtt_client, stop_evt, poll_interval=0.5):
    logger = logging.getLogger("publisher")
    last_sent = {}
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                with out_lock:
                    # 拷贝并清理可选:这里只对有变更的推送
                    current = dict(out_data)
                for key, val in current.items():
                    if last_sent.get(key) != val:
                        msg = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(key),
                            "data": val
                        })
                        try:
                            mqtt_client.publish(PUB_TOPIC, payload=msg, qos=0)
                            logger.debug("Published %s -> %s", key, msg)
                            last_sent[key] = val
                        except Exception:
                            logger.exception("MQTT 发布失败")
        except Exception:
            logger.exception("publisher_worker 异常")
        stop_evt.wait(poll_interval)
    logger.info("publisher_worker 退出")


def main():
    setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=logging.DEBUG)
    log = logging.getLogger("main")

    # 创建 MQTT client
    mqtt_client = mqtt.Client(client_id=CLIENT_ID, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
    mqtt_client.username_pw_set(USERNAME, PASSWORD)
    mqtt_client.on_connect = mqtt_on_connect
    
    mqtt_client.on_disconnect = mqtt_on_disconnect
    try:
        mqtt_client.connect(HOST, PORT, keepalive=60)
    except Exception:
        log.exception("MQTT 初次连接失败,仍将启动并自动重试")
    mqtt_client.loop_start()

    # 启动线程
    threads = []

    opcua_thread = threading.Thread(target=opcua_worker, name="OPCUA-Thread", args=(SERVER_URL, NODE_LIST, stop_event),
                                    daemon=True)
    threads.append(opcua_thread)
    opcua_thread.start()

    hb_thread = threading.Thread(target=heartbeat_worker, name="Heartbeat-Thread", args=(mqtt_client, stop_event),
                                 daemon=True)
    threads.append(hb_thread)
    hb_thread.start()

    pub_thread = threading.Thread(target=publisher_worker, name="Publisher-Thread", args=(mqtt_client, stop_event),
                                  daemon=True)
    threads.append(pub_thread)
    pub_thread.start()

    try:
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        log.info("收到中断信号,准备退出")
        stop_event.set()
    finally:
        # 停止 MQTT loop 并断开
        try:
            mqtt_client.loop_stop()
            mqtt_client.disconnect()
        except Exception:
            log.exception("MQTT 清理异常")
        # 等待线程退出
        for t in threads:
            t.join(timeout=3)
        log.info("进程退出完毕")


if __name__ == "__main__":
    main()


写法三,线程池的管理方式

# python
import os
import sys
import time
import json
import random
import string
import logging
import threading
from datetime import datetime
from logging.handlers import TimedRotatingFileHandler
import queue
from concurrent.futures import ThreadPoolExecutor

from opcua import Client
import paho.mqtt.client as mqtt

# 配置
HOST = "127.0.0.1"
PORT = 1883
USERNAME = "admin"
PASSWORD = "public"
PUB_TOPIC = "hdx/pub123_fk"
HEARTBEAT_TOPIC = "hdx/heartbeat_QX"

SERVER_URL = "opc.tcp://HDX-XG:53530/OPCUA/SimulationServer"
NODE_LIST = ["3:Simulation", "3:Simulation1", "3:Simulation2"]

CLIENT_ID = "".join(random.sample(string.ascii_letters + string.digits, 10))

# 共享状态
out_data = {}  # nodeid -> latest value
out_lock = threading.Lock()
state_opcua = False
state_mqtt = False

stop_event = threading.Event()


class MyLogFilter(logging.Filter):
    def filter(self, record):
        # 只允许特定名称的日志记录器通过
        return record.name.startswith(('main', 'opcua_worker', 'heartbeat', 'publisher', 'SubHandler', 'mqtt'))


def setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=None):
    os.makedirs(log_dir, exist_ok=True)
    log_path = os.path.join(log_dir, filename)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(threadName)s - %(message)s')

    file_handler = TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30, encoding='utf-8')
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)

    root = logging.getLogger()
    root.setLevel(level)
    # root.setLevel(logging.WARNING)
    # logging.getLogger('mqtt').setLevel(logging.INFO)

    # logging.getLogger('opcua').setLevel(logging.WARNING)
    # logging.getLogger('paho').setLevel(logging.WARNING)
    # logging.getLogger('urllib3').setLevel(logging.WARNING)

    abs_log_path = os.path.abspath(log_path)
    if not any(isinstance(h, TimedRotatingFileHandler) and getattr(h, "baseFilename", None) == abs_log_path for h in
               root.handlers):
        root.addHandler(file_handler)
        # # 只为新添加的文件处理器添加过滤器
        # file_handler.addFilter(MyLogFilter())

    console_level = console_level if console_level is not None else level
    existing_console = None
    for h in root.handlers:
        if isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) in (sys.stdout, sys.stderr):
            existing_console = h
            break

    if existing_console is None:
        console = logging.StreamHandler(sys.stdout)
        console.setFormatter(formatter)
        console.setLevel(console_level)
        root.addHandler(console)
        # # 只为新添加的控制台处理器添加过滤器
        # console.addFilter(MyLogFilter())
    else:
        existing_console.setFormatter(formatter)
        existing_console.setLevel(console_level)
        # # 只为现有控制台处理器添加过滤器
        # existing_console.addFilter(MyLogFilter())


class SubHandler(object):
    def __init__(self, lock):
        self.lock = lock
        self.logger = logging.getLogger("SubHandler")

    def datachange_notification(self, node, value, event_data):
        try:
            node_id = node.nodeid.to_string() if hasattr(node, "nodeid") else str(node)
        except Exception:
            node_id = str(node)
        self.logger.debug("datachange %s -> %s", node_id, value)
        with self.lock:
            out_data[node_id] = value


def opcua_worker(server_url, node_list, stop_evt):
    logger = logging.getLogger("opcua_worker")
    handler = SubHandler(out_lock)
    client = None
    global state_opcua

    while not stop_evt.is_set():
        try:
            client = Client(url=server_url)
            logger.info("Connecting to OPC UA %s", server_url)
            client.connect()
            client.load_type_definitions()
            state_opcua = True
            logger.info("OPC UA connected")

            # 找到节点并订阅
            try:
                root = client.get_root_node()
                project_main = root.get_child(["0:Objects"])
            except Exception:
                project_main = None
                logger.exception("获取根节点失败")

            nodes_groups = []
            for n in node_list:
                try:
                    if project_main is None:
                        raise RuntimeError("project_main None")
                    children = project_main.get_child([n]).get_children()
                    nodes_groups.append(children)
                    with out_lock:
                        out_data[str(n.split(':')[-1])] = {}  # 初始化子结构(可选)
                except Exception:
                    logger.exception("无法获取节点 %s 的子项", n)

            if not any(nodes_groups):
                logger.warning("没有找到可订阅的节点, 等待后重试")
                client.disconnect()
                state_opcua = False
                time.sleep(5)
                continue

            subscription = client.create_subscription(200, handler)
            handles = []
            try:
                for group in nodes_groups:
                    if group:
                        h = subscription.subscribe_data_change(group)
                        handles.append(h)
                logger.info("Subscribed handles: %s", handles)
            except Exception:
                logger.exception("订阅节点失败")

            # 运行检查 loop
            while not stop_evt.is_set():
                try:
                    service_level = client.get_node("ns=0;i=2267").get_value()
                    logger.debug("service level: %s", service_level)
                    if service_level < 200:
                        logger.warning("Service level low: %s, 将重连", service_level)
                        break
                except Exception:
                    state_opcua = False
                    logger.exception("读取 service level 失败,准备重连")
                    break
                time.sleep(5)

        except Exception:
            logger.exception("OPC UA 主循环异常,短暂等待后重试")
            state_opcua = False
            time.sleep(5)
        finally:
            # 清理订阅和连接
            try:
                if 'subscription' in locals() and subscription:
                    if handles:
                        for h in handles:
                            try:
                                subscription.unsubscribe(h)
                            except Exception:
                                logger.exception("取消订阅句柄 %s 失败", h)
                    try:
                        subscription.delete()
                    except Exception:
                        logger.exception("删除 subscription 失败")
                if client:
                    client.disconnect()
                    logger.info("OPC UA disconnected")
            except Exception:
                logger.exception("断开或清理失败")
            state_opcua = False
            # 小等候以免 tight loop
            time.sleep(1)

    logger.info("opcua_worker 退出")


def mqtt_on_connect(client, userdata, flags, rc):
    global state_mqtt
    logger = logging.getLogger("mqtt")
    if rc == 0:
        state_mqtt = True
        logger.info("连接 MQTT 成功")
        # 在这里添加订阅主题的代码
        try:
            # 订阅单个主题
            client.subscribe("testTopic", qos=0)
            # 或者订阅多个主题
            # client.subscribe([("topic1", 0), ("topic2", 1)])
            logger.info("订阅主题成功")
        except Exception as e:
            logger.exception("订阅主题失败: %s", e)
    else:
        state_mqtt = False
        logger.warning("MQTT 连接返回码 %s", rc)


def mqtt_on_message(client, userdata, msg):
    logger = logging.getLogger("mqtt")
    try:
        logger.info("收到消息: topic=%s, qos=%s, payload=%s",
                    msg.topic, msg.qos, msg.payload.decode('utf-8'))
        # 处理接收到的消息
        # handle_received_message(msg.topic, msg.payload)
    except Exception as e:
        logger.exception("处理消息异常: %s", e)


def mqtt_on_subscribe(client, userdata, mid, granted_qos):
    logger = logging.getLogger("mqtt")
    logger.info("订阅成功: mid=%s, qos=%s", mid, granted_qos)


def mqtt_on_disconnect(client, userdata, rc):
    global state_mqtt
    state_mqtt = False
    logging.getLogger("mqtt").warning("MQTT 断开 rc=%s", rc)


def heartbeat_worker(mqtt_client, stop_evt, interval=5):
    logger = logging.getLogger("heartbeat")
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                payload = json.dumps({
                    "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                    "name": HEARTBEAT_TOPIC.split('_')[-1],
                    "heartbeat_plc": state_opcua
                })
                mqtt_client.publish(HEARTBEAT_TOPIC, payload=payload, qos=0)
                logger.debug("Heartbeat published")
        except Exception:
            logger.exception("心跳发送异常")
        stop_evt.wait(interval)
    logger.info("heartbeat_worker 退出")


def publisher_worker(mqtt_client, stop_evt, poll_interval=0.5):
    logger = logging.getLogger("publisher")
    last_sent = {}
    while not stop_evt.is_set():
        try:
            if state_mqtt:
                with out_lock:
                    # 拷贝并清理可选:这里只对有变更的推送
                    current = dict(out_data)
                for key, val in current.items():
                    if last_sent.get(key) != val:
                        msg = json.dumps({
                            "time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                            "name": str(key),
                            "data": val
                        })
                        try:
                            mqtt_client.publish(PUB_TOPIC, payload=msg, qos=0)
                            logger.debug("Published %s -> %s", key, msg)
                            last_sent[key] = val
                        except Exception:
                            logger.exception("MQTT 发布失败")
        except Exception:
            logger.exception("publisher_worker 异常")
        stop_evt.wait(poll_interval)
    logger.info("publisher_worker 退出")


def main():
    setup_logging(log_dir='logs', filename='Run_CX.log', level=logging.INFO, console_level=logging.DEBUG)
    log = logging.getLogger("main")

    # 创建 MQTT client
    mqtt_client = mqtt.Client(client_id=CLIENT_ID, callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
    mqtt_client.username_pw_set(USERNAME, PASSWORD)
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_subscribe = mqtt_on_subscribe
    mqtt_client.on_message = mqtt_on_message
    mqtt_client.on_disconnect = mqtt_on_disconnect
    try:
        mqtt_client.connect(HOST, PORT, keepalive=60)
    except Exception:
        log.exception("MQTT 初次连接失败,仍将启动并自动重试")
    mqtt_client.loop_start()

    # 使用线程池管理线程
    with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
        # 提交任务到线程池
        futures = []
        futures.append(executor.submit(opcua_worker, SERVER_URL, NODE_LIST, stop_event))
        futures.append(executor.submit(heartbeat_worker, mqtt_client, stop_event))
        futures.append(executor.submit(publisher_worker, mqtt_client, stop_event))
        try:
            while True:
                time.sleep(0.5)
        except KeyboardInterrupt:
            log.info("收到中断信号,准备退出")
            stop_event.set()
        finally:
            # 停止 MQTT loop 并断开
            try:
                mqtt_client.loop_stop()
                mqtt_client.disconnect()
            except Exception:
                log.exception("MQTT 清理异常")
            log.info("进程退出完毕")


if __name__ == "__main__":
    main()


3:对于opcua的节点的读取操作和注意事项,有一些点里面可能是一个数组,通过博途软件可以查看到如下

在这里插入图片描述

    # node = NodeId(1007, 3)  # 测试节点是否存在
    # client.get_node(node)
    # client.get_node(NodeId(1007, 3))
    # client.get_node("ns=3;s=1008")
    # client.get_node("ns=3;i=1001").get_value()
    
    
    # 下面这个两种写法是一样的,一种是直接获取指定的节点(常用是这样,方便),另一种是先获取根节点再获取子节点是一种遍历一层一层下去
    # project_main = client.get_node("ns=0;s=Objects").get_children()
    # project_main = client.get_root_node().get_child(["0:Objects"]).get_children()