
streamr-client - v6.0.3

Streamr

Streamr JavaScript Client


This library allows you to easily interact with the Streamr Network from JavaScript-based environments, such as browsers and Node.js. The library wraps a Streamr light node for publishing and subscribing to messages, and also contains convenience functions for creating and managing streams.

If you are using the Streamr Client in Node, Node.js version 16.13.x and npm version 8.x are required.

Please see the Streamr project docs for more detailed documentation.


Getting started

Subscribing

const streamId = 'streamr.eth/demos/helsinki-trams'

streamr.subscribe(streamId, (message) => {
    // handler for individual messages
})

Publishing

// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
id: '/foo/bar'
})

await stream.publish({ timestamp: Date.now() })

More examples can be found in the examples repo.


Setup

Installation

The client is available on npm and can be installed with:

npm install streamr-client

Importing streamr-client

To use the client with React, please see streamr-client-react.

If using TypeScript you can import the library with:

import { StreamrClient } from 'streamr-client'

If using Node.js you can import the library with:

const { StreamrClient } = require('streamr-client')

For usage in the browser include the latest build, e.g. by including a <script> tag pointing at a CDN:

<script src="https://unpkg.com/streamr-client@latest/streamr-client.web.js"></script>

Usage

Client creation

In Streamr, Ethereum accounts are used for identity. You can generate an Ethereum private key using any Ethereum wallet, or you can use the utility function StreamrClient.generateEthereumAccount(), which returns the address and private key of a fresh Ethereum account. A private key is not required if you are only subscribing to public streams on the Network.

const streamr = new StreamrClient({
auth: {
privateKey: 'your-private-key'
}
})

Authenticating with an Ethereum private key contained in an Ethereum (web3) provider (e.g. MetaMask):

const streamr = new StreamrClient({
auth: {
ethereum: window.ethereum,
}
})

You can also create an anonymous client instance that can interact with public streams:

const streamr = new StreamrClient()

Creating a stream

// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
id: '/foo/bar'
})

console.log(stream.id) // e.g. `0x12345.../foo/bar`

You can also create a stream by defining the address in the provided id. Please note that the creation will only succeed if you specify the same address as provided for authentication when creating the streamr instance:

// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
    id: `${address}/foo/bar`
})

console.log(stream.id) // e.g. `0x12345.../foo/bar`
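
The address rule above can be illustrated with a small standalone sketch. The helper `resolveStreamId` is hypothetical and not part of streamr-client. It assumes that addresses are compared case-insensitively and written in lowercase, and it ignores ENS-style ids such as `streamr.eth/...`. The short hex strings are placeholders, not real addresses.

```javascript
// Hypothetical helper, not part of streamr-client: mimics how a path-only id
// is expanded with the authenticated address, and rejects ids that name a
// different address. Lowercasing is an assumption of this sketch.
function resolveStreamId(id, authenticatedAddress) {
    const owner = authenticatedAddress.toLowerCase()
    if (id.startsWith('/')) {
        // Path-only id such as '/foo/bar': prefix it with the owner address
        return `${owner}${id}`
    }
    const slashIndex = id.indexOf('/')
    if (slashIndex < 1) {
        throw new Error(`Stream id must contain a path: ${id}`)
    }
    const prefix = id.slice(0, slashIndex).toLowerCase()
    if (prefix !== owner) {
        throw new Error(`Cannot create ${id}: address does not match ${owner}`)
    }
    return `${owner}${id.slice(slashIndex)}`
}

console.log(resolveStreamId('/foo/bar', '0xABCDEF'))         // path-only id
console.log(resolveStreamId('0xabcdef/foo/bar', '0xABCDEF')) // matching address
```

Passing an id that starts with any other address makes the helper throw, which mirrors the failure described above.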

More information on Stream IDs can be found under the stream creation project docs.

Subscribing to a stream

const subscription = await streamr.subscribe(
streamId,
(content, metadata) => { ... }
)

The callback's first parameter, content, will contain the value given to the publish method.

The second parameter metadata is of type StreamMessage. It contains metadata about the message, e.g. timestamp.

Unsubscribing from an existing subscription:

await streamr.unsubscribe(streamId)
// or, unsubscribe them all:
const streams = await streamr.unsubscribe()

Getting all streams the client is subscribed to:

const subscriptions = streamr.getSubscriptions()

Publishing to a stream

// Here's our example data point
const msg = {
temperature: 25.4,
humidity: 10,
happy: true
}

// Publish using the stream id only
await streamr.publish(streamId, msg)

// Publish with a specific timestamp as a Date object (default is now)
await streamr.publish(streamId, msg, new Date(1546300800123))

// Publish with a specific timestamp in ms
await streamr.publish(streamId, msg, 1546300800123)

// Publish with a specific timestamp as an ISO 8601 string
await streamr.publish(streamId, msg, '2019-01-01T00:00:00.123Z')

// For convenience, stream.publish(...) equals streamr.publish(stream, ...)
await stream.publish(msg)
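
The three timestamp forms above all denote the same instant. The sketch below shows one way such inputs could be normalized to milliseconds since the Unix epoch. `toEpochMs` is a hypothetical helper written for illustration and is not the library's implementation.

```javascript
// Illustrative helper, not part of streamr-client: normalizes a Date object,
// a millisecond number, or an ISO 8601 string to milliseconds since the epoch.
function toEpochMs(timestamp = Date.now()) {
    if (timestamp instanceof Date) {
        return timestamp.getTime()
    }
    if (typeof timestamp === 'number') {
        return timestamp
    }
    if (typeof timestamp === 'string') {
        const parsed = Date.parse(timestamp)
        if (Number.isNaN(parsed)) {
            throw new Error(`Invalid timestamp string: ${timestamp}`)
        }
        return parsed
    }
    throw new Error(`Unsupported timestamp type: ${typeof timestamp}`)
}

// All three calls print 1546300800123
console.log(toEpochMs(new Date(1546300800123)))
console.log(toEpochMs(1546300800123))
console.log(toEpochMs('2019-01-01T00:00:00.123Z'))
```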

Requesting historical messages

By default subscribe will not request historical messages.

You can fetch historical messages with the resend method:

// Fetches the last 10 messages stored for the stream
const resend1 = await streamr.resend(
streamId,
{
last: 10,
},
onMessage
)

Alternatively you can fetch historical messages and subscribe to real-time messages:

// Fetches the last 10 messages and subscribes to the stream
const sub1 = await streamr.subscribe({
id: streamId,
resend: {
last: 10,
}
}, onMessage)

In order to fetch historical messages the stream needs to have storage enabled.

Resend from a specific timestamp up to the newest message:

const sub2 = await streamr.resend(
streamId,
{
from: {
timestamp: (Date.now() - 1000 * 60 * 5), // 5 minutes ago
sequenceNumber: 0, // optional
},
publisher: '0x12345...', // optional
}
)

Resend a range of messages:

const sub3 = await streamr.resend(
streamId,
{
from: {
timestamp: (Date.now() - 1000 * 60 * 10), // 10 minutes ago
},
to: {
timestamp: (Date.now() - 1000 * 60 * 5), // 5 minutes ago
},
// when using from and to the following parameters are optional
// but, if specified, both must be present
publisher: '0x12345...',
msgChainId: 'ihuzetvg0c88ydd82z5o',
}
)

If you choose one of the above resend options when subscribing, you can listen for the completion of the resend as follows:

const sub = await streamr.subscribe(options)
sub.onResent(() => {
console.log('Received all requested historical messages! Now switching to real time!')
})

Note that only one of the resend options can be used for a particular subscription.

Searching for streams

You can search for streams by specifying a search term:

const streams = await streamr.searchStreams('foo')

Instead of, or in addition to, the search term, you can search for streams based on permissions.

To get all streams for which a user has any direct permission:

const streams = await streamr.searchStreams('foo', {
user: '0x12345...'
})

To get all streams for which a user has any permission (direct or public):

const streams = await streamr.searchStreams('foo', {
user: '0x12345...',
allowPublic: true
})

It is also possible to filter by specific permissions by using allOf and anyOf properties. The allOf property should be preferred over anyOf when possible due to better query performance.

If you want to find the streams you can subscribe to:

const streams = await streamr.searchStreams(undefined, {
user: '0x12345...',
allOf: [StreamPermission.SUBSCRIBE],
allowPublic: true
})
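
To make the difference between allOf and anyOf concrete, here is a minimal local sketch of the filter semantics. `matchesPermissionFilter` is hypothetical and runs on plain arrays; the real query is evaluated by the network's indexing backend. The permission strings follow the lowercase form returned by getPermissions(). Requiring both conditions when both properties are given is an assumption of this sketch.

```javascript
// Hypothetical local model of the allOf / anyOf filters.
// `granted` is the list of permissions a user holds on one stream.
function matchesPermissionFilter(granted, { allOf = [], anyOf = [] } = {}) {
    // allOf: every listed permission must be present
    const hasAll = allOf.every((permission) => granted.includes(permission))
    // anyOf: at least one listed permission must be present (if any are listed)
    const hasAny = anyOf.length === 0 || anyOf.some((permission) => granted.includes(permission))
    return hasAll && hasAny
}

const granted = ['subscribe', 'publish']
console.log(matchesPermissionFilter(granted, { allOf: ['subscribe'] }))         // true
console.log(matchesPermissionFilter(granted, { allOf: ['subscribe', 'edit'] })) // false
console.log(matchesPermissionFilter(granted, { anyOf: ['edit', 'publish'] }))   // true
```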

Interacting with the Stream object

The Stream type provides a convenient way to interact with a stream without having to repeatedly pass Stream IDs.

Getting existing streams

const stream = await streamr.getStream(streamId)

The method getOrCreateStream gets the stream if it exists, and if not, creates it:

// May require MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.getOrCreateStream({
id: streamId
})

Stream permissions

There are 5 different permissions:

  • StreamPermission.PUBLISH
  • StreamPermission.SUBSCRIBE
  • StreamPermission.EDIT
  • StreamPermission.DELETE
  • StreamPermission.GRANT

You can import the StreamPermission enum with:

const { StreamPermission } = require('streamr-client')

For each stream + user there can be a permission assignment containing a subset of those permissions. It is also possible to grant public permissions for streams (only StreamPermission.PUBLISH and StreamPermission.SUBSCRIBE). If a stream has, for example, a public subscribe permission, anyone can subscribe to that stream.

To grant a permission for a user:

// Requires MATIC tokens (Polygon blockchain gas token)
await stream.grantPermissions({
user: '0x12345...',
permissions: [StreamPermission.PUBLISH],
})

Or to grant a public permission:

await stream.grantPermissions({
public: true,
permissions: [StreamPermission.SUBSCRIBE]
})

To revoke a permission from a user:

// Requires MATIC tokens (Polygon blockchain gas token)
await stream.revokePermissions({
user: '0x12345...',
permissions: [StreamPermission.PUBLISH]
})

Or revoke public permission:

await stream.revokePermissions({
public: true,
permissions: [StreamPermission.SUBSCRIBE]
})

The method streamr.setPermissions can be used to set an exact set of permissions for one or more streams. If there are existing permissions for the same users in a stream, the previous permissions are overwritten. This method is not available on a Stream object; call it on the StreamrClient instance:

// Requires MATIC tokens (Polygon blockchain gas token)
await streamr.setPermissions({
    streamId,
    assignments: [
        {
            user: '0x11111...',
            permissions: [StreamPermission.EDIT]
        }, {
            user: '0x22222...',
            permissions: [StreamPermission.GRANT]
        }, {
            public: true,
            permissions: [StreamPermission.PUBLISH, StreamPermission.SUBSCRIBE]
        }
    ]
})
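
The difference between granting and setting can be seen in a toy in-memory model. The functions `grantLocally` and `setLocally` are hypothetical and only imitate the semantics described above (grant adds to what a user already has, set replaces it). They do not touch the blockchain or the client.

```javascript
// Toy in-memory model, not the library's implementation.
// Maps a user address to the set of permissions they hold on one stream.
const store = new Map([
    ['0x11111...', new Set(['subscribe'])]
])

// Granting adds to whatever the user already has
function grantLocally(permissionStore, user, permissions) {
    const current = permissionStore.get(user) ?? new Set()
    permissions.forEach((permission) => current.add(permission))
    permissionStore.set(user, current)
}

// Setting replaces the user's previous permissions entirely
function setLocally(permissionStore, user, permissions) {
    permissionStore.set(user, new Set(permissions))
}

grantLocally(store, '0x11111...', ['publish'])
console.log([...store.get('0x11111...')]) // [ 'subscribe', 'publish' ]

setLocally(store, '0x11111...', ['edit'])
console.log([...store.get('0x11111...')]) // [ 'edit' ]
```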

You can query the existence of a permission with hasPermission(). Usually you want to set the allowPublic: true flag so that public permissions are also checked:

await stream.hasPermission({
    permission: StreamPermission.PUBLISH,
    user: '0x12345...',
    allowPublic: true
})

The full list of permissions for a stream can be queried by calling stream.getPermissions():

const permissions = await stream.getPermissions()

The returned value is an array of permissions containing an item for each user, and possibly one for public permissions:

permissions = [
{ user: '0x12345...', permissions: ['subscribe', 'publish'] },
{ public: true, permissions: ['subscribe']}
]
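
Given an array of that shape, the effect of the allowPublic flag can be reproduced locally. `hasPermissionLocally` below is a hypothetical helper for illustration and is not how the client resolves permissions internally.

```javascript
// Same shape as the value returned by stream.getPermissions() above
const assignments = [
    { user: '0x12345...', permissions: ['subscribe', 'publish'] },
    { public: true, permissions: ['subscribe'] }
]

// Hypothetical helper: checks a direct assignment for the user and,
// only when allowPublic is true, the public assignment as well.
function hasPermissionLocally(list, { permission, user, allowPublic = false }) {
    return list.some((assignment) => {
        const appliesToUser = assignment.user === user
        const appliesToEveryone = allowPublic && assignment.public === true
        return (appliesToUser || appliesToEveryone) && assignment.permissions.includes(permission)
    })
}

// A user with no direct assignment can subscribe only via the public permission
console.log(hasPermissionLocally(assignments, { permission: 'subscribe', user: '0x99999...', allowPublic: true }))  // true
console.log(hasPermissionLocally(assignments, { permission: 'subscribe', user: '0x99999...', allowPublic: false })) // false
```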

Updating a stream

To update the description of a stream:

// Requires MATIC tokens (Polygon blockchain gas token)
stream.description = 'New description!'
await stream.update()

Deleting a stream

To delete a stream:

// Requires MATIC tokens (Polygon blockchain gas token)
await stream.delete()

Enabling storage

You can enable storage on your streams to retain historical messages and access them later via resend. By default storage is not enabled on streams. You can enable it with:

const { StreamrClient, STREAMR_STORAGE_NODE_GERMANY } = require('streamr-client')
...
// assign a stream to a storage node
await stream.addToStorageNode(STREAMR_STORAGE_NODE_GERMANY)

Other operations with storage:

// remove the stream from a storage node
await stream.removeFromStorageNode(STREAMR_STORAGE_NODE_GERMANY)
// fetch the storage nodes for a stream
const storageNodes = stream.getStorageNodes()

Data Unions

⚠️ The code examples in this section are not up to date.

The Data Union framework is a data crowdsourcing and crowdselling solution. Working in tandem with the Streamr Network and Ethereum, the framework powers applications that enable people to earn by sharing valuable data. You can read more about it here

To deploy a new DataUnion with default deployment options:

const dataUnion = await streamr.deployDataUnion()

To get an existing (previously deployed) DataUnion instance:

const dataUnion = await streamr.getDataUnion('0x12345...')

Admin Functions

Admin functions require xDai tokens on the xDai network. To get xDai you can either use a faucet or you can reach out on the Streamr Discord #dev channel.

Adding members using admin functions is not at feature parity with the member function join. The newly added member will not be granted publish permissions to the streams inside the Data Union. This needs to be done manually using streamr.grantPermissions(). Similarly, after removing a member using the admin function removeMembers, the publish permissions need to be removed in a secondary step using revokePermissions().

Adding members:

const receipt = await dataUnion.addMembers([
'0x11111...',
'0x22222...',
'0x33333...',
])

Removing members:

const receipt = await dataUnion.removeMembers([
'0x11111...',
'0x22222...',
'0x33333...',
])

Checking if an address belongs to the Data Union:

const isMember = await dataUnion.isMember('0x12345...')

Send all withdrawable earnings to the member's address:

const receipt = await dataUnion.withdrawAllToMember('0x12345...')

Send all withdrawable earnings to the address signed off by the member:

const recipientAddress = '0x22222...'

const signature = await dataUnion.signWithdrawAllTo(recipientAddress)
const receipt = await dataUnion.withdrawAllToSigned(
'0x11111...', // member address
recipientAddress,
signature
)

Send some of the withdrawable earnings to the address signed off by the member:

const signature = await dataUnion.signWithdrawAllTo(recipientAddress)
const receipt = await dataUnion.withdrawAllToSigned(
'0x12345...', // member address
recipientAddress,
signature
)

// Or to authorize a fixed amount:
const receipt = await dataUnion.withdrawAmountToSigned(
'0x12345...', // member address
recipientAddress,
100, // token amount, in wei
signature,
)

Setting a new admin fee:

// Any number between 0 and 1, inclusive
const receipt = await dataUnion.setAdminFee(0.4)

Query functions

These are available to anyone, for querying publicly available information from a Data Union.

Get Data Union's statistics:

const stats = await dataUnion.getStats()

Get a member's stats:

const memberStats = await dataUnion.getMemberStats('0x12345...')

Get the withdrawable DATA tokens in the DU for a member:

// Returns a BigNumber
const balance = await dataUnion.getWithdrawableEarnings('0x12345...')

Getting the set admin fee:

const adminFee = await dataUnion.getAdminFee()

Getting admin's address:

const adminAddress = await dataUnion.getAdminAddress()

Getting the Data Union's version:

const version = await dataUnion.getVersion()
// Can be 0, 1 or 2
// 0 if the contract is not a data union

Withdraw options

The functions withdrawAll, withdrawAllTo, withdrawAllToMember, withdrawAllToSigned and withdrawAmountToSigned can all take an extra "options" argument. It is an object that can contain the following parameters. The values shown are the defaults, used when a parameter is not specified or when the options argument is omitted:

const receipt = await dataUnion.withdrawAll(
...,
{
sendToMainnet: true, // Whether to send the withdrawn DATA tokens to mainnet address (or sidechain address)
payForTransport: true, // Whether to pay for the withdraw transaction signature transport to mainnet over the bridge
waitUntilTransportIsComplete: true, // Whether to wait until the withdrawn DATA tokens are visible in mainnet
pollingIntervalMs: 1000, // How often requests are sent to find out if the withdraw has completed, in ms
retryTimeoutMs: 60000, // When to give up when waiting for the withdraw to complete, in ms
gasPrice: /*Network Estimate*/ // Ethereum Mainnet transaction gas price to use when transporting tokens over the bridge
}
)

These withdraw transactions are sent to the sidechain, so gas price shouldn't be manually set (fees will hopefully stay very low), but a little bit of sidechain native token is nonetheless required.

The return values from the withdraw functions also depend on the options.

If sendToMainnet: false, other options don't apply at all, and sidechain transaction receipt is returned as soon as the withdraw transaction is done. This should be fairly quick in the sidechain.

The use cases corresponding to the different combinations of the boolean flags:

transport | wait  | Returns             | Effect
----------|-------|---------------------|-------
true      | true  | Transaction receipt | (default) Self-service bridge to mainnet, client pays for mainnet gas
true      | false | Transaction receipt | Self-service bridge to mainnet (but skip the wait that double-checks the withdraw succeeded and tokens arrived to destination)
false     | true  | null                | Someone else pays for the mainnet gas automatically, e.g. the bridge operator (in this case the transaction receipt can't be returned)
false     | false | AMB message hash    | Someone else pays for the mainnet gas, but we need to give them the message hash first
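
The table can also be read as a small decision function. The sketch below is hypothetical and only restates the table in code. It assumes that the "transport" column corresponds to payForTransport and the "wait" column to waitUntilTransportIsComplete, and it returns a description string rather than the actual value.

```javascript
// Hypothetical helper restating the table above: describes what a withdraw
// call returns for a given combination of options. Defaults mirror the
// documented defaults (all three flags true).
function describeWithdrawResult({
    sendToMainnet = true,
    payForTransport = true,
    waitUntilTransportIsComplete = true
} = {}) {
    if (!sendToMainnet) {
        // The other options do not apply when staying on the sidechain
        return 'sidechain transaction receipt'
    }
    if (payForTransport) {
        return 'transaction receipt'
    }
    return waitUntilTransportIsComplete ? 'null' : 'AMB message hash'
}

console.log(describeWithdrawResult())
console.log(describeWithdrawResult({ payForTransport: false }))
console.log(describeWithdrawResult({ payForTransport: false, waitUntilTransportIsComplete: false }))
console.log(describeWithdrawResult({ sendToMainnet: false }))
```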

Deployment options

deployDataUnion can take an options object as the argument. It's an object that can contain the following parameters. All shown values are the defaults for each property:

const ownerAddress = await streamr.getAddress()

const dataUnion = await streamr.deployDataUnion({
owner: ownerAddress, // Owner / admin of the newly created Data Union
joinPartAgents: [ownerAddress], // Able to add and remove members to/from the Data Union
dataUnionName: /* Generated if not provided */, // NOT stored anywhere, only used for address derivation
adminFee: 0, // Must be between 0...1
sidechainPollingIntervalMs: 1000, // How often requests are sent to find out if the deployment has completed
sidechainRetryTimeoutMs: 60000, // When to give up when waiting for the deployment to complete
confirmations: 1, // Blocks to wait after Data Union mainnet contract deployment to consider it final
gasPrice: /*Network Estimate*/ // Ethereum Mainnet gas price to use when deploying the Data Union mainnet contract
})

Streamr Core is added as a joinPartAgent by default so that joining with secret works using the member function join. If you don't plan to use join for "self-service joining", you can leave out Streamr Core agent by calling deployDataUnion e.g. with your own address as the sole joinPartAgent:

const dataUnion = await streamr.deployDataUnion({
joinPartAgents: [ownerAddress],
adminFee,
})

dataUnionName option exists purely for the purpose of predicting the addresses of Data Unions not yet deployed. Data Union deployment uses the CREATE2 opcode which means a Data Union deployed by a particular address with particular "name" will have a predictable address.

Utility functions

The static function StreamrClient.generateEthereumAccount() generates a new Ethereum private key and returns an object with fields address and privateKey.

const { address, privateKey } = StreamrClient.generateEthereumAccount()

To retrieve the client's address, an async call must be made to streamr.getAddress:

const address = await streamr.getAddress()

Advanced usage

Stream partitioning

Partitioning (sharding) enables streams to scale horizontally. This section describes how to use partitioned streams via this library. To learn the basics of partitioning, see the docs.

A note on stream ids and partitions

The public methods of the client generally support the following three ways of defining a stream:

// Stream id as a string:
const streamId = `${address}/foo/bar`

// Stream id + partition as a string
const streamId = `${address}/foo/bar#4`

// Stream id + partition as an object
const streamId = {
id: `${address}/foo/bar`,
partition: 4
}
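
The three forms carry the same information. As an illustration only, the hypothetical helper `toStreamPartition` below normalizes each of them into an `{ id, partition }` object, leaving `partition` undefined when none is given. It is not part of streamr-client, and `0xabc` is a placeholder rather than a real address.

```javascript
// Hypothetical helper: normalizes the three stream definition forms above
// into a plain { id, partition } object.
function toStreamPartition(definition) {
    if (typeof definition === 'string') {
        // 'id' or 'id#partition'
        const [id, partition] = definition.split('#')
        return {
            id,
            partition: partition === undefined ? undefined : Number.parseInt(partition, 10)
        }
    }
    // Object form: { id, partition }
    return { id: definition.id, partition: definition.partition }
}

console.log(toStreamPartition('0xabc/foo/bar'))
console.log(toStreamPartition('0xabc/foo/bar#4'))
console.log(toStreamPartition({ id: '0xabc/foo/bar', partition: 4 }))
```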

Creating partitioned streams

By default, streams only have 1 partition when they are created. The partition count can be set to any number between 1 and 100. An example of creating a partitioned stream:

// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
id: `/foo/bar`,
partitions: 10,
})
console.log(`Stream created: ${stream.id}. It has ${stream.partitions} partitions.`)

Publishing to partitioned streams

In most use cases, a user wants related messages (e.g. messages from a particular device) to be assigned to the same partition, so that the messages retain a deterministic order and reach the same subscriber(s) to allow them to compute stateful aggregates correctly.

You can specify the partition number as follows:

await streamr.publish({
id: `${address}/foo/bar`,
partition: 4
}, msg)

The library alternatively allows the user to choose a partition key, which simplifies publishing to partitioned streams by not requiring the user to assign a partition number explicitly. The same partition key always maps to the same partition. In an IoT use case, the device id can be used as partition key; in user interaction data it could be the user id, and so on.

The partition key can be given as an argument to the publish methods, and the library assigns a deterministic partition number automatically:

await stream.publish(
msg,
Date.now(),
msg.vehicleId // msg.vehicleId is the partition key here
)
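
The idea behind a partition key is that a hash of the key, reduced modulo the partition count, always yields the same partition. The sketch below demonstrates that principle with an FNV-1a-style hash. The hash function and the helper names are assumptions chosen for illustration; the library's own mapping may differ, so the partition numbers computed here should not be expected to match the client's.

```javascript
// FNV-1a-style 32-bit hash over UTF-16 code units (illustrative choice)
function hashKey(text) {
    let hash = 0x811c9dc5
    for (let i = 0; i < text.length; i++) {
        hash ^= text.charCodeAt(i)
        hash = Math.imul(hash, 0x01000193) >>> 0
    }
    return hash
}

// Hypothetical helper: maps a partition key to a partition in [0, partitionCount)
function partitionForKey(partitionKey, partitionCount) {
    return hashKey(String(partitionKey)) % partitionCount
}

// The same key always lands on the same partition
console.log(partitionForKey('vehicle-1', 10) === partitionForKey('vehicle-1', 10)) // true
```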

Subscribing to partitioned streams

By default, the client subscribes to the first partition (partition 0) of a stream. Be aware: this behavior will change in the future so that it will subscribe to all partitions by default.

The partition number can be explicitly given in subscribe:

const sub = await streamr.subscribe({
id: streamId,
partition: 4
}, (content) => {
console.log('Got message %o', content)
})

If you want to subscribe to multiple partitions:

const onMessage = (content, streamMessage) => {
console.log('Got message %o from partition %d', content, streamMessage.getStreamPartition())
}

await Promise.all([2, 3, 4].map(async (partition) => {
await streamr.subscribe({
id: streamId,
partition,
}, onMessage)
}))

Disable message ordering and gap filling

If your use case tolerates missing messages and messages arriving out of order, you can turn off message ordering and gap filling when creating an instance of the client:

const streamr = new StreamrClient({
auth: { ... },
orderMessages: false,
gapFill: false
})

Both of these properties should be disabled in tandem for message ordering and gap filling to be properly turned off.

With message ordering disabled, your application performs no gap filling or sorting. Messages are dispatched as they arrive, which is faster, but there is no guarantee that they are complete or in order.
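
The trade-off can be illustrated with a toy reorder buffer. This is a simplified sketch under stated assumptions, not the client's implementation: messages carry consecutive sequence numbers starting at 0, and the ordered path simply holds back later messages until the missing one arrives, whereas real gap filling would request the missing messages.

```javascript
// Toy reorder buffer: delivers messages strictly in sequence-number order
function createOrderedDispatcher(onMessage) {
    let nextSequence = 0
    const pending = new Map()
    return (sequenceNumber, content) => {
        pending.set(sequenceNumber, content)
        // Flush every message that is now deliverable in order
        while (pending.has(nextSequence)) {
            onMessage(pending.get(nextSequence))
            pending.delete(nextSequence)
            nextSequence += 1
        }
    }
}

const ordered = []
const unordered = []
const dispatchOrdered = createOrderedDispatcher((content) => ordered.push(content))

// Messages arrive out of order: sequence 1 before sequence 0
const arrivals = [[1, 'b'], [0, 'a'], [2, 'c']]
for (const [sequenceNumber, content] of arrivals) {
    dispatchOrdered(sequenceNumber, content) // ordering enabled
    unordered.push(content)                  // ordering disabled: dispatch as received
}

console.log(ordered)   // [ 'a', 'b', 'c' ]
console.log(unordered) // [ 'b', 'a', 'c' ]
```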

Proxy publishing

In some cases the client might be interested in publishing messages without participating in the stream's message propagation. With this option the nodes can sign all messages they publish by themselves. Alternatively, a client could open a WS connection to a broker node and allow the broker to handle signing with its private key.

Proxy publishing is done on the network overlay level. This means that there is no need to know the IP address of the node that will be used as a proxy. Instead, the node needs to know the ID of the network node it wants to connect to. It is not possible to set publish proxies for a stream that is already being "traditionally" subscribed or published to and vice versa.

// Open publish proxy to a node on stream
await publishingClient.setPublishProxy(stream, '0x12345...')

// Open publish proxy to multiple nodes on stream
await publishingClient.setPublishProxies(stream, ['0x11111...', '0x22222...'])

// Remove publish proxy to a node on stream
await publishingClient.removePublishProxy(stream, '0x12345...')

// Remove publish proxy to multiple nodes on stream
await publishingClient.removePublishProxies(stream, ['0x11111...', '0x22222...'])

IMPORTANT: The node that is used as a proxy must have set the option on the network layer to accept incoming proxy connections.

Logging

The library supports debug for logging.

In Node.js, start your app like this: DEBUG=StreamrClient* node your-app.js

In the browser, set localStorage.debug = 'StreamrClient*'
