Overview
The Stream Plugin API enables developers to create plugins that pull or subscribe to device or sensor data from sources such as REST APIs and external data streaming interfaces. This article describes the APIs available to Stream plugins and explains how to build one.
Starting a Node.js Stream Plugin Project
Reekoh provides a generator tool that creates the project scaffolding for a Stream plugin. Run it from your project folder as follows:
cd path/to/your/project-folder
yo reekoh-node:stream
Building the Plugin
The file named app.js is where everything starts. To use the Stream Plugin API, start by creating a Stream instance as seen on the scaffolding's app.js file.
app.js
'use strict'
const reekoh = require('reekoh')
const plugin = new reekoh.plugins.Stream()
plugin is now an instance of a Stream with the necessary properties, methods, and events for creating a Stream plugin. Details of the Stream instance properties, methods, and events are specified in the following sections.
Properties
Specified below are the properties of a Stream which are injected for use in your plugin.
- config {object} - The custom plugin configuration values as requested from and specified by the end user. For more information about plugin configurations, please see the Packaging & Submission section.
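As a minimal sketch of reading these values defensively, the helper below checks that the configuration keys a plugin depends on are present before a client is created. The missingConfigKeys helper and the endpoint key are hypothetical; apiKey is borrowed from the sample code in this article, and a plain object stands in for plugin.config.

```javascript
'use strict'

// Hypothetical helper: returns the names of required keys that are missing
// or empty in a configuration object such as plugin.config.
function missingConfigKeys (config, requiredKeys) {
  return requiredKeys.filter(key => {
    const value = config ? config[key] : undefined
    return value === undefined || value === null || value === ''
  })
}

// A plain object stands in for plugin.config in this sketch.
const sampleConfig = { apiKey: 'abc123', endpoint: '' }
const missing = missingConfigKeys(sampleConfig, ['apiKey', 'endpoint'])

if (missing.length > 0) {
  // In a real plugin this could be reported with plugin.logException(...)
  console.log('Missing configuration values: ' + missing.join(', '))
}
```

Running the check inside the 'ready' handler, before the client is created, keeps configuration problems visible in the plugin logs rather than surfacing later as connection errors.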
Events
Specified below are the events that a Stream is able to emit based upon operations that are happening on the platform pipelines.
'ready'
This event is emitted when the Stream instance has fully initialized. This is usually the event to listen to in order to initialize a client that connects to the external interface to pull or subscribe to device/sensor data.
Sample Code
let client
plugin.on('ready', () => {
  client = createClient(plugin.config.apiKey)
  client.subscribe(() => {
    plugin.log('Client is now subscribed to the streaming interface')
  })
})
'command'
This event is emitted when the Stream plugin receives a command/message that needs to be sent to a target device. Implement this event when the external service exposes a bi-directional interface for device communications.
Arguments
- command {object} - The details of the command/message to be sent to a specific device. Command has the following properties/signature:
- device {string} - The ID of the target device.
- commandId {string} - The identifier for the command or message. Used to record the response to the command when it is received from the device.
- command {string} - The actual command or message to be sent to the target device.
Sample Code
plugin.on('command', command => {
  client.publishCommand(command.device, command.command, response => {
    plugin.sendCommandResponse(command.commandId, response)
  })
})
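The handler above can be hardened slightly by checking the command before forwarding it. The following is a sketch under stated assumptions: handleCommands is a hypothetical helper, and an EventEmitter plus a small object stand in for the plugin and for a client whose publishCommand takes a callback, as in the sample above.

```javascript
'use strict'
const EventEmitter = require('events')

// Hypothetical helper: forwards well-formed commands to the client and
// records the device's response against the command ID.
function handleCommands (plugin, client) {
  plugin.on('command', command => {
    if (!command || !command.device || !command.commandId) {
      plugin.logException(new Error('Received a malformed command.'))
      return
    }
    client.publishCommand(command.device, command.command, response => {
      plugin.sendCommandResponse(command.commandId, response)
    })
  })
}

// Stand-ins so the sketch runs without the platform or a real client.
const recorded = []
const fakePlugin = new EventEmitter()
fakePlugin.sendCommandResponse = (id, response) => {
  recorded.push([id, response])
  return Promise.resolve()
}
fakePlugin.logException = err => {
  recorded.push(['error', err.message])
  return Promise.resolve()
}
const fakeClient = {
  publishCommand: (device, command, done) => done('ack:' + command)
}

handleCommands(fakePlugin, fakeClient)
fakePlugin.emit('command', { device: 'device-1', commandId: 'cmd-1', command: 'reboot' })
fakePlugin.emit('command', { command: 'reboot' }) // missing device and commandId
console.log(recorded)
```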
'sync'
This event is only available for Stream plugins that use the "PULL" method of integration, meaning the plugin pulls new data from the external data source at a predefined frequency or on a schedule (e.g., every 5 minutes). Listen to this event to trigger the sync function that pulls down the data and ingests it into the pipeline.
Sample Code
plugin.on('sync', () => {
  const execDateTime = Date.now()
  plugin.getState().then(state => {
    client
      .getNewData(state.lastPullDateTime)
      .then(data => {
        data.forEach(datum => {
          plugin.requestDeviceInfo(datum.device).then(deviceInfo => {
            if (!deviceInfo) return plugin.logException(new Error(datum.device + ' is not registered.'))
            Object.assign(datum, {
              rkhDeviceInfo: deviceInfo
            })
            // pipe() takes the data as its first argument
            plugin.pipe(datum)
          })
        })
        plugin.setState({
          lastPullDateTime: execDateTime
        })
      })
      .catch(err => {
        plugin.logException(err)
      })
  })
})
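On the first sync there may be no stored state yet, in which case reading state.lastPullDateTime would fail. The sketch below shows one way to handle that with async/await. It assumes that getState() can resolve with an empty value before anything has been stored, which this article does not confirm; lastPullFrom and runSync are hypothetical names, and client.getNewData is the same illustrative client call used in the sample above.

```javascript
'use strict'

// Hypothetical helper: picks the timestamp to pull from, falling back to 0
// (the Unix epoch) when no usable state has been stored yet.
function lastPullFrom (state) {
  if (state && typeof state.lastPullDateTime === 'number') {
    return state.lastPullDateTime
  }
  return 0
}

// Sketch of a sync handler built on the helper. `plugin` and `client` are
// passed in so the function can be exercised with stand-ins.
async function runSync (plugin, client) {
  const execDateTime = Date.now()
  try {
    const state = await plugin.getState()
    const data = await client.getNewData(lastPullFrom(state))
    for (const datum of data) {
      const deviceInfo = await plugin.requestDeviceInfo(datum.device)
      if (!deviceInfo) {
        await plugin.logException(new Error(datum.device + ' is not registered.'))
        continue
      }
      await plugin.pipe(Object.assign(datum, { rkhDeviceInfo: deviceInfo }))
    }
    // Only advance the marker after the whole batch has been piped.
    await plugin.setState({ lastPullDateTime: execDateTime })
  } catch (err) {
    await plugin.logException(err)
  }
}
```

It could be wired up with plugin.on('sync', () => runSync(plugin, client)). Unlike the sample above, this version waits for every packet to be piped before saving the new timestamp, so a failed batch is pulled again on the next sync.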
'adddevice'
This event is emitted when a new device is registered on the Reekoh device registry. If the external streaming interface/platform exposes a Device Registration/Provisioning API, then this is the event to listen to for sending the device details.
Arguments
- device {object} - the newly registered device on the Reekoh platform. This object has the following properties:
- _id {string} - the Device ID
- name {string} - the Device Name
- metadata {object} - the Device metadata
Sample Code
plugin.on('adddevice', device => {
  client
    .registerDevice(device)
    .then(() => {
      plugin.log('Registered Device')
      plugin.log(device)
    })
    .catch(err => {
      plugin.logException(err)
    })
})
'updatedevice'
This event is emitted when a registered device in Reekoh has been updated. If the external streaming interface/platform exposes a Device Update API, then this is the event to listen to for sending the updated device details.
Arguments
- device {object} - the updated device on the Reekoh platform. This object has the following properties:
- _id {string} - the Device ID
- name {string} - the Device Name
- metadata {object} - the Device metadata
Sample Code
plugin.on('updatedevice', device => {
  client
    .updateDevice(device)
    .then(() => {
      plugin.log('Updated Device')
      plugin.log(device)
    })
    .catch(err => {
      plugin.logException(err)
    })
})
'removedevice'
This event is emitted when a registered device in Reekoh has been deleted. If the external streaming interface/platform exposes a Device Delete API, then this is the event to listen to for deleting the device on the other platform.
Arguments
- device {object} - the deleted device on the Reekoh platform. This object has the following properties:
- _id {string} - the Device ID
- name {string} - the Device Name
- metadata {object} - the Device metadata
Sample Code
plugin.on('removedevice', device => {
  client
    .deleteDevice(device._id)
    .then(() => {
      plugin.log('Removed Device')
      plugin.log(device)
    })
    .catch(err => {
      plugin.logException(err)
    })
})
Methods
Specified below are Stream API methods that you can use for your plugin to relay information and initiate operations on the platform.
pipe(data, [sequenceId])
Invoke this method to transmit the data to the next plugins in the pipeline for further processing or integration.
Arguments
- data {object} - the data to send through to the next stages in the pipeline.
- sequenceId {string} - Optional. A sequence ID or other identifier for the packet, used for data de-duplication/sanitation. The system caches this sequence ID for a limited amount of time; if another packet from the same device arrives with the same sequence ID while it is cached, that packet is rejected to avoid data duplication.
Returns
- Promise - which is fulfilled once the data has been transmitted through the pipeline
Sample Code
client.on('data', data => {
  plugin
    .requestDeviceInfo(data.device) // Get device information from the system
    .then(deviceInfo => {
      // If device information is null, it means it's not registered.
      if (!deviceInfo) return plugin.logException(new Error(`${data.device} is not registered.`))
      const sequenceId = data.sequenceId
      delete data.sequenceId
      // Always append the device information as rkhDeviceInfo
      Object.assign(data, {
        rkhDeviceInfo: deviceInfo
      })
      // Transmit the data through the pipeline
      return plugin.pipe(data, sequenceId)
    })
    .catch(err => {
      plugin.logException(err)
    })
})
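To illustrate the idea behind sequence ID de-duplication, here is a small in-memory sketch. It is not the platform's implementation: the SequenceCache class, the 60-second window, and the key format are all assumptions made for the example.

```javascript
'use strict'

// Illustrative only: remembers (device, sequenceId) pairs for a limited time
// and reports whether a packet has been seen before.
class SequenceCache {
  constructor (ttlMs) {
    this.ttlMs = ttlMs
    this.seen = new Map() // key -> expiry timestamp in milliseconds
  }

  // Returns true when the packet is new and records it;
  // returns false when the same pair is still cached.
  accept (deviceId, sequenceId, now = Date.now()) {
    const key = deviceId + ':' + sequenceId
    const expiry = this.seen.get(key)
    if (expiry !== undefined && expiry > now) return false
    this.seen.set(key, now + this.ttlMs)
    return true
  }
}

const cache = new SequenceCache(60000) // assumed 60-second window
const first = cache.accept('device-1', 'seq-42', 0) // new packet
const repeat = cache.accept('device-1', 'seq-42', 1000) // duplicate within the window
const later = cache.accept('device-1', 'seq-42', 61000) // window has expired
console.log(first, repeat, later) // prints: true false true
```

In a plugin you only need to pass a stable sequenceId to pipe(); the sketch is meant to show why a retransmitted packet with the same ID is dropped while the same ID is accepted again once the cache entry has expired.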
sendCommandResponse(commandId, response)
Invoke this method to record a response to a command.
Arguments
- commandId {string} - The command ID from the system.
- response {string} - The response from the device after the command was sent to it.
Returns
- Promise - which is fulfilled when the response has been recorded on the system.
Sample Code
plugin.on('command', command => {
  client.publishCommand(command.device, command.command, response => {
    plugin.sendCommandResponse(command.commandId, response)
  })
})
notifyConnection(deviceId)
Invoke this method to denote that a device has connected to the Stream. This will set the status of the device to Online on the platform.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
Returns
- Promise - which is fulfilled when the operation is done.
Sample Code
client.on('deviceConnection', device => {
  plugin.notifyConnection(device.id)
})
notifyDisconnection(deviceId)
Invoke this method to denote that a device has disconnected from the Stream. This will set the status of the device to Offline.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
Returns
- Promise - which is fulfilled when the operation is done.
Sample Code
client.on('deviceDisconnection', device => {
  plugin.notifyDisconnection(device.id)
})
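Since notifyConnection and notifyDisconnection are usually used as a pair, the sketch below wires both in one place and logs any failure. The trackPresence helper is hypothetical, the event names follow the samples in this article and depend on the client library actually used, and an EventEmitter with a small recording object stands in for the client and the plugin.

```javascript
'use strict'
const EventEmitter = require('events')

// Hypothetical helper: keeps the device's Online/Offline status in step
// with connection events coming from the client.
function trackPresence (client, plugin) {
  client.on('deviceConnection', device => {
    plugin.notifyConnection(device.id).catch(err => plugin.logException(err))
  })
  client.on('deviceDisconnection', device => {
    plugin.notifyDisconnection(device.id).catch(err => plugin.logException(err))
  })
}

// Stand-ins so the sketch runs without the platform or a real client.
const calls = []
const fakePlugin = {
  notifyConnection: id => { calls.push('online:' + id); return Promise.resolve() },
  notifyDisconnection: id => { calls.push('offline:' + id); return Promise.resolve() },
  logException: err => { calls.push('error:' + err.message); return Promise.resolve() }
}
const fakeClient = new EventEmitter()

trackPresence(fakeClient, fakePlugin)
fakeClient.emit('deviceConnection', { id: 'device-1' })
fakeClient.emit('deviceDisconnection', { id: 'device-1' })
console.log(calls) // [ 'online:device-1', 'offline:device-1' ]
```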
requestDeviceInfo(deviceId)
Invoke this method to get the device information from the platform's device registry. Always use this method to verify that the device is registered.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
Returns
- Promise(deviceInfo {object}) - Resolves with the device information, which includes the _id, name, metadata and state properties of the device.
Sample Code
client.on('data', data => {
  plugin
    .requestDeviceInfo(data.device) // Get device information from the system
    .then(deviceInfo => {
      // TODO: Do something with the device information
    })
    .catch(err => {
      plugin.logException(err)
    })
})
syncDevice(deviceInfo, [deviceGroup])
Invoke this method to sync/register/update a device into Reekoh's device registry. This enables some device management capabilities for Stream plugins if needed.
Arguments
- deviceInfo {object} - The device to register. The only required properties are _id (or id) and name. All other properties are added to the device metadata. If the device already exists, it is updated. If the existing device has metadata, the additional metadata from deviceInfo is appended to it.
- deviceGroup {string} - Optional. The ID of the device group that the device should belong to.
Returns
- Promise - which is fulfilled when the device has been submitted for registry.
Sample Code
client.on('device-activation', device => {
  plugin.syncDevice({
    _id: device.serialNumber, // _id or id can be supplied. This is required.
    name: device.name, // This is required.
    manufacturer: device.manufacturer // This will get appended to the device's metadata
  })
})
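Because _id (or id) and name are required, it can help to build the deviceInfo object in one place and fail early when either is missing. The mapper below is a sketch: toDeviceInfo is a hypothetical helper, and the serialNumber, name and manufacturer fields simply mirror the sample above rather than any particular external API.

```javascript
'use strict'

// Hypothetical mapper from an external device record to the object passed to
// plugin.syncDevice(). Throws when the required fields cannot be filled.
function toDeviceInfo (device) {
  if (!device || !device.serialNumber || !device.name) {
    throw new Error('A device needs a serial number and a name to be synced.')
  }
  const { serialNumber, name, ...extra } = device
  // _id and name are required; everything in `extra` is expected to end up
  // in the device metadata, as described for the deviceInfo argument.
  return Object.assign({ _id: String(serialNumber), name }, extra)
}

const info = toDeviceInfo({
  serialNumber: 'SN-001',
  name: 'Boiler sensor',
  manufacturer: 'Acme'
})
console.log(info)
```

The result can then be passed directly to plugin.syncDevice(info), optionally followed by a device group ID.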
removeDevice(deviceId)
Invoke this method to remove a device from the platform's device registry. This enables some device management capabilities for Stream plugins if needed.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
Returns
- Promise - which is fulfilled when the device has been submitted for removal.
Sample Code
client.on('device-deactivation', device => {
  plugin.removeDevice(device.serialNumber)
})
setState(state)
Invoke this method to set the plugin's state. State can be used to store additional information or metadata for the plugin. It can also be used as cache for any information that needs to be stored temporarily.
Arguments
- state {any} - state to be stored. Can be anything - object, array, string, number etc.
Returns
- Promise - which is fulfilled when the state has been submitted for storage.
Sample Code
plugin.on('sync', () => {
  const execDateTime = Date.now()
  plugin.getState().then(state => {
    client
      .getNewData(state.lastPullDateTime)
      .then(data => {
        data.forEach(datum => {
          plugin.requestDeviceInfo(datum.device).then(deviceInfo => {
            if (!deviceInfo) return plugin.logException(new Error(datum.device + ' is not registered.'))
            Object.assign(datum, {
              rkhDeviceInfo: deviceInfo
            })
            // pipe() takes the data as its first argument
            plugin.pipe(datum)
          })
        })
        plugin.setState({
          lastPullDateTime: execDateTime
        })
      })
      .catch(err => {
        plugin.logException(err)
      })
  })
})
getState()
Invoke this method to retrieve the contents of the plugin state.
Returns
- Promise(state {any}) - Resolves with the contents of the plugin's state.
Sample Code
plugin.on('sync', () => {
  const execDateTime = Date.now()
  plugin.getState().then(state => {
    client
      .getNewData(state.lastPullDateTime)
      .then(data => {
        data.forEach(datum => {
          plugin.requestDeviceInfo(datum.device).then(deviceInfo => {
            if (!deviceInfo) return plugin.logException(new Error(datum.device + ' is not registered.'))
            Object.assign(datum, {
              rkhDeviceInfo: deviceInfo
            })
            // pipe() takes the data as its first argument
            plugin.pipe(datum)
          })
        })
        plugin.setState({
          lastPullDateTime: execDateTime
        })
      })
      .catch(err => {
        plugin.logException(err)
      })
  })
})
setDeviceState(deviceId, state)
Invoke this method to set the device state. You can record anything as the state of the device; it essentially serves as a per-device cache of information.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
- state {any} - The state of the device. Can be anything - object, array, string, number etc.
Returns
- Promise - which is fulfilled when the device state has been submitted for storage.
Sample Code
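client.on('data', data => {
  plugin
    .requestDeviceInfo(data.device) // Get device information from the system
    .then(deviceInfo => {
      if (!deviceInfo) return plugin.logException(new Error(`${data.device} is not registered.`))
      // Keep a history of readings in the device state. A copy of the reading
      // is stored so that the state never references the piped data object.
      deviceInfo.state.previousReadings = deviceInfo.state.previousReadings || []
      deviceInfo.state.previousReadings.push(Object.assign({}, data))
      return plugin.setDeviceState(data.device, deviceInfo.state).then(() => {
        Object.assign(data, {
          rkhDeviceInfo: deviceInfo
        })
        // Pipe the data once, after the state has been submitted
        return plugin.pipe(data)
      })
    })
    .catch(err => {
      plugin.logException(err)
    })
})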
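A history kept in the device state grows with every reading unless it is capped. The helper below is a sketch of one way to bound it; appendReading and the limit passed to it are hypothetical, the limit is assumed to be a positive integer, and treating a missing state as an empty object is an assumption about what the registry returns for a device with no state.

```javascript
'use strict'

// Hypothetical helper: returns a new state object with the reading appended
// to previousReadings, keeping only the most recent `maxReadings` entries.
function appendReading (state, reading, maxReadings) {
  const base = state && typeof state === 'object' ? state : {}
  const previous = Array.isArray(base.previousReadings) ? base.previousReadings : []
  const previousReadings = previous.concat([reading]).slice(-maxReadings)
  return Object.assign({}, base, { previousReadings })
}

let state = null
state = appendReading(state, { temp: 20 }, 2)
state = appendReading(state, { temp: 21 }, 2)
state = appendReading(state, { temp: 22 }, 2)
console.log(state.previousReadings) // [ { temp: 21 }, { temp: 22 } ]
```

The returned object could then be passed as the second argument to plugin.setDeviceState().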
setDeviceLocation(deviceId, latitude, longitude)
Invoke this method to set the device location. When ingesting location data, this will update the device location to reflect the real-time location reading.
Arguments
- deviceId {string} - the ID of the device as registered on the platform.
- latitude {number} - The latitude coordinate as per the reading.
- longitude {number} - The longitude coordinate as per the reading.
Returns
- Promise - which is fulfilled when the device location has been submitted.
Sample Code
client.on('data', data => {
  plugin
    .requestDeviceInfo(data.device) // Get device information from the system
    .then(deviceInfo => {
      if (!deviceInfo) return plugin.logException(new Error(`${data.device} is not registered.`))
      // Update the device location from the coordinates in the reading
      return plugin.setDeviceLocation(data.device, data.lat, data.lng).then(() => {
        Object.assign(data, {
          rkhDeviceInfo: deviceInfo
        })
        return plugin.pipe(data)
      })
    })
    .catch(err => {
      plugin.logException(err)
    })
})
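Readings from external sources sometimes carry missing or malformed coordinates, so it can be worth checking them before calling setDeviceLocation. The isValidCoordinate helper below is a hypothetical sketch; it relies only on the standard geographic ranges of -90 to 90 for latitude and -180 to 180 for longitude.

```javascript
'use strict'

// Returns true when both values are finite numbers within the usual
// geographic ranges: latitude -90..90, longitude -180..180.
function isValidCoordinate (latitude, longitude) {
  return Number.isFinite(latitude) && Number.isFinite(longitude) &&
    latitude >= -90 && latitude <= 90 &&
    longitude >= -180 && longitude <= 180
}

console.log(isValidCoordinate(14.5995, 120.9842)) // true
console.log(isValidCoordinate('14.5995', 120.9842)) // false: latitude is a string
console.log(isValidCoordinate(91, 0)) // false: latitude out of range
```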
log(logData)
Invoke this function to log any information. Can be useful for debugging. Logs are found under the Logs module or in each plugin instance in the Pipeline Studio.
Arguments
- logData {string | object} - The information to be logged.
Returns
- Promise - fulfilled when the log data has been submitted to the platform for recording.
Sample Code
client.on('data', data => {
  plugin.log('Received data')
  plugin.log(data)
  // TODO: Do other things with the data
})
logException(err)
Invoke this function to log errors/exceptions. Can be useful for debugging. Error/exception logs are found under the Logs module or in each plugin instance in the Pipeline Studio.
Arguments
- err {error} - The error to be logged.
Returns
- Promise - fulfilled when the error data has been submitted to the platform for recording.
Sample Code
client.on('data', data => {
  plugin.logException(new Error('This is an error.'))
  // TODO: Do other things with the data
})
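Client libraries do not always reject with Error objects; some pass strings or plain objects instead. Since logException expects an error, a small normalizing helper can be used in catch handlers. The toError helper below is a hypothetical sketch and not part of the Stream API.

```javascript
'use strict'

// Hypothetical helper: wraps anything that is not already an Error so that
// it can be handed to plugin.logException(), which expects an error object.
function toError (value) {
  if (value instanceof Error) return value
  if (typeof value === 'string') return new Error(value)
  let text
  try {
    text = JSON.stringify(value)
  } catch (err) {
    text = undefined // e.g. circular structures cannot be serialized
  }
  return new Error(text === undefined ? String(value) : text)
}

console.log(toError('timeout').message) // timeout
console.log(toError({ code: 500 }).message) // {"code":500}
```

A catch handler could then call plugin.logException(toError(err)).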
Reference Implementations
Listed below are some reference implementations of the Stream Plugin API.
These are some of the Stream plugins developed and open-sourced by Reekoh. You may visit our GitLab page for more information. Contributions are most welcome.