Skip to content

Commit

Permalink
Merge pull request #103 from cabal-club/channel-archiving
Browse files Browse the repository at this point in the history
Implement channel archiving
  • Loading branch information
cblgh authored Mar 6, 2021
2 parents 19d60c7 + 2e35f26 commit ad4f750
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 0 deletions.
6 changes: 6 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var createMessagesView = require('./views/messages')
var createTopicsView = require('./views/topics')
var createUsersView = require('./views/users')
var createModerationView = require('./views/moderation')
var createArchivingView = require('./views/channel-archiving')
var swarm = require('./swarm')

var DATABASE_VERSION = 1
Expand All @@ -22,6 +23,7 @@ var TOPICS = 't'
var USERS = 'u'
var MODERATION_AUTH = 'mx'
var MODERATION_INFO = 'my'
var ARCHIVES = 'a'

module.exports = Cabal
module.exports.databaseVersion = DATABASE_VERSION
Expand Down Expand Up @@ -96,13 +98,17 @@ function Cabal (storage, key, opts) {
sublevel(this.db, MODERATION_AUTH, { valueEncoding: json }),
sublevel(this.db, MODERATION_INFO, { valueEncoding: json })
))
this.kcore.use('archives', createArchivingView(
this,
sublevel(this.db, ARCHIVES, { valueEncoding: json })))

this.messages = this.kcore.api.messages
this.channels = this.kcore.api.channels
this.memberships = this.kcore.api.memberships
this.topics = this.kcore.api.topics
this.users = this.kcore.api.users
this.moderation = this.kcore.api.moderation
this.archives = this.kcore.api.archives
}

inherits(Cabal, events.EventEmitter)
Expand Down
152 changes: 152 additions & 0 deletions views/channel-archiving.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
const EventEmitter = require('events').EventEmitter
const pump = require('pump')
const Writable = require('readable-stream').Writable
const makeView = require('kappa-view')

/* view data structure
1. archive!<channelname>: '<pubkey of archivist>@<sequence of message for archivist's feed>'
example:
archive!default: 'feed..b01@1337'
2. unarchive!<channelname>: '<pubkey of archivist>@<sequence of message for archivist's feed>'
example:
archive!default: 'feed..b01@1337'
message schema:
{
key: <pubkey of archivist>,
seq: <sequence of archive action in log>,
value: {
type: 'channel/archive' or 'channel/unarchive',
content: {
channel: <channel>,
reason: <optional string reason for archiving || ''>
}
}
}
*/

function getAuthorizedKeys (kcore, cb) {
return Promise.all([getKeys('admin'), getKeys('mod')]).then(res => cb(res[0].concat(res[1]).map(row => row.id)))

// collect pubkeys for cabal-wide mods or admins
function getKeys (flag) {
return new Promise((resolve, reject) => {
function processResult (err, result) {
if (err) return resolve([])
resolve(result)
}
kcore.api.moderation.listByFlag({ flag, channel: '@' }, processResult)
})
}
}

module.exports = function (cabal, lvl) {
var events = new EventEmitter()

return makeView(lvl, function (db) {
return {
maxBatch: 500,
map: function (msgs, next) {
// 1. go over each msg
// 2. check if it's an archive/unarchive msg (skip if not)
// 3. accumulate a level PUT for message type, level DEL for opposite
// e.g. if channel/archive -> PUT archive:<channel>, DEL unarchive:<channel>
// 4. write it all to leveldb as a batch
const ops = []
msgs.forEach(function (msg) {
if (!sanitize(msg)) { return }
const channel = msg.value.content.channel
const reason = msg.value.content.reason || ''
const key = msg.key
if (/^channel\/(un)?archive$/.test(msg.value.type)) {
const activeType = (msg.value.type === "channel/archive") ? "archive" : "unarchive"
const oppositeType = (activeType === "archive") ? "unarchive" : "archive"

ops.push({
type: 'put',
key: `${activeType}!${channel}!${key}`,
value: `${key}@${msg.seq}`
})

ops.push({
type: 'del',
key: `${oppositeType}!${channel}!${key}`
})

events.emit(activeType, channel, reason, key)
}
})
if (ops.length) db.batch(ops, next)
else next()
},
api: {
// get the list of channels explicitly unarchived by key, typically the local user
getUnarchived: function (core, peerkey, cb) {
this.ready(function () {
const channels = []
db.createKeyStream({
gt: 'unarchive!!',
lt: 'unarchive!~'
})
.on('data', function (row) {
// structure of `row`: unarchive!<channel>!<key>
const [_, channel, key] = row.split('!') // drop 'unarchive' when splitting on !
if (key === peerkey) { channels.push(channel) }
})
.once('end', function () {
cb(null, channels)
})
.once('error', cb)
})
},
// get the list of currently archived channels
get: function (core, cb) {
this.ready(() => {
// query mod view to determine if archiving will be applied locally (archiver is a mod or an admin)
getAuthorizedKeys(core, authorizedKeys => {
const channels = []
db.createKeyStream({
gt: 'archive!!',
lt: 'archive!~'
})
.on('data', (row) => {
// structure of `row`: archive!<channel>!<key>
const [_, channel, key] = row.split('!') // drop 'archive' when splitting on !
if (authorizedKeys.indexOf(key) >= 0) { channels.push(channel) }
})
.once('end', () => {
// the local user's **unarchived** channels take precedence over other's archived channels
cabal.getLocalKey((err, localKey) => {
if (err) return cb(null, channels)
core.api.archives.getUnarchived(localKey, (err, unarchivedChannels) => {
unarchivedChannels = unarchivedChannels || []
unarchivedChannels.forEach(ch => {
const i = channels.indexOf(ch)
// if the local user has unarchived a previously archived channel:
// remove the archived channel from the result
if (i >= 0) { channels.splice(i, 1) }
})
cb(null, channels)
})
})
})
.once('error', cb)
})
})
},
events: events
}
}
})
}

// Either returns a well-formed chat message, or null.
function sanitize (msg) {
if (typeof msg !== 'object') return null
if (typeof msg.value !== 'object') return null
if (typeof msg.value.content !== 'object') return null
if (typeof msg.value.timestamp !== 'number') return null
if (typeof msg.value.type !== 'string') return null
if (typeof msg.value.content.channel !== 'string') return null
return msg
}

0 comments on commit ad4f750

Please sign in to comment.