-
Notifications
You must be signed in to change notification settings - Fork 0
/
slither_server.py
284 lines (247 loc) · 10.5 KB
/
slither_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import logging
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from threading import Lock
from typing import Dict, List, Optional, Tuple, Type
from os.path import split
import lsprotocol.types as lsp
from crytic_compile.crytic_compile import CryticCompile
from pygls.lsp import METHOD_TO_OPTIONS
from pygls.protocol import LanguageServerProtocol
from pygls.server import LanguageServer
from slither import Slither
from slither.__main__ import (
_process as process_detectors_and_printers,
)
from slither.__main__ import (
get_detectors_and_printers,
)
from slither_lsp.app.feature_analyses.slither_diagnostics import SlitherDiagnostics
from slither_lsp.app.logging import LSPHandler
from slither_lsp.app.request_handlers import (
register_on_find_references,
register_on_get_incoming_calls,
register_on_get_outgoing_calls,
register_on_get_subtypes,
register_on_get_supertypes,
register_on_goto_definition,
register_on_goto_implementation,
register_on_prepare_call_hierarchy,
register_on_prepare_type_hierarchy,
)
from slither_lsp.app.types.analysis_structures import (
AnalysisResult,
SlitherDetectorResult,
SlitherDetectorSettings,
)
from slither_lsp.app.types.params import (
METHOD_TO_TYPES,
SLITHER_SET_DETECTOR_SETTINGS,
SLITHER_ANALYZE,
AnalysisRequestParams,
)
from slither_lsp.app.utils.file_paths import normalize_uri, uri_to_fs_path
# TODO(frabert): Maybe this should be upstreamed? https://github.com/openlawlibrary/pygls/discussions/338
METHOD_TO_OPTIONS[lsp.WORKSPACE_DID_CHANGE_WATCHED_FILES] = (
lsp.DidChangeWatchedFilesRegistrationOptions
)
class SlitherProtocol(LanguageServerProtocol):
# See https://github.com/openlawlibrary/pygls/discussions/441
@lru_cache
def get_message_type(self, method: str) -> Optional[Type]:
return METHOD_TO_TYPES.get(method, (None,))[0] or super().get_message_type(
method
)
@lru_cache
def get_result_type(self, method: str) -> Optional[Type]:
return METHOD_TO_TYPES.get(method, (None, None))[1] or super().get_result_type(
method
)
class SlitherServer(LanguageServer):
_logger: logging.Logger
_init_params: Optional[lsp.InitializeParams] = None
# Define our workspace parameters.
workspaces: Dict[str, AnalysisResult] = {}
# `workspace_in_progress[uri]` is locked if there's a compilation in progress for the workspace `uri`
workspace_in_progress: Dict[str, Lock] = defaultdict(Lock)
@property
def analyses(self) -> List[AnalysisResult]:
return list(self.workspaces.values())
# Define our slither diagnostics provider
detector_settings: SlitherDetectorSettings = SlitherDetectorSettings(
enabled=True, hidden_checks=[]
)
analysis_pool = ThreadPoolExecutor()
def __init__(self, logger: logging.Logger, *args):
super().__init__(protocol_cls=SlitherProtocol, *args)
self._logger = logger
self._logger.addHandler(LSPHandler(self))
self.slither_diagnostics = SlitherDiagnostics(self)
@self.feature(lsp.INITIALIZE)
def on_initialize(ls: SlitherServer, params):
ls._on_initialize(params)
@self.feature(lsp.INITIALIZED)
def on_initialized(ls: SlitherServer, params):
ls.show_message("slither-lsp initialized", lsp.MessageType.Debug)
@self.thread()
@self.feature(lsp.WORKSPACE_DID_CHANGE_WORKSPACE_FOLDERS)
def on_did_change_workspace_folder(ls: SlitherServer, params):
ls._on_did_change_workspace_folders(params)
@self.thread()
@self.feature(SLITHER_SET_DETECTOR_SETTINGS)
def on_set_detector_settings(ls: SlitherServer, params):
ls._on_set_detector_settings(params)
@self.thread()
@self.feature(SLITHER_ANALYZE)
def on_analyze(ls: SlitherServer, params):
ls._on_analyze(params)
register_on_goto_definition(self)
register_on_goto_implementation(self)
register_on_find_references(self)
register_on_prepare_call_hierarchy(self)
register_on_get_incoming_calls(self)
register_on_get_outgoing_calls(self)
register_on_prepare_type_hierarchy(self)
register_on_get_subtypes(self)
register_on_get_supertypes(self)
@property
def workspace_opened(self):
"""
If True, indicates a workspace folder has been opened.
If False, no workspace folder is opened and files in opened tabs will be targeted.
:return: None
"""
return len(self.workspaces.items()) > 0
def _on_initialize(self, params: lsp.InitializeParams) -> None:
"""
Sets initial data when the server is spun up, such as workspace folders.
:param params: The client's initialization parameters.
:param result: The server response to the client's initialization parameters.
:return: None
"""
# Set our workspace folder on initialization.
self._init_params = params
for workspace in params.workspace_folders or []:
self.queue_compile_workspace(normalize_uri(workspace.uri))
def _on_analyze(self, params: AnalysisRequestParams):
uris = [normalize_uri(uri) for uri in params.uris or self.workspaces.keys()]
for uri in uris:
path = uri_to_fs_path(uri)
workspace_name = split(path)[1]
if self.workspace_in_progress[uri].locked():
self.show_message(
f"Analysis for {workspace_name} is already in progress",
lsp.MessageType.Warning,
)
continue
self.queue_compile_workspace(uri)
def queue_compile_workspace(self, uri: str):
"""
Queues a workspace for compilation. `uri` should be normalized
"""
path = uri_to_fs_path(uri)
workspace_name = split(path)[1]
def compile():
detector_classes, _ = get_detectors_and_printers()
with self.workspace_in_progress[uri]:
self.show_message(
f"Compilation for {workspace_name} has started",
lsp.MessageType.Info,
)
try:
compilation = CryticCompile(path)
analysis = Slither(compilation)
_, detector_results, _, _ = process_detectors_and_printers(
analysis, detector_classes, []
)
# Parse detector results
if detector_results is not None and isinstance(
detector_results, list
):
detector_results = [
SlitherDetectorResult.from_dict(detector_result)
for detector_result in detector_results
]
else:
detector_results = None
analyzed_successfully = True
analysis_error = None
self.show_message(
f"Compilation for {workspace_name} has completed successfully",
lsp.MessageType.Info,
)
except Exception as err:
# If we encounter an error, set our status.
analysis = None
compilation = None
analyzed_successfully = False
analysis_error = err
detector_results = None
self.show_message(
f"Compilation for {workspace_name} has failed. See log for details.",
lsp.MessageType.Info,
)
self._logger.log(
logging.ERROR, "Compiling %s has failed: %s", path, err
)
self.workspaces[uri] = AnalysisResult(
succeeded=analyzed_successfully,
compilation=compilation,
analysis=analysis,
error=analysis_error,
detector_results=detector_results,
)
self._refresh_detector_output()
self.analysis_pool.submit(compile)
def _on_did_change_workspace_folders(
self, params: lsp.DidChangeWorkspaceFoldersParams
) -> None:
"""
Applies client-reported changes to the workspace folders.
:param params: The client's workspace change message parameters.
:return: None
"""
for added in params.event.added:
uri = normalize_uri(added.uri)
if not self.workspace_in_progress[uri].locked():
self.queue_compile_workspace(uri)
for removed in params.event.removed:
uri = normalize_uri(removed.uri)
with self.workspace_in_progress[uri]:
self.workspaces.pop(uri, None)
def _on_set_detector_settings(self, params: SlitherDetectorSettings) -> None:
"""
Sets the detector settings for the workspace, indicating how detector output should be presented.
:param params: The parameters provided for the set detector settings request.
:return: None
"""
# If our detector settings are not different than existing ones, we do not need to trigger any on-change events.
if params == self.detector_settings:
return
# Set our detector settings
self.detector_settings = params
# Refresh our detector output
self._refresh_detector_output()
def _refresh_detector_output(self):
"""
Refreshes language server state given new analyses output or detector settings.
:return: None
"""
# Update our diagnostics with new detector output.
self.slither_diagnostics.update(self.analyses, self.detector_settings)
def get_analyses_containing(
self, filename: str
) -> List[Tuple[Slither, CryticCompile]]:
def lookup(comp: CryticCompile):
try:
return comp.filename_lookup(filename)
except ValueError:
return None
return [
(analysis_result.analysis, analysis_result.compilation)
for analysis_result in self.analyses
if analysis_result.analysis is not None
and analysis_result.compilation is not None
and lookup(analysis_result.compilation) is not None
]