Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add types #60

Merged
merged 7 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
language: node_js
cache: npm
dist: bionic

stages:
- check
- test
- cov
branches:
only:
- master
- /^release\/.*$/

node_js:
- '10'
- 'lts/*'
- 'node'

stages:
- check

os:
- linux
Expand Down
12 changes: 11 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"description": "Responsible for providing an interface-datastore compliant api to pubsub",
"leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",
"main": "src/index.js",
"types": "dist/src/index.d.ts",
"scripts": {
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"lint": "aegir lint",
"release": "aegir release --target node",
"release-minor": "aegir release --target node --type minor",
Expand All @@ -25,6 +26,10 @@
"datastore",
"pubsub"
],
"files": [
"dist",
"src"
],
"author": "Vasco Santos <vasco.santos@moxy.studio>",
"license": "MIT",
"bugs": {
Expand All @@ -38,11 +43,16 @@
"uint8arrays": "^2.0.5"
},
"devDependencies": {
"@types/detect-node": "^2.0.0",
"@types/mocha": "^8.2.1",
"@types/sinon": "^9.0.10",
"aegir": "^31.0.3",
"detect-node": "^2.0.4",
"ipfs-core-types": "^0.3.0",
"it-pair": "^1.0.0",
"libp2p": "^0.30.9",
"libp2p-gossipsub": "^0.8.0",
"libp2p-interfaces": "^0.8.3",
"libp2p-record": "^0.10.0",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.2",
Expand Down
131 changes: 66 additions & 65 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@

const { Key, Adapter } = require('interface-datastore')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
const uint8ArrayEquals = require('uint8arrays/equals')

const errcode = require('err-code')
const debug = require('debug')
const log = debug('datastore-pubsub:publisher')
log.error = debug('datastore-pubsub:publisher:error')
const log = Object.assign(debug('datastore-pubsub:publisher'), {
error: debug('datastore-pubsub:publisher:error')
})

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('./types').Validator} Validator
* @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn
* @typedef {import('libp2p-interfaces/src/pubsub/message').Message} PubSubMessage
*/

// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
class DatastorePubsub extends Adapter {
/**
* Creates an instance of DatastorePubsub.
*
* @param {*} pubsub - pubsub implementation.
* @param {*} datastore - datastore instance.
* @param {*} peerId - peer-id instance.
* @param {Object} validator - validator functions.
* @param {(record: uint8Array, peerId: PeerId) => boolean} validator.validate - function to validate a record.
* @param {(received: uint8Array, current: uint8Array) => boolean} validator.select - function to select the newest between two records.
* @param {function(key, callback)} subscriptionKeyFn - optional function to manipulate the key topic received before processing it.
* @param {import('libp2p-interfaces/src/pubsub')} pubsub - pubsub implementation
* @param {import('interface-datastore').Datastore} datastore - datastore instance
* @param {PeerId} peerId - peer-id instance
* @param {Validator} validator - validator functions
* @param {SubscriptionKeyFn} [subscriptionKeyFn] - function to manipulate the key topic received before processing it
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
Expand Down Expand Up @@ -57,9 +64,9 @@ class DatastorePubsub extends Adapter {
*
* @param {Uint8Array} key - identifier of the value to be published.
* @param {Uint8Array} val - value to be propagated.
* @returns {Promise}
*/
async put (key, val) { // eslint-disable-line require-await
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async put (key, val) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'

Expand All @@ -79,15 +86,15 @@ class DatastorePubsub extends Adapter {
log(`publish value for topic ${stringifiedTopic}`)

// Publish record to pubsub
return this._pubsub.publish(stringifiedTopic, val)
await this._pubsub.publish(stringifiedTopic, val)
}

/**
* Try to subscribe a topic with Pubsub and returns the local value if available.
*
* @param {Uint8Array} key - identifier of the value to be subscribed.
* @returns {Promise<Uint8Array>}
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async get (key) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'
Expand All @@ -106,7 +113,8 @@ class DatastorePubsub extends Adapter {

// subscribe
try {
await this._pubsub.subscribe(stringifiedTopic, this._onMessage)
this._pubsub.on(stringifiedTopic, this._onMessage)
await this._pubsub.subscribe(stringifiedTopic)
} catch (err) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`

Expand All @@ -127,10 +135,16 @@ class DatastorePubsub extends Adapter {
unsubscribe (key) {
const stringifiedTopic = keyToTopic(key)

return this._pubsub.unsubscribe(stringifiedTopic, this._onMessage)
this._pubsub.removeListener(stringifiedTopic, this._onMessage)
return this._pubsub.unsubscribe(stringifiedTopic)
}

// Get record from local datastore
/**
* Get record from local datastore
*
* @private
* @param {Uint8Array} key
*/
async _getLocal (key) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
Expand Down Expand Up @@ -161,7 +175,11 @@ class DatastorePubsub extends Adapter {
return dsVal
}

// handles pubsub subscription messages
/**
* handles pubsub subscription messages
*
* @param {PubSubMessage} msg
*/
async _onMessage (msg) {
const { data, from, topicIDs } = msg
let key
Expand Down Expand Up @@ -200,7 +218,12 @@ class DatastorePubsub extends Adapter {
}
}

// Store the received record if it is better than the current stored
/**
* Store the received record if it is better than the current stored
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeIfSubscriptionIsBetter (key, data) {
let isBetter = false

Expand All @@ -217,20 +240,35 @@ class DatastorePubsub extends Adapter {
}
}

// Validate record according to the received validation function
/**
* Validate record according to the received validation function
*
* @param {Uint8Array} value
* @param {Uint8Array} peerId
*/
async _validateRecord (value, peerId) { // eslint-disable-line require-await
return this._validator.validate(value, peerId)
}

// Select the best record according to the received select function.
/**
* Select the best record according to the received select function
*
* @param {Uint8Array} receivedRecord
* @param {Uint8Array} currentRecord
*/
async _selectRecord (receivedRecord, currentRecord) {
const res = await this._validator.select(receivedRecord, currentRecord)

// If the selected was the first (0), it should be stored (true)
return res === 0
}

// Verify if the record received through pubsub is valid and better than the one currently stored
/**
* Verify if the record received through pubsub is valid and better than the one currently stored
*
* @param {Uint8Array} key
* @param {Uint8Array} val
*/
async _isBetter (key, val) {
// validate received record
let error, valid
Expand Down Expand Up @@ -261,64 +299,27 @@ class DatastorePubsub extends Adapter {
}

// if the same record, do not need to store
if (currentRecord.equals(val)) {
if (uint8ArrayEquals(currentRecord, val)) {
return false
}

// verify if the received record should replace the current one
return this._selectRecord(val, currentRecord)
}

// add record to datastore
/**
* add record to datastore
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeRecord (key, data) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)

await this._datastore.put(routingKey, data)
log(`record for ${keyToTopic(key)} was stored in the datastore`)
}

open () {
const errMsg = 'open function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

has (key) {
const errMsg = 'has function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

delete (key) {
const errMsg = 'delete function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

close () {
const errMsg = 'close function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

batch () {
const errMsg = 'batch function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

query () {
const errMsg = 'query function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}
}

exports = module.exports = DatastorePubsub
9 changes: 9 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

type ValidateFn = (record: Uint8Array, peerId: Uint8Array) => Promise<boolean> | boolean
type CompareFn = (received: Uint8Array, current: Uint8Array) => number
export type SubscriptionKeyFn = (key: Uint8Array) => Promise<Uint8Array> | Uint8Array

export interface Validator {
validate: ValidateFn,
select: CompareFn
}
33 changes: 27 additions & 6 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,42 @@ const errcode = require('err-code')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')

/**
* @typedef {import('interface-datastore').Key} Key
*/

const namespace = '/record/'

module.exports.encodeBase32 = (buf) => {
/**
* @param {Uint8Array} buf
*/
function encodeBase32 (buf) {
return uint8ArrayToString(buf, 'base32')
}

// converts a binary record key to a pubsub topic key.
module.exports.keyToTopic = (key) => {
/**
* converts a binary record key to a pubsub topic key
*
* @param {Uint8Array | string} key
*/
function keyToTopic (key) {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
// Encodes to "/record/base64url(key)"
if (typeof key === 'string' || key instanceof String) {
key = uint8ArrayFromString(key)
key = uint8ArrayFromString(key.toString())
}

const b64url = uint8ArrayToString(key, 'base64url')

return `${namespace}${b64url}`
}

// converts a pubsub topic key to a binary record key.
module.exports.topicToKey = (topic) => {
/**
* converts a pubsub topic key to a binary record key
*
* @param {string} topic
*/
function topicToKey (topic) {
if (topic.substring(0, namespace.length) !== namespace) {
throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE')
}
Expand All @@ -33,3 +48,9 @@ module.exports.topicToKey = (topic) => {

return uint8ArrayFromString(key, 'base64url')
}

module.exports = {
encodeBase32,
keyToTopic,
topicToKey
}
Loading