Streamr助你掌控自己的数据(2)——三种整合数据至Streamr的典型场景
博客說明
所有刊發內容均可轉載但是需要注明出處。
三種整合數據至Streamr的典型場景
本系列文檔主要介紹怎么通過Streamr管理自己的DATA,整個系列包括三篇教程文檔,分別是:教你5分鐘上傳數據至Streamr、三種整合數據至Streamr的典型場景、教你在Streamr市場上發布數據。所有文檔均參考Streamr blog。前兩篇主要偏向技術文檔,所以需要有一定的技術背景。第三篇不包含任何技術知識,大部分人都可以按照教程來完成相應的操作。
簡介
第一篇文檔主要介紹如何通過調用API接口上傳數據至Streamr,本篇教程文檔主要介紹了三種整合數據至Streamr網絡的典型示例。包括: 1. 直接從數據源推送數據至Streamr網絡; 2. 數據源和Streamr之間建立橋接(數據源向橋接點實時推送數據); 3. 數據源和Streamr之間建立橋接(橋接點向數據源發送輪詢請求)。
相關概念說明
在Streamr網絡中,每個數據點屬于一個Stream。每個數據點(或者成為事件、消息)包含了標有時間戳的一系列信息,比如,傳感器采集的數據、即時通訊的消息等。根據使用場景的不同,一個Stream可以包括一個源的數據或者是多個數據源的數據。Streamr網絡為用戶上傳數據提供API接口,用戶調用該API接口最簡單的方式是使用Streamr客戶端庫。當前的Streamr客戶端庫是JavaScript編寫的,其他語言的客戶端庫還在開發中。如果您不方便使用當前的Streamr客戶端庫,您也可以使用HTTP庫來調用API接口,相關方法請參見上一篇說明文檔。
場景一:直接從數據源推送數據至Streamr網絡
- 復雜度:低
- 延遲:低
- 可用性:中等
在這種模式下,數據源一產生新的數據,將被立即直接推送至Streamr網絡。這種模式是官方推薦的,但是受環境影響,這種模式有可能會不可用。這種模式需要用戶能夠控制產生數據的系統并且能夠決定該數據發送的地方。舉例來說,如果您是IoT設備生產商,您可以直接將Streamr客戶端直接植入這些IoT設備中,并且允許設備使用者上傳數據至Streamr網絡。大部分的工業設備具有良好的可配置特性,可以隨時將產生的數據接入Streamr網絡。然而,諸如汽車、手機、智能手環等消費級設備經常強制將用戶數據發送至設備制造商的云端,然后向用戶提供訪問該數據的API接口。在這種場景下,用戶可以使用橋接模式將數據接入到Streamr網絡(具體參見下一小節)。
下面以一個實際應用為例說明如何直接從數據源推送數據至Streamr網絡。這里主要介紹如何將IoT傳感器Ruuvi產生的測量數據實時上傳至Streamr。該設備通過低功耗藍牙將數據傳輸至一臺裝有Streamr客戶端庫的網關計算機。其中,網關計算機為每個設備單獨建立一個Stream,并且將設備產生的測量數據實時上傳至所創建的Stream。其示例代碼如下:
const StreamrClient = require('streamr-client')
const ruuvi = require('node-ruuvitag')
const API_KEY = 'MY-API-KEY'
const client = new StreamrClient({apiKey: API_KEY
})
const streams = {}
console.log('Listening for RuuviTags...')
ruuvi.on('found', tag => {// Create a Stream for this tag if it doesn't exist yetif (!streams[tag.id]) {streams[tag.id] = await client.getOrCreateStream({name: 'Ruuvi ' + tag.id})}tag.on('updated', async (data) => {const stream = streams[tag.id]try {// Produce the data point to the streamawait stream.produce(data)} catch (err) {console.error(err)}})
}) 場景二:數據源和Streamr之間建立橋接(數據源向橋接點實時推送數據)
- 復雜度:中等
- 延遲:低
- 可用性:中等
這種模式適用于您無法直接控制數據源的場景,用戶可以通過一系列API接口實時獲取數據。具體地,在這種模式下,數據源實時將新產生的數據推送給用戶,用戶收到通知后立即將該數據上傳至Streamr。
示例1
示例1主要介紹如何使用Bitfinex客戶端庫和Streamr客戶端庫,將Bitfinex上DATA/USD交易數據和Streamr建立連接。
const BFX = require('bitfinex-api-node')
const StreamrClient = require('streamr-client')
const STREAM_ID = 'MY-STREAM-ID'
const API_KEY = 'MY-API-KEY'
const bws = new BFX('', '', {version: 2,transform: true
}).ws
const client = new StreamrClient({apiKey: API_KEY
})
bws.on('open', () => {bws.subscribeTicker('DATUSD')
})
bws.on('ticker', (pair, ticker) => {console.log('Ticker:', ticker)client.produceToStream(STREAM_ID, ticker).then(() => console.log('Sent to Streamr!')).catch((err) => console.error(err))
}) 示例2
示例1主要從編程角度介紹如何建立相應的連接,如果您非程序員,那可以使用本示例中的可視化編輯器Streamr Editor來建立相應的連接。但是,使用該可視化編輯器也需要用戶了解一些API接口和協議等技術知識。下面展示一個通過MQTT API監測電車實時位置的過程,該過程已獲得赫爾辛基公共交通的授權。
# MQTT 模型參數
URL: mqtt://mqtt.hsl.fi
Topic: /hfp/v1/journey/ongoing/tram/#MQTT 模型數據JSON格式的字符串,通過JSON解析器模型解析為一個對象。從該對象中提取出'VP'字段,該字段包含的也是一個對象。從提取出的對象中獲取'desi','lat','long'和'veh'字段的值,然后經過數據處理之后通過SendToStream模型將該數據上傳至Streamr。 場景三:數據源和Streamr之間建立橋接(橋接點向數據源發送輪詢請求)
- 復雜度:中等
- 延遲:中等
- 可用性:好
大多數云服務提供商都會為用戶提供訪問數據的API接口,這種場景正是使用該API接口完成用戶數據上傳至Streamr。在這種場景下,當數據源產生新的數據時,用戶不會收到任何通知,這就意味著用戶需要重復向該API接口發送數據請求,這種方式成為輪詢。顯而易見,這種方式并不是一個獲取實時數據的最佳方式,原因如下:1. 在用戶請求該數據之前,該數據可能已經變化多次,意味著用戶會丟失一些數據信息;2. 會給API服務器端帶來不必要的負載,因為用戶請求的時機是不固定的,不一定恰巧在數據值發送變化的時候;3. 增加了時延(輪詢周期的一半時間)。
示例1
本示例將介紹如何使用OpenWeatherMap每隔15分鐘查詢瑞士楚格的天氣情況。對于天氣等這種數據,使用輪詢方式獲取是可行的,因為天氣狀況不會變化的太快。
const fetch = require('node-fetch')
const StreamrClient = require('streamr-client')
const OPENWEATHERMAP_API_KEY = 'MY-OPENWEATHERMAP-KEY'
const STREAMR_API_KEY = 'MY-STREAMR-KEY'
const POLL_INTERVAL = 15 * 60 * 1000 // 5 minutes
const location = 'Zug,Switzerland'
const client = new StreamrClient({apiKey: STREAMR_API_KEY
})
// Query data from OWM and produce the result to Streamr
function pollAndProduce(stream) {fetch(`https://api.openweathermap.org/data/2.5/weather?q=${location}&APPID=${OPENWEATHERMAP_API_KEY}&units=metric`).then((response) => response.json()).then((json) => {console.log(json)// Produce the data point to Streamrreturn stream.produce(json)}).catch((err) => {console.error(err)})
}
// Create a Stream for this location, if it doesn't exist
client.getOrCreateStream({name: `Weather in ${location}`,description: 'From openweathermap.org, updated every 15 minutes'
}).then((stream) => {console.log(`Target Stream id: ${stream.id}`)// Poll and produce nowpollAndProduce(stream)// Keep repeating every 15 minutessetInterval(() => pollAndProduce(stream), POLL_INTERVAL)
}).catch((err) => {console.log(err)
}) 示例2
示例2將介紹如何利用可視化編輯器完成上述功能。
圖中,Clock模型每隔15分鐘發出一次警報并且觸發HTTP Request模型向OpenWeatherMap發出獲取天氣數據的請求。然后,SendToStream模型將獲取到的數據上傳至Streamr。
參考文獻
- https://medium.com/streamrblog/three-patterns-for-integrating-your-data-to-streamr-2-of-3-38027dc91f9e
轉載于:https://www.cnblogs.com/Streamr-letsgo/p/9129707.html
總結
以上是生活随笔為你收集整理的Streamr助你掌控自己的数据(2)——三种整合数据至Streamr的典型场景的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个玲玲的微信网名
- 下一篇: node.js实现国标GB28181流媒