欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

梳理与解析 KubeEdge 主题探讨

最编程 2024-07-27 16:35:24
...

KubeEdge分析-topic整理

前言

在《KubeEdge分析-mapper与deviceTwin交互流程》,已经分析了部分topic以及twin与mapper的交互,但是没有对topic进行一个完整的整理,
这里我们整理下目前所有的topic以及所需的消息格式。

结论

内容比较多,所以先把结论提前

topic 发布者(publish) 订阅者(subscribe) 用途简介 mapper是否必须实现
$hw/events/node/+/membership/updated edgecore mapper 订阅设备列表的变化 建议实现
$hw/events/node/+/membership/get mapper edgecore 查询设备列表 建议实现
$hw/events/node/+/membership/get/result edgecore mapper 获取查询设备列表的结果 建议实现
$hw/events/device/+/updated edgecore mapper 订阅设备属性描述的变化
$hw/events/device/+/twin/update/result edgecore mapper 获取设备属性更新是否成功 建议实现
$hw/events/device/+/twin/update/delta edgecore mapper 获取设备属性更新的值
$hw/events/device/+/twin/update/document edgecore mapper 获取设备属性更新的操作记录
$hw/events/device/+/twin/get/result edgecore mapper、apps 返回获取设备属性的值
$hw/events/device/+/twin/update mapper edgecore 通知设备属性的值更新
$hw/events/device/+/twin/get mapper,apps edgecore 获取设备属性的值
$hw/events/device/+/state/update mapper edgecore 通知设备状态更新 建议实现
$hw/events/device/+/state/update/result edgecore mapper 获取设备状态更新结果
$ke/events/device/+/data/update mapper apps 获取设备属性的时序数据
$hw/events/upload/# 暂无 edgecore 转发云端
SYS/dis/upload_records 暂无 edgecore 转发云端

topics

之前的文档中就提过,在官方文档中的message_topics的topic列表和eventbus文档中的是不一样的

  • message_topics文档
1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
6. "$ke/events/+/device/data/update"
  • eventbus文档
- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+

下面我们就根据代码来确认一下

eventbus代码中的topic

在edgecore中,只有eventbus会和mqtt broker进行交互,所以,eventbus中代码中的topic应该是最准确的。

见edge\pkg\eventbus\mqtt\client.go

const UploadTopic = "SYS/dis/upload_records"

var (
    // MQTTHub client
    MQTTHub *Client
    // GroupID stands for group id
    GroupID string
    // ConnectedTopic to send connect event
    ConnectedTopic = "$hw/events/connected/%s"
    // DisconnectedTopic to send disconnect event
    DisconnectedTopic = "$hw/events/disconnected/%s"
    // MemberGet to get membership device
    MemberGet = "$hw/events/edgeGroup/%s/membership/get"
    // MemberGetRes to get membership device
    MemberGetRes = "$hw/events/edgeGroup/%s/membership/get/result"
    // MemberDetail which edge-client should be pub when service start
    MemberDetail = "$hw/events/edgeGroup/%s/membership/detail"
    // MemberDetailRes MemberDetail topic resp
    MemberDetailRes = "$hw/events/edgeGroup/%s/membership/detail/result"
    // MemberUpdate updating of the twin
    MemberUpdate = "$hw/events/edgeGroup/%s/membership/updated"
    // GroupUpdate updates a edgegroup
    GroupUpdate = "$hw/events/edgeGroup/%s/updated"
    // GroupAuthGet get temperary aksk from cloudhub
    GroupAuthGet = "$hw/events/edgeGroup/%s/authInfo/get"
    // GroupAuthGetRes temperary aksk from cloudhub
    GroupAuthGetRes = "$hw/events/edgeGroup/%s/authInfo/get/result"
    // SubTopics which edge-client should be sub
    SubTopics = []string{
        "$hw/events/upload/#",
        "$hw/events/device/+/state/update",
        "$hw/events/device/+/twin/+",
        "$hw/events/node/+/membership/get",
        UploadTopic,
    }
)

devicetwin中的topic


    // LifeCycleConnectETPrefix the topic prefix for connected event
    LifeCycleConnectETPrefix = "$hw/events/connected/"
    // LifeCycleDisconnectETPrefix the topic prefix for disconnected event
    LifeCycleDisconnectETPrefix = "$hw/events/disconnected/"

    // MemETPrefix the topic prefix for membership event
    MemETPrefix = "$hw/events/node/"
    // MemETUpdateSuffix the topic suffix for membership updated event
    MemETUpdateSuffix = "/membership/updated"
    // MemETDetailSuffix the topic suffix for membership detail
    MemETDetailSuffix = "/membership/detail"
    // MemETDetailResultSuffix the topic suffix for membership detail event
    MemETDetailResultSuffix = "/membership/detail/result"
    // MemETGetSuffix the topic suffix for membership get
    MemETGetSuffix = "/membership/get"
    // MemETGetResultSuffix the topic suffix for membership get event
    MemETGetResultSuffix = "/membership/get/result"

    // DeviceETPrefix the topic prefix for device event
    DeviceETPrefix = "$hw/events/device/"
    // TwinETUpdateSuffix the topic suffix for twin update event
    TwinETUpdateSuffix = "/twin/update"
    // TwinETUpdateResultSuffix the topic suffix for twin update result event
    TwinETUpdateResultSuffix = "/twin/update/result"
    // TwinETGetSuffix the topic suffix for twin get
    TwinETGetSuffix = "/twin/get"
    // TwinETGetResultSuffix the topic suffix for twin get event
    TwinETGetResultSuffix = "/twin/get/result"
    // TwinETCloudSyncSuffix the topic suffix for twin sync event
    TwinETCloudSyncSuffix = "/twin/cloud_updated"
    // TwinETEdgeSyncSuffix the topic suffix for twin sync event
    TwinETEdgeSyncSuffix = "/twin/edge_updated"
    // TwinETDeltaSuffix the topic suffix for twin delta event
    TwinETDeltaSuffix = "/twin/update/delta"
    // TwinETDocumentSuffix the topic suffix for twin document event
    TwinETDocumentSuffix = "/twin/update/document"

    // DeviceETUpdatedSuffix the topic suffix for device updated event
    DeviceETUpdatedSuffix = "/updated"
    // DeviceETStateUpdateSuffix the topic suffix for device state update event
    DeviceETStateUpdateSuffix = "/state/update"
    // DeviceETStateGetSuffix the topipc suffix for device state get event
    DeviceETStateGetSuffix = "/state/get"

本来觉得是eventbus中定义了所有topic的,但是发现devicetwin中又定义了一些,这里有些应该是和eventbus中重复的。

分析完源码回过头来看,eventbus是支持其他module告诉他要处理些什么topic的,所以这里是有可能device twinm module告诉eventbus要订阅、发布哪些topic。

eventbus订阅

eventbus代码中定义了很多topic,但是这些topic目前有没有用到,需要一一确认。

InitInternalTopics(edge\pkg\eventbus\mqtt\server.go)中会对SubTopics中topic都进行订阅

// InitInternalTopics sets internal topics to server by default.
func (m *Server) InitInternalTopics() {
    for _, v := range SubTopics {
        m.tree.Set(v, packet.Subscription{Topic: v, QOS: packet.QOS(m.qos)})
        klog.Infof("Subscribe internal topic to %s", v)
    }
}

eventbus处理

// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
    klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
    // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
    // for other, send to hub
    // for topic, no need to base64 topic
    var target string
    resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
    if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
        target = modules.TwinGroup
    } else {
        target = modules.HubGroup
        if message.Topic() == UploadTopic {
            resource = UploadTopic
        }
    }
    // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
    msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
        resource, "response").FillBody(string(message.Payload()))
    klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
    beehiveContext.SendToGroup(target, *msg)
}

eventbus内部mqtt

eventbus内部是有个mqtt server的,也就是说eventbus可以不依赖外部的mqttbroker来执行。

通过设置mqttMode为0来指定使用内部的mqttbroker,默认是2.

因此,eventbus中的edge\pkg\eventbus\mqtt\server.go,是不用看的,不要被相关代码干扰。

module name与group

由于整个beehive框架都是依赖module name 和group来分发消息的,而edgecore中涉及的module又特别多,所以有必要梳理一下:

模块 module name module group
devicetwin DeviceTwinModuleName TwinGroup
edged EdgedModuleName EdgedGroup
edgehub ModuleNameEdgeHub HubGroup
edgestream ModuleNameEdgeHub GroupNameEdgeStream
eventbus "eventbus" BusGroup
metamanager MetaManagerModuleName MetaGroup
servicebus "servicebus" BusGroup
stub "stubCloudHub" MetaGroup

这里看,其实还是挺乱的,有些直接用来字符安,有的定义了常量

订阅的topic梳理

SYS/dis/upload_records

从message处理方法看,对这个topic的消息,收到后的处理是把resource设置为“SYS/dis/upload_records”,targe设置为“HubGroup”,然后通过beehive送到对应的模块处理.

HubGroup的消息会被beehive发送给edgehub(edge\pkg\edgehub\edgehub.go)

        go eh.routeToEdge()
        go eh.routeToCloud()
        go eh.keepalive()

edgehub主要就启了这3个goroutine,

  • routeToEdge, 这个是从websocket中取消息,然后根据消息的group通过beehive,分发到对应的模块中取处理
  • routeToCloud,这个是从beehive中取EdgeHub的消息(根据ModuleNameEdgeHub),去掉的消息,通过sendToCloud发送到云端(这里每次发送回起一个KeepChannel,等待云端的返回,超时后,会删掉这个通道)
  • keepalive就是定时往云端发心跳了。

SYS/dis/upload_records小结

综上,可以看出,SYS/dis/upload_records收到的消息,会通过edgehub送到云上。
但是目前的代码中,是没有哪个组件会发送这个topic的,云端也没有对应的处理流程(待定)。

$hw/events/upload/#

从eventbus的OnSubMessageReceived中可以看出,除了“hw/events/device”以及“hw/events/node”的开头的topic,其他topic都是丢给edgehub处理的。
和之前的SYS/dis/upload_records相比,区别在于source的设置,两者的resource是不一样的。resource会影响最终的消息路由,但是在edge节点的处理上,是没啥区别的,这个resource影响最终云端收到消息后的处理(后续再仔细分析)

$hw/events/upload/#小结

综上,$hw/events/upload/#也是直接把消息送到云端。

$hw/events/device/+/state/update

从之前的分析中可知,这个topic就是交给device twin模块来处理的,

func initEventActionMap() {
    EventActionMap = make(map[string]map[string]string)
    EventActionMap[dtcommon.MemETPrefix] = make(map[string]string)
    EventActionMap[dtcommon.DeviceETPrefix] = make(map[string]string)
    EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETDetailResultSuffix] = dtcommon.MemDetailResult
    EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETUpdateSuffix] = dtcommon.MemUpdated
    EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETGetSuffix] = dtcommon.MemGet
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETStateGetSuffix] = dtcommon.DeviceStateGet
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETUpdatedSuffix] = dtcommon.DeviceUpdated
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETStateUpdateSuffix] = dtcommon.DeviceStateUpdate
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETUpdateSuffix] = dtcommon.TwinUpdate
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETCloudSyncSuffix] = dtcommon.TwinCloudSync
    EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETGetSuffix] = dtcommon.TwinGet
}

dt的actionmap中,定义了相关操作是DeviceStateUpdate,这个又对应了dealDeviceStateUpdate操作。
dealDeviceStateUpdate会同时更新cloud和edge,这里也先待定,暂时不展开分析

$hw/events/device/+/twin/+

这里有个坑,topic写的是twin/+,从edge\pkg\devicetwin\dtcommon\common.go可以看出,这个twin/+能匹配的消息包括:

  • /twin/update
  • /twin/get

和上面类似,这个topic分别对应的是EventActionMap中的DeviceETUpdatedSuffix和TwinETGetSuffix,也就是分别对应了dealTwinUpdate和dealTwinGet两个方法

$hw/events/device/+/twin/get

dealTwinGet --> DealGetTwin -->BuildDeviceTwinResult

//BuildDeviceTwinResult build device twin result, 0:get,1:update,2:sync
func BuildDeviceTwinResult(baseMessage BaseMessage, twins map[string]*MsgTwin, dealType int) ([]byte, error) {
    result := make(map[string]*MsgTwin)
    if dealType == 0 {
        for k, v := range twins {
            if v == nil {
                result[k] = nil
                continue
            }
            if v.Metadata != nil && strings.Compare(v.Metadata.Type, "deleted") == 0 {
                continue
            }
            twin := *v

            twin.ActualVersion = nil
            twin.ExpectedVersion = nil
            result[k] = &twin
        }
    } else {
        result = twins
    }

    payload, err := json.Marshal(DeviceTwinResult{BaseMessage: baseMessage, Twin: result})
    if err != nil {
        return []byte(""), err
    }
    return payload, nil
}

这里这里的dealType传入的是0,所以这里是从数据库中查出数据里,放到twin中,放的时候,ActualVersion和ExpectedVersion都会被设置为nil.

不过我觉得mapper是没有必要去发送这个get请求的,这里应该是为其他第三方的应用准备的,当第三方应用需要数据时,就发一个twin get消息给edgecore。

$hw/events/device/+/twin/update

dealTwinUpdate-->Updated-->DealDeviceTwin-->DealMsgTwin-->dealUpdateResult
-->dealDocument
-->dealDelta
-->dealSyncResult

DealDeviceTwin是个很长的方法,之前《KubeEdge分析-mapper与deviceTwin交互流程》 中有详细分析过,这里就不详细分析了,简要概况就是要更新edge的数据库,然后发送到云端

$hw/events/node/+/membership/get

这个topic根据之前的分析,是交给twin模块来处理的,最终处理的方法是
dealMembershipGet --> dealMembershipGetInner --> dealMembershipGetInner-->BuildMembershipGetResult

func BuildMembershipGetResult(baseMessage BaseMessage, devices []*Device) ([]byte, error) {
   result := make([]Device, 0, len(devices))
   for _, v := range devices {
       result = append(result, Device{
           ID:          v.ID,
           Name:        v.Name,
           Description: v.Description,
           State:       v.State,
           LastOnline:  v.LastOnline,
           Attributes:  v.Attributes})
   }
   payload, err := json.Marshal(MembershipGetResult{BaseMessage: baseMessage, Devices: result})
   if err != nil {
       return []byte(""), err
   }
   return payload, nil
}

这里就是返回节点上的device列表

$hw/events/connected/%s

代码中只有Twin模块有对相关topic的处理,在event模块中,并没找到订阅相关的消息。
在twin中的处理流程如下:
runDeviceTwin-->distributeMsg-->classifyMsg-->dtcommon.LifeCycle-->dealLifeCycle

$hw/events/connected/%s小结

从名称上看,这个topic是用来mapper发现了新增设备,上报给edgecore的,但是从实现上看,并没有对这个topic进行处理。

$hw/events/disconnected/%s

和上面的connected topic类似,目前应该也没有监听

其他

    // MemberGet to get membership device
    MemberGet = "$hw/events/edgeGroup/%s/membership/get"
    // MemberGetRes to get membership device
    MemberGetRes = "$hw/events/edgeGroup/%s/membership/get/result"
    // MemberDetail which edge-client should be pub when service start
    MemberDetail = "$hw/events/edgeGroup/%s/membership/detail"
    // MemberDetailRes MemberDetail topic resp
    MemberDetailRes = "$hw/events/edgeGroup/%s/membership/detail/result"
    // MemberUpdate updating of the twin
    MemberUpdate = "$hw/events/edgeGroup/%s/membership/updated"
    // GroupUpdate updates a edgegroup
    GroupUpdate = "$hw/events/edgeGroup/%s/updated"
    // GroupAuthGet get temperary aksk from cloudhub
    GroupAuthGet = "$hw/events/edgeGroup/%s/authInfo/get"
    // GroupAuthGetRes temperary aksk from cloudhub
    GroupAuthGetRes = "$hw/events/edgeGroup/%s/authInfo/get/result"

这堆topic目前看也都是没有监听的,所以edgecore应该是不会进行处理的

发布的topic梳理

之前分析的都是edgecore监听的topic,现在看看edgecore会publish哪些topic

pubCloudMsgToEdge-->publish-->pubMQTT

func (eb *eventbus) pubCloudMsgToEdge() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("EventBus PubCloudMsg To Edge stop")
            return
        default:
        }
        accessInfo, err := beehiveContext.Receive(eb.Name())
        if err != nil {
            klog.Errorf("Fail to get a message from channel: %v", err)
            continue
        }
        operation := accessInfo.GetOperation()
        resource := accessInfo.GetResource()
        switch operation {
        case "subscribe":
            eb.subscribe(resource)
            klog.Infof("Edge-hub-cli subscribe topic to %s", resource)
        case "message":
            body, ok := accessInfo.GetContent().(map[string]interface{})
            if !ok {
                klog.Errorf("Message is not map type")
                return
            }
            message := body["message"].(map[string]interface{})
            topic := message["topic"].(string)
            payload, _ := json.Marshal(&message)
            eb.publish(topic, payload)
        case "publish":
            topic := resource
            var ok bool
            // cloud and edge will send different type of content, need to check
            payload, ok := accessInfo.GetContent().([]byte)
            if !ok {
                content := accessInfo.GetContent().(string)
                payload = []byte(content)
            }
            eb.publish(topic, payload)
        case "get_result":
            if resource != "auth_info" {
                klog.Info("Skip none auth_info get_result message")
                return
            }
            topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName)
            payload, _ := json.Marshal(accessInfo.GetContent())
            eb.publish(topic, payload)
        default:
            klog.Warningf("Action not found")
        }
    }
}

从beehive中取出消息,然后消息中的Operation来执行相关动作。Operation是定义在Router中的,设置operation有两个方法,一个是BuildMsg,另一个是BuildModelMessage。
BuildMsg调用的地方比较多,但都是metamanager调用的,metamanager主要是处理K8S的一些原生对象的,所以这里和eventbus无关。

./edge/pkg/eventbus

//constant defining node connection types
const (
    ResourceTypeNodeConnection = "node/connection"
    OperationNodeConnection    = "publish"
    SourceNodeConnection       = "edgehub"
)

//BuildMsg returns message object with router and content details
func BuildMsg(group, parentID, sourceName, resource, operation string, content interface{}) *model.Message {
    msg := model.NewMessage(parentID).BuildRouter(sourceName, group, resource, operation).FillBody(content)
    return msg
}

//BuildModelMessage build mode messages
func (dtc *DTContext) BuildModelMessage(group string, parentID string, resource string, operation string, content interface{}) *model.Message {
    msg := model.NewMessage(parentID)
    msg.BuildRouter(modules.TwinGroup, group, resource, operation)
    msg.Content = content
    return msg
}

BuildModelMessage,有被发往云上的,也有发到busgroup的,这里就只看发到busgroup的。
目前看到的代码中的发往bus group的operation,都是"publish"类型的,所以目前会走到以下代码中

        case "publish":
            topic := resource
            var ok bool
            // cloud and edge will send different type of content, need to check
            payload, ok := accessInfo.GetContent().([]byte)
            if !ok {
                content := accessInfo.GetContent().(string)
                payload = []byte(content)
            }
            eb.publish(topic, payload)

这里梳理的所有通过BuildModelMessage

发布的topic列表

Membership update

topic := dtcommon.MemETPrefix + context.NodeName + dtcommon.MemETUpdateSuffix
$hw/events/node/+/membership/updated

当有设备添加或者删除的时候,会触发这个topic,mapper可以通过订阅这个topic来了解需要采集的设备的情况

Membership get result

topic := dtcommon.MemETPrefix + context.NodeName + dtcommon.MemETGetResultSuffix
$hw/events/node/+/membership/get/result
这个topic是响应membership get的请求,也就是mapper向edgecore查询有哪些设备

device updated

topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.DeviceETUpdatedSuffix
"$hw/events/device/+/updated"
这个topic是用来更新device model中的属性描述的(不是属性的值),也就是添加了一个新的属性或者更新一个属性的描述。
测试了更新设备model,发现并不能触发device updated的逻辑,从源码中看,目前只有设备的方法中有相关内容,但是设备首次添加,又不会触发这个update,所以不知道什么场景下才能触发了

twin update result

topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETUpdateResultSuffix
"$hw/events/device/+/twin/update/result"
这个topic是用来响应twin/update的,目前看,代码中用到的地方都是update失败的情况,也就是mapper向edgecore发送更新,twin/update/result返回更新结果

twin delta

topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETDeltaSuffix
"$hw/events/device/+/twin/update/delta"
这个topic是云端向mapper来发起更新property的值用的,比如云端想设置设备的某个属性值为1

twin Document

topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETDocumentSuffix
"$hw/events/device/+/twin/update/document"
这个topic是更新twin的时候附带的,document没有仔细研究,猜测这里放的是类似日志的东西,也就是上一个值是啥,这次更新为啥。

get twin result

topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETGetResultSuffix
"$hw/events/device/+/twin/get/result"
这个topic是相应twin/get请求的,也就是mapper或者其他第三方应用可以通过twin/get topic来向edgecore查询device property的值,然后edgecore通过/twin/get/result把结果返回。

topic的内容
  • $hw/events/device/sensor-tag-instance-12/twin/get
{
    "event_id": "",
    "timestamp": 1598527389
}
  • $hw/events/node/edgenode1/membership/get/result
{
    "event_id": "",
    "timestamp": 1598527389979,
    "devices": [{
        "id": "sensor-tag-instance-10",
        "name": "sensor-tag-instance-10",
        "description": "TISimplelinkSensorTag"
    }, {
        "id": "sensor-tag-instance-11",
        "name": "sensor-tag-instance-11",
        "description": "TISimplelinkSensorTag"
    }, {
        "id": "camera",
        "name": "camera",
        "description": "camera"
    } ]
}
  • $hw/events/device/sensor-tag-instance-12/twin/update
{
    "event_id": "",
    "timestamp": 1598515893,
    "twin": {
        "temperature": {
            "actual": {
                "value": "111",
                "metadata": {
                    "timestamp": 1598515893
                }
            },
            "metadata": {
                "type": "int"
            }
        }
    }
}
  • $hw/events/device/sensor-tag-instance-12/twin/update/delta
{
    "event_id": "ccb9dbeb-f5c0-4f2b-9cb7-f023b688f586",
    "timestamp": 1598515893657,
    "twin": {
        "temperature": {
            "expected": {
                "value": "2",
                "metadata": {
                    "timestamp": 1598513321701
                }
            },
            "actual": {
                "value": "111",
                "metadata": {
                    "timestamp": 1598515893648
                }
            },
            "optional": false,
            "metadata": {
                "type": "int"
            }
        }
    },
    "delta": {
        "temperature": "2"
    }
}
  • $hw/events/device/sensor-tag-instance-12/twin/update/document
{
    "event_id": "",
    "timestamp": 1598515893647,
    "twin": {
        "temperature": {
            "last": {
                "expected": {
                    "value": "2",
                    "metadata": {
                        "timestamp": 1598513321701
                    }
                },
                "optional": false,
                "metadata": {
                    "type": "int"
                }
            },
            "current": {
                "expected": {
                    "value": "2",
                    "metadata": {
                        "timestamp": 1598513321701
                    }
                },
                "actual": {
                    "value": "111",
                    "metadata": {
                        "timestamp": 1598515893648
                    }
                },
                "optional": false,
                "metadata": {
                    "type": "int"
                }
            }
        }
    }
}
  • $hw/events/node/edgenode1/membership/updated
{
    "event_id": "16ebbe71-5b04-4e17-ae37-80091bafaffc",
    "timestamp": 1598513321706,
    "added_devices": [{
        "id": "sensor-tag-instance-12",
        "name": "sensor-tag-instance-12",
        "description": "TISimplelinkSensorTag",
        "twin": {
            "temperature": {
                "expected": {
                    "value": "2",
                    "metadata": {
                        "timestamp": 1598513321688
                    }
                },
                "optional": false,
                "metadata": {
                    "type": "int"
                }
            }
        }
    }],
    "removed_devices": null
}
  • $hw/events/node/edgenode1/membership/get
{
    "event_id": "",
    "timestamp": 1598527389
}
  • $hw/events/device/sensor-tag-instance-12/twin/get/result
{
    "event_id": "",
    "timestamp": 1598527389979,
    "twin": {
        "temperature": {
            "expected": {
                "value": "2",
                "metadata": {
                    "timestamp": 1598513321701
                }
            },
            "actual": {
                "value": "111",
                "metadata": {
                    "timestamp": 1598515893648
                }
            },
            "optional": false,
            "metadata": {
                "type": "int"
            }
        }
    }
}
  • $hw/events/device/sensor-tag-instance-12/state/update
{
    "event_id": "",
    "timestamp": 1598527716,
    "state": "online"
}
  • $hw/events/device/sensor-tag-instance-12/state/update/result
{
    "event_id": "c85f7e9c-4251-4386-99ce-869af0051af7",
    "timestamp": 1598527716708,
    "device": {
        "name": "sensor-tag-instance-12",
        "state": "online",
        "last_online": "2020-08-27 07:28:36"
    }
}
  • $ke/events/device/sensor-tag-instance-12/data/update
{
    "event_id": "123e4567-e89b-12d3-a456-426655440000",
    "timestamp": 1597213444,
    "data": {
        "propertyName1": {
            "value": "123",
            "metadata": {
                "timestamp": 1597213444, //+optional
                "type": "int"
            }
        },
        "propertyName2": {
            "value": "456",
            "metadata": {
                "timestamp": 1597213444,
                "type": "int"
            }
        }
    }
}
  • $hw/events/upload/

目前看,这个topic看上去对内容没有限制,传什么都可以,随便测试了些数据,从edgecore能看到以下日志:

Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441902   23779 client.go:94] OnSubMessageReceived receive msg from topic: $hw/events/upload/
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441950   23779 client.go:111] received msg from mqttserver, deliver to hub with resource JGh3L2V2ZW50cy91cGxvYWQv
  • SYS/dis/upload_records

目前看,这个topic看上去对内容也没有限制,传什么都可以,随便测试了些数据,从edgecore能看到以下日志:

Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441963   23779 client.go:94] OnSubMessageReceived receive msg from topic: SYS/dis/upload_records
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441972   23779 client.go:111] received msg from mqttserver, deliver to hub with resource SYS/dis/upload_records

推荐阅读