Elasticsearch bulk helper closing splitter stream #140
@delvedor could you take a look?
What should be the default behavior in this case? Are you ok with dropping logs? Or should they be kept in memory?
The bulk request should just drop the logs in my opinion, as that request has exceeded its max retry count. The issue is that once all nodes in the connection pool are declared DEAD, the stream will be disposed/destroyed and pino-elasticsearch will never continue. I've found a hacky solution to this problem: I'll fetch my laptop and try to make a PR so you can see what I mean.
Hello! Even if all the nodes are declared dead, the client will still try to send requests against the originally configured list of nodes.
I'm not sure what you mean here. Is the helper throwing any error?
If the maxRetries is reached, the document will be dropped. The bulk helper continues to work as long as the input stream does not error (meaning the splitter).
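For illustration, a minimal sketch of that behavior (the endpoint, index, and sample document here are assumptions, not part of this report): a document that exhausts its retries is handed to onDrop rather than thrown, and the helper keeps consuming its datasource.

'use strict'
const { Client } = require('@elastic/elasticsearch')

// assumed endpoint; maxRetries bounds how often a failed operation is retried
const client = new Client({ node: 'http://localhost:9200', maxRetries: 2 })

client.helpers.bulk({
  datasource: [{ msg: 'hello world' }], // assumed sample document
  onDocument (doc) {
    // tells the helper which bulk action to perform for each document
    return { index: { _index: 'pino' } }
  },
  onDrop (doc) {
    // invoked when a document exhausts its retries; nothing is thrown,
    // and the helper itself keeps running
    console.warn('dropped document:', doc)
  }
}).then((stats) => console.log('bulk stats:', stats))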
Thank you @delvedor, but in my case that's not what's happening. The splitter stream disposes after a document is dropped, the max retries are exhausted, and all nodes are down.
I've rewritten the lib.js file to suit our needs.

'use strict'

/* eslint no-prototype-builtins: 0 */

const split = require('split2')
const { Client, Connection } = require('@elastic/elasticsearch')

function initializeBulkHandler (opts, client, splitter) {
  const esVersion = Number(opts['es-version']) || 7
  const index = opts.index || 'pino'
  const buildIndexName = typeof index === 'function' ? index : null
  const type = esVersion >= 7 ? undefined : (opts.type || 'log')
  const opType = esVersion >= 7 ? opts.op_type : undefined

  const b = client.helpers.bulk({
    datasource: splitter,
    flushBytes: opts['flush-bytes'] || 1000,
    flushInterval: opts['flush-interval'] || 30000,
    refreshOnCompletion: getIndexName(),
    onDocument (doc) {
      const date = doc.time || doc['@timestamp']
      if (opType === 'create') {
        doc['@timestamp'] = date
      }
      return {
        index: {
          _index: getIndexName(date),
          _type: type,
          op_type: opType
        }
      }
    },
    onDrop (doc) {
      const error = new Error('Dropped document')
      error.document = doc
      splitter.emit('insertError', error)
    }
  })

  b.then(
    (stats) => splitter.emit('insert', stats),
    (err) => splitter.emit('error', err)
  )

  // Override destroy: instead of letting the splitter die, resurrect the
  // connection pool and attach a fresh bulk helper to the same splitter.
  splitter.destroy = function (err) {
    client.connectionPool.resurrect({ name: 'elasticsearch-js', requestId: '696969' })
    initializeBulkHandler(opts, client, splitter)
  }

  function getIndexName (time = new Date().toISOString()) {
    if (buildIndexName) {
      return buildIndexName(time)
    }
    return index.replace('%{DATE}', time.substring(0, 10))
  }
}

function pinoElasticSearch (opts) {
  if (opts['bulk-size']) {
    process.emitWarning('The "bulk-size" option has been deprecated, use "flush-bytes" instead')
    delete opts['bulk-size']
  }

  const splitter = split(function (line) {
    let value

    try {
      value = JSON.parse(line)
    } catch (error) {
      this.emit('unknown', line, error)
      return
    }

    if (typeof value === 'boolean') {
      this.emit('unknown', line, 'Boolean value ignored')
      return
    }
    if (value === null) {
      this.emit('unknown', line, 'Null value ignored')
      return
    }
    if (typeof value !== 'object') {
      value = {
        data: value,
        time: setDateTimeString(value)
      }
    } else if (value['@timestamp'] === undefined) {
      value.time = setDateTimeString(value)
    }

    function setDateTimeString (value) {
      if (typeof value === 'object' && value.hasOwnProperty('time')) {
        if (
          (typeof value.time === 'string' && value.time.length) ||
          (typeof value.time === 'number' && value.time >= 0)
        ) {
          return new Date(value.time).toISOString()
        }
      }
      return new Date().toISOString()
    }

    return value
  }, { autoDestroy: true })

  const client = new Client({
    node: opts.node,
    auth: opts.auth,
    cloud: opts.cloud,
    ssl: { rejectUnauthorized: opts.rejectUnauthorized },
    Connection: opts.Connection || Connection
  })

  initializeBulkHandler(opts, client, splitter)

  return splitter
}

module.exports = pinoElasticSearch
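To show how this gets wired up, here is a usage sketch (the node URL and index are placeholders for our real config; the events are the ones the splitter above emits):

'use strict'
const pino = require('pino')
const pinoElasticSearch = require('./lib') // the rewritten file above

const streamToElastic = pinoElasticSearch({
  node: 'http://localhost:9200', // assumed endpoint
  index: 'pino'
})

// events emitted by the splitter in the code above
streamToElastic.on('insertError', (err) => console.error('dropped:', err.document))
streamToElastic.on('unknown', (line, error) => console.error('unparsable line:', line, error))
streamToElastic.on('error', (err) => console.error('bulk error:', err))

const logger = pino({ level: 'info' }, streamToElastic)
logger.info('hello world')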
I must add that I am only using a single node in this setup, so maybe this only occurs when the pool has a single endpoint.
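For comparison, a multi-node pool would be configured like this (URLs are made up); whether the disposal still happens with more than one endpoint is exactly what's unclear to me:

const { Client } = require('@elastic/elasticsearch')

// assumed URLs: with several nodes the pool marks one DEAD and tries the
// next, so it takes longer before every node is dead at the same time
const client = new Client({
  nodes: ['http://es1:9200', 'http://es2:9200', 'http://es3:9200']
})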
Currently we're experiencing issues with logging stopping completely after our elastic ingest node reboots. After 2 days of debugging and testing I've concluded the splitter transform stream is getting destroyed by the async iterator in the bulk helper of the elasticsearch-js library.

So the cause: pino-elasticsearch uses the bulk helper from elasticsearch-js. Once the maxRetries are exceeded and all nodes are declared as DEAD, the stream will be disposed because of the bulk helper's for await on the splitter. I've been trying to create a patch PR, but my knowledge of JS streams is next to none.

This splitter Transform stream is being used as the main stream for pino, so once that one is disposed the logs will cease to flow even if the nodes come back up.
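A minimal repro of what I mean (not the actual elasticsearch-js code, just the same consumption pattern): when a for await loop over a stream exits early, the async iterator destroys the underlying stream.

'use strict'
const { PassThrough } = require('stream')

// stand-in for the splitter that pino writes into
const splitter = new PassThrough()

async function consume (stream) {
  // same pattern the bulk helper uses to read its datasource
  for await (const chunk of stream) {
    console.log('got', chunk.toString())
    // simulate the helper giving up once retries are exhausted
    throw new Error('all nodes are dead')
  }
}

consume(splitter).catch(() => {
  // exiting the loop makes the async iterator destroy the source stream,
  // so pino's destination is gone even if Elasticsearch comes back
  console.log('splitter destroyed?', splitter.destroyed) // true
})

splitter.write('{"msg":"hello"}\n')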