Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow log to pack raw nodes #1213

Merged
merged 4 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Features and Enhancements
- Add POST support to the ``/api/v1/model/norm`` HTTP API endpoint. (`#1207 <https://github.com/vertexproject/synapse/pull/1207>`_)
- Add ``getPropNorm()`` and ``getTypeNorm()`` Telepath API endpoints to the Cortex and CoreApi. (`#1207 <https://github.com/vertexproject/synapse/pull/1207>`_)
- Add list ``length()`` and ``index()`` methods to Storm types. (`#1208 <https://github.com/vertexproject/synapse/pull/1208>`_)
- Add ``--nodes-only`` to the Cmdr ``log`` command to only record raw nodes. (`#1213 <https://github.com/vertexproject/synapse/pull/1213>`_)

Bugfixes
--------
Expand Down
101 changes: 57 additions & 44 deletions synapse/cmds/cortex.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import queue
import shlex
import pprint
import logging

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.cli as s_cli
import synapse.lib.cmd as s_cmd
import synapse.lib.node as s_node
import synapse.lib.time as s_time
import synapse.lib.msgpack as s_msgpack
Expand All @@ -15,50 +17,34 @@


class Log(s_cli.Cmd):
'''
Add a storm log to the local command session.

Syntax:
log (--on|--off) [--splices-only] [--format (mpk|jsonl)] [--path /path/to/file]

Required Arguments:
--on: Enables logging of storm messages to a file.
--off: Disables message logging and closes the current storm file.

Optional Arguments:
--splices-only: Only records splices. Does not record any other messages.
--format: The format used to save messages to disk. Defaults to msgpack (mpk).
--path: The path to the log file. This will append messages to a existing file.
'''Add a storm log to the local command session.

Notes:
By default, the log file contains all messages received from the execution of
a Storm query by the current CLI. By default, these messages are saved to a
file located in ~/.syn/stormlogs/storm_(date).(format).
Notes:
By default, the log file contains all messages received from the execution of
a Storm query by the current CLI. By default, these messages are saved to a
file located in ~/.syn/stormlogs/storm_(date).(format).

Examples:
# Enable logging all messages to mpk files (default)
log --on
Examples:
# Enable logging all messages to mpk files (default)
log --on

# Disable logging and close the current file
log --off
# Disable logging and close the current file
log --off

# Enable logging, but only log splices. Log them as jsonl instead of mpk.
log --on --splices-only --format jsonl
# Enable logging, but only log splices. Log them as jsonl instead of mpk.
log --on --splices-only --format jsonl

# Enable logging, but log to a custom path:
log --on --path /my/aweome/log/directory/storm20010203.mpk
# Enable logging, but log to a custom path:
log --on --path /my/aweome/log/directory/storm20010203.mpk

# Log only the node messages which come back from a storm cmd execution.
log --on --nodes-only --path /my/awesome/log/directory/stormnodes20010203.mpk
'''
_cmd_name = 'log'
_cmd_syntax = (
('--on', {'type': 'flag'}),
('--off', {'type': 'flag'}),
('--path', {'type': 'valu'}),
('--format', {'type': 'enum',
'defval': 'mpk',
'enum:vals': ['mpk', 'jsonl']}),
('--splices-only', {'defval': False})
_cmd_syntax = ( # type: ignore
('line', {'type': 'glob'}),
)

splicetypes = (
'tag:add',
'tag:del',
Expand All @@ -68,6 +54,25 @@ class Log(s_cli.Cmd):
'prop:del',
)

def _make_argparser(self):

parser = s_cmd.Parser(prog='log', outp=self, description=self.__doc__)
muxp = parser.add_mutually_exclusive_group(required=True)
muxp.add_argument('--on', action='store_true', default=False,
help='Enables logging of storm messages to a file.')
muxp.add_argument('--off', action='store_true', default=False,
help='Disables message logging and closes the current storm file.')
parser.add_argument('--format', choices=('mpk', 'jsonl'), default='mpk', type=str.lower,
help='The format used to save messages to disk. Defaults to msgpack (mpk).')
parser.add_argument('--path', type=str, default=None,
help='The path to the log file. This will append messages to a existing file.')
optmux = parser.add_mutually_exclusive_group()
optmux.add_argument('--splices-only', action='store_true', default=False,
help='Only records splices. Does not record any other messages.')
optmux.add_argument('--nodes-only', action='store_true', default=False,
help='Only record the packed nodes returned by storm.')
return parser

def __init__(self, cli, **opts):
s_cli.Cmd.__init__(self, cli, **opts)
# Give ourselves a local ref to locs since we're stateful.
Expand All @@ -93,9 +98,14 @@ def queueLoop(self):
def save(self, mesg):
fd = self.locs.get('log:fd')
spliceonly = self.locs.get('log:splicesonly')
nodesonly = self.locs.get('log:nodesonly')
if fd and not fd.closed:
if spliceonly and mesg[0] not in self.splicetypes:
return
if nodesonly:
if mesg[0] != 'node':
return
mesg = mesg[1]
try:
buf = self.encodeMsg(mesg)
except Exception as e:
Expand Down Expand Up @@ -144,9 +154,10 @@ def openLogFd(self, opts):
if opath:
self.printf('Must call --off to disable current file before starting a new file.')
return
fmt = opts.get('format')
path = opts.get('path')
splice_only = opts.get('splices-only')
fmt = opts.format
path = opts.path
nodes_only = opts.nodes_only
splice_only = opts.splices_only
if not path:
ts = s_time.repr(s_common.now(), True)
fn = f'storm_{ts}.{fmt}'
Expand All @@ -161,21 +172,23 @@ def openLogFd(self, opts):
self.locs['log:fmt'] = fmt
self.locs['log:queue'] = q
self.locs['log:thr'] = self.queueLoop()
self.locs['log:nodesonly'] = nodes_only
self.locs['log:splicesonly'] = splice_only
self._cmd_cli.on('storm:mesg', self.onStormMesg)

async def runCmdOpts(self, opts):
on = opts.get('on')
off = opts.get('off')

if bool(on) == bool(off):
self.printf('Pick one of "--on" or "--off".')
line = opts.get('line', '')

try:
opts = self._make_argparser().parse_args(shlex.split(line))
except s_exc.ParserExit as e:
return

if on:
if opts.on:
return self.openLogFd(opts)

if off:
if opts.off:
return self.closeLogFd()

class PsCmd(s_cli.Cmd):
Expand Down
27 changes: 25 additions & 2 deletions synapse/tests/test_cmds_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,40 @@ async def test_log(self):
objs = list(genr)
self.eq(objs[0][0], 'node:add')

outp = self.getTestOutp()
cmdr = await s_cmdr.getItemCmdr(core, outp=outp)
# Our default format is mpk
fp = os.path.join(dirn, 'loggyMcNodeFace.mpk')
await cmdr.runCmdLine(f'log --on --nodes-only --path {fp}')
fp = cmdr.locs.get('log:fp')
await cmdr.runCmdLine('storm [test:str="I am a message!" :tick=1999 +#oh.my] ')
await cmdr.runCmdLine('log --off')
await cmdr.fini()

self.true(os.path.isfile(fp))
with s_common.genfile(fp) as fd:
genr = s_encoding.iterdata(fd, close_fd=False, format='mpk')
objs = list(genr)
self.eq(objs[0][0], ('test:str', 'I am a message!'))

outp = self.getTestOutp()
cmdr = await s_cmdr.getItemCmdr(core, outp=outp)
await cmdr.runCmdLine('log --on --off')
await cmdr.fini()
self.true(outp.expect('Pick one'))
self.true(outp.expect('log: error: argument --off: not allowed with argument --on'))

outp = self.getTestOutp()
cmdr = await s_cmdr.getItemCmdr(core, outp=outp)
await cmdr.runCmdLine('log')
await cmdr.fini()
self.true(outp.expect('Pick one'))
self.true(outp.expect('log: error: one of the arguments --on --off is required'))

outp = self.getTestOutp()
cmdr = await s_cmdr.getItemCmdr(core, outp=outp)
await cmdr.runCmdLine('log --on --splices-only --nodes-only')
await cmdr.fini()
e = 'log: error: argument --nodes-only: not allowed with argument --splices-only'
self.true(outp.expect(e))

async def test_ps_kill(self):

Expand Down