opcua协议介绍
opc ua
是一種應用層協議,基于tcp之上,其url通常為opc.tcp://127.0.0.1:4840/abc,在opc ua中常被稱為endpoint
兩種模式
opc ua支持c/s模式,同時也支持類似mqtt的發布訂閱模式,通常各種設備作為opc ua的服務端提供各種服務。
信息模型
opc ua采用面向對象的設計思路, 使用了對象(objects)作為過程系統表示數據和活動的基礎。對象包含了變量,事件和方法,它們通過引用(reference)來互相連接。
OPC UA 信息模型是節點的網絡(Network of Node,),或者稱為結構化圖(graph),由節點(node)和引用(References)組成,這種結構圖稱之為OPC UA 的地址空間。這種圖形結構可以描述各種各樣的結構化信息(對象)。
注意⚠️:opc ua中所說的節點是在一個opc ua服務器中,不要理解為一個服務器對應一個node
節點
opc ua定義了8種類型的節點
對象(Object)
對象類型(Object Type)
變量(Variable)
變量類型(Variable Type)
方法(Method)
視圖(View)
引用(Reference)
數據類型(Data Type)
每種節點都包含一些公共屬性,如下:
| 屬性 | 數據類型 | 說明 | 
|---|---|---|
| NodeId | NodeId | 在OPC UA服務器內唯一確定的一個節點,并且在OPC UA服務器中定位該節點 | 
| NodeClass | Int32 | 該節點的類型(上面列出的8種之一) | 
| BrowseName | QualifiedName | 瀏覽OPC UA服務器事定義的節點。它是非本地化的 | 
| DisplayName | LocalizedText | 包含節點的名字,用來在用戶接口中顯示名字,本地化 | 
| Description | LocalizedText | 本地化的描述(可選) | 
| WriteMask | Uint32 | 指示哪個節點屬性是可寫的,即可被OPC UA客戶端修改(可選) | 
| UserWriteMask | Uint32 | 指示哪個節點屬性可以被當前連接到服務器上的用戶修改(可選) | 
除了數據類型節點之外,其他各個節點都有額外的專屬屬性
引用
引用描述了兩個節點之間的關系,用來連接多個節點。OPC UA預定義了多種引用,常見的引用有:
hasTypeDefinition
描述對象、變量和類型之間的關系
ObjectNode的hasTypeDefinition引用,指向了一個ObjectTypeNode,表示該ObjectNode的類型;
VariableNode的hasTypeDefinition引用,指向一個VariableTypeNode,表示該 VariableNode的類型。
hasSubType
描述對象的擠成關系,當子類從父類繼承后,子類擁有一個hasSubType引用指向父類。
hasComponents
描述一種組合關系
ObjectNode一般都由多個VariableNode組成,ObjectNode包含某個VariableNode時,ObjectNode擁有一個hasComponents引用,指向該VariableNode;
VariableNode也可以包含子VariableNode,此時也用hasComponents描述它們的關系。
Organizes
指明兩個節點的層次結構,通過organizes可以把多個節點組織到同一個父節點下。
完整引用如下
服務
服務可以看成是OPC UA服務器提供的API集合,OPC UA與定義了37個標準服務,常用的服務有:
讀寫服務
可以獲取和修改服務器指定節點指定屬性的值
調用服務
執行服務器上指定節點的方法
訂閱數據變化和訂閱事件
可以監控服務器數據的變化
opc ua編程
Sdk
python(支持客戶端和服務端)
https://github.com/FreeOpcUa/python-opcua
golang(支持客戶端,服務端尚不完善)
https://github.com/gopcua/opcua
客戶端
opcua-client-gui
使用python(pyqt5)開發使用pip可以安裝,跨平臺
sudo pip3 install pyqt5 -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install numpy -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install pyqtgraph -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install cryptography -i https://pypi.mirrors.ustc.edu.cn/simple/
sudo pip3 install opcua-client -i https://pypi.mirrors.ustc.edu.cn/simple/
模擬設備
可利用sdk自己開發 見下面的python demo
golang Demo
讀取服務器數據
package main
import (
	"context"
	"log"
	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"
	ctx := context.Background()
	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}
	req := &ua.ReadRequest{
		MaxAge:             2000,
		NodesToRead:        []*ua.ReadValueID{{NodeID: id}},
		TimestampsToReturn: ua.TimestampsToReturnBoth,
	}
	resp, err := c.Read(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	if resp.Results[0].Status != ua.StatusOK {
		log.Fatalf("Status not OK: %v", resp.Results[0].Status)
	}
	log.Printf("%#v", resp.Results[0].Value.Value())
}
向服務器寫數據
package main
import (
	"context"
	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
	"log"
)
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"
	ctx := context.Background()
	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}
	v, err := ua.NewVariant(10.0)
	if err != nil {
		log.Fatalf("invalid value: %v", err)
	}
	req := &ua.WriteRequest{
		NodesToWrite: []*ua.WriteValue{
			{
				NodeID:      id,
				AttributeID: ua.AttributeIDValue,
				Value: &ua.DataValue{
					EncodingMask: ua.DataValueValue,
					Value:        v,
				},
			},
		},
	}
	resp, err := c.Write(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	log.Printf("%v", resp.Results[0])
}
監聽服務器數據變化
package main
import (
	"context"
	"github.com/gopcua/opcua/monitor"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"
	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)
func cleanup(sub *monitor.Subscription, wg *sync.WaitGroup) {
	log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
	sub.Unsubscribe()
	wg.Done()
}
func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
	sub, err := m.Subscribe(
		ctx,
		&opcua.SubscriptionParameters{
			Interval: interval,
		},
		func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
			if msg.Error != nil {
				log.Printf("[callback] error=%s", msg.Error)
			} else {
				log.Printf("[callback] node=%s value=%v", msg.NodeID, msg.Value.Value())
			}
			time.Sleep(lag)
		},
		nodes...)
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup(sub, wg)
	<-ctx.Done()
}
func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-signalCh
		println()
		cancel()
	}()
	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	m, err := monitor.NewNodeMonitor(c)
	if err != nil {
		log.Fatal(err)
	}
	m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
		log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
	})
	wg := &sync.WaitGroup{}
	// start callback-based subscription
	wg.Add(1)
	go startCallbackSub(ctx, m, time.Second, 0, wg, nodeID)
	<-ctx.Done()
	wg.Wait()
}
python opcua server demo
#!/usr/bin/env python3
from threading import Thread
import random
import time
from opcua import ua, uamethod, Server
@uamethod
def set_temperature(parent, variant):
    print(f"set_temperature {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)
@uamethod
def set_onoff(parent, variant):
    print(f"set_onoff {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)
# 這個類用于后臺定時隨機修改值
class Temperature(Thread):
    def __init__(self, temperature, onoff):
        Thread.__init__(self)
        self._stop = False
        self.temperature = temperature
        self.onoff = onoff
    def stop(self):
        self._stop = True
    def run(self):
        count = 1
        while not self._stop:
            value = random.randint(-20, 100)
            self.temperature.set_value(value)
            print(f"random set temperature {value}")
            value = bool(random.randint(0, 1))
            self.onoff.set_value(value)
            print(f"random set onoff {value}")
            led_event.event.Message = ua.LocalizedText("high_temperature %d" % count)
            led_event.event.Severity = count
            #led_event.event.temperature = random.randint(60, 100)
            led_event.event.onoff = bool(random.randint(0, 1))
            led_event.trigger()
            count += 1
            time.sleep(10)
if __name__ == "__main__":
    # now setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
    server.set_server_name("TuyaOpcUa Driver Demo Device")
    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
        ua.SecurityPolicyType.Basic128Rsa15_Sign,
        ua.SecurityPolicyType.Basic256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256_Sign])
    # setup our own namespace
    uri = "http://tuya.com"
    idx = server.register_namespace(uri)
    # 添加一個 `空調` 對象
    air_conditioner = server.nodes.objects.add_object(idx, "AirConditioner")
    temperature = air_conditioner.add_variable(idx, "temperature", 20)
    temperature.set_writable()
    onoff = air_conditioner.add_variable(idx, "onoff", True)
    onoff.set_writable()
    air_conditioner.add_method(idx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
    air_conditioner.add_method(idx, "set_onoff", set_onoff, [ua.VariantType.Boolean])
    # creating a default event object, the event object automatically will have members for all events properties
    led_event_type = server.create_custom_event_type(idx,
                                                     'high_temperature',
                                                     ua.ObjectIds.BaseEventType,
                                                     [('temperature', ua.VariantType.UInt32), ('onoff', ua.VariantType.Boolean)])
    led_event = server.get_event_generator(led_event_type, air_conditioner)
    led_event.event.Severity = 300
    # start opcua server
    server.start()
    print("Start opcua server...")
    temperature_thread = Temperature(temperature, onoff)
    temperature_thread.start()
    try:
        led_event.trigger(message="This is BaseEvent")
        while True:
            time.sleep(5)
    finally:
        print("Exit opcua server...")
        temperature_thread.stop()
        server.stop()
總結
 
                            
                        - 上一篇: SAP 电商云 Spartacus Sc
- 下一篇: 180767676是哪里的dns
