Feature: async rpc (#1706) #1735
Conversation
@@ -20,7 +20,7 @@
 DEFAULT_THREADS = 1
 DEFAULT_PROFILES_DIR = os.path.join(os.path.expanduser('~'), '.dbt')
 PROFILES_DIR = os.path.expanduser(
-    os.environ.get('DBT_PROFILES_DIR', DEFAULT_PROFILES_DIR)
+    os.getenv('DBT_PROFILES_DIR', DEFAULT_PROFILES_DIR)
I changed all `os.environ.get` -> `os.getenv` for grep-ability reasons when I was adding some secret magic threading-related flags to the RPC server.
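As a quick sanity check on that diff: `os.getenv` is documented as equivalent to `os.environ.get`, so the change is behavior-preserving. A minimal sketch (the variable names below are illustrative):

```python
import os

# os.getenv reads from os.environ with the same default-value semantics,
# so swapping os.environ.get for os.getenv only changes the greppable spelling.
os.environ['DBT_PROFILES_DIR'] = '/tmp/profiles'

via_getenv = os.getenv('DBT_PROFILES_DIR', 'fallback')
via_environ = os.environ.get('DBT_PROFILES_DIR', 'fallback')
missing = os.getenv('DBT_SOME_UNSET_VAR', 'fallback')  # unset -> default
```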
        return iter(self.results)

    def __getitem__(self, idx):
        return self.results[idx]
These changes make results a lot like the old thing where we returned `result.results`, which meant I didn't have to update a million tests. Also, it just seemed like a nice thing.
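A minimal stand-in showing the dunder delegation being discussed (this `ExecutionResult` is illustrative, not dbt's actual class):

```python
# Delegate len(), iteration, and indexing to the wrapped results list, so
# callers that used to do `result.results[0]` can write `result[0]` instead.
class ExecutionResult:
    def __init__(self, results):
        self.results = results

    def __len__(self):
        return len(self.results)

    def __iter__(self):
        return iter(self.results)

    def __getitem__(self, idx):
        return self.results[idx]

res = ExecutionResult(['model_a', 'model_b'])
```

Existing tests that indexed or iterated the old bare list keep working unchanged against the wrapper.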
force-pushed from 15596b9 to d4fe9b7
this is just.... chef's kiss 👨‍🍳
I need to do another pass or two through the code here, but I found some minor things that are worth tweaking while smoke testing some requests.
- it looks like some types of request validation errors only show up in the `poll` method, but I would expect them to fail at request-time. One such example: submitting a `run` task without specifying a name. I'd expect the request to fail, but it's accepted and the failure is only apparent when we `poll` for the request (`"handle_request() missing 1 required positional argument: 'name'"`). Is this a minor change, or is there a good reason why it should work the way it's implemented here?
- There are some superfluous log lines that we should suppress:
  - lots of Snowflake OCSP Responder noise (`This connection is in OCSP Fail Open Mode. TLS Certificates would be checked...`) [channel=`snowflake.connector.connection`, `snowflake.connector.cursor`, `snowflake.connector.json_result`]
  - some boto3-looking lines (`Starting new HTTPS connection (1): ...`) [channel=`botocore.vendored.requests.packages.urllib3.connectionpool`]
- the `elapsed` time in `ps` shows up as negative -- I see values like `-14349.432432`. Would that also impact garbage collection / task reaping?
- I'm seeing a 500 error when I hit the `poll` method for one particular project. I'm not sure exactly what's going wrong, but I can investigate further. This happens for `run_project` tasks, but not `run` tasks. I think this has something to do with control characters in the log lines. Some of those are json-serializable, but others are not? This one in particular causes problems:

{
  "timestamp": "2019-09-11T02:09:43.924055Z",
  "message": "�[33mWARNING: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.\nThere are 1 unused configuration paths:\n- models.client.attribution.transform\n�[0m",
  ...
}

^ this warning occurs during compilation, but it appears to be served up correctly by calls to `status`. I think this has something to do with logging, because a `poll` request for this task will succeed unless I specify `logs: true` while the task is running. When the task completes, `poll`ing the task appears to always result in a 500 error. I saw this on BigQuery FWIW, but it may be project-specific. Thoughts: might be worth turning off colorama for structured logs; it would also be good to use a big ol' try-catch so we know where this 500 is coming from.
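One hedged way to act on the colorama suggestion above (a sketch, not dbt's actual fix) is to strip ANSI color escape sequences from log messages before they are JSON-serialized:

```python
import re

# Matches CSI color sequences like "\x1b[33m" ... "\x1b[0m" -- the
# control characters that show up as "�[33m" in the problematic log line.
ANSI_COLOR = re.compile(r'\x1b\[[0-9;]*m')

def strip_ansi(message):
    """Remove terminal color codes before serializing a log record."""
    return ANSI_COLOR.sub('', message)

clean = strip_ansi('\x1b[33mWARNING: unused configuration paths\x1b[0m')
```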
Let me have a think about garbage collection / reaping / the gc endpoint / etc. Will follow up here with some additional thoughts. Overall though, you did a really incredible job here @beckjake :)
-def _task_bootstrap(task, queue, kwargs):
+def _task_bootstrap(
+    task: RPCTask,
+    queue,  # typing: Queue[Tuple[QueueMessageType, Any]]
what's the difference between this and the other type hints?
This is a very annoying one. Basically, the built-in stdlib `queue.Queue` doesn't have any type-related information attached to it. So `typeshed`, which is included in `mypy`, provides its own `queue.Queue` for `mypy` to use that does support type information. It's all very silly, as it means that `Queue[...]` isn't valid syntax so I can't use the `name: Type` syntax.
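For context, another common workaround for this (a sketch, not what the diff does): before Python 3.9, subscripting the real `queue.Queue` at runtime raises `TypeError` even though typeshed's stub is generic, but a string annotation is checked by mypy and never evaluated by the interpreter, so it works on every version:

```python
import queue
from typing import Any, Tuple

# The string ("forward") annotation is stored but never evaluated at
# runtime, so `Queue[...]` never actually executes; mypy still checks it.
q: 'queue.Queue[Tuple[str, Any]]' = queue.Queue()

q.put(('log', {'message': 'hello'}))
kind, payload = q.get()
```

The diff above opts for a plain comment instead, which documents the intended element type without involving the type checker at all.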
roger! Thanks for the explanation
It's not a minor change really, as tasks define their arguments in …
Oops, this is a timezone bug! I think it would, yes.
I can't reproduce this issue. I made a project that emits the same warning by adding a dummy dbt_project.yml line, and then did …
Ok! Let's do that in this PR
force-pushed from 17da8ac to af5e227
This LGTM once the tests are passing! The new validation code works great :)
- Add `__len__`, `__iter__`, and `__getitem__` to ExecutionResult/FreshnessResult - tests and some internal callers are now happy without modifying them
- Also disable RPC tests on windows - managing windows idiosyncrasies is just too much
- Pass werkzeug logging through; suppress all non-dbt logs in the RPC response; filter out all non-dbt/werkzeug logs below WARNING (dbt and dbt rpc); lower all remaining non-dbt/werkzeug logs to DEBUG (dbt and dbt rpc) - attach old_level in extras if we do. Handle some more protocol exceptions gracefully
- If the rpc server gets a non-serializable response component, call str() on it
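That last fallback can be sketched with `json.dumps`'s `default` hook, which is called only for objects the encoder cannot serialize natively (the function and payload below are illustrative, not dbt's serializer):

```python
import json
from datetime import datetime

def serialize_response(payload):
    # default=str is invoked only for non-serializable values (datetimes,
    # exceptions, ...), so unknown types degrade to their string form
    # instead of raising TypeError and turning the response into a 500.
    return json.dumps(payload, default=str)

body = serialize_response({
    'status': 'success',
    'finished_at': datetime(2019, 9, 11, 2, 9, 43),
})
```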
force-pushed from b1a89e4 to 2cf2c82
Fixes #1706

Implement async tasks, mostly as described in #1706, with one big exception: On errors, an error object is returned directly instead of `"status": "error"` - it was just much easier to implement that way.

All the identified async methods now return a `request_token` instead of their results.

Adds a `poll` method that takes one mandatory argument, the `request_token`, and returns a `PollResult` on success, or the same error as before if the polled task got an error. `PollResult` always has a `status` field, which can be one of `['not started', 'initializing', 'running', 'success']`. On `success`, it also comes with request-type-specific result data.

By default, garbage collection is checked on every json-rpc method:
- `maxsize` defaults to 1000. If there are more than `maxsize` tasks, the oldest (`number_of_tasks` - `maxsize`) that have finished are cleaned up
- `auto_reap_age` defaults to 30 days
- `reapsize` defaults to 500. If there are more than `reapsize` tasks, all tasks that are older than the current `auto_reap_age` are garbage collected

All those numbers are totally invented by me and are very much arbitrary.
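The two default cleanup rules above can be sketched as follows (a hedged illustration; the names, data shapes, and defaults-as-parameters are not dbt's internals):

```python
from datetime import datetime, timedelta

def collect_garbage(tasks, now, maxsize=1000, reapsize=500,
                    auto_reap_age=timedelta(days=30)):
    """tasks maps task_id -> finished_at datetime (None = still running).
    Returns the set of task ids the policy would clean up."""
    finished = sorted(
        (ts, tid) for tid, ts in tasks.items() if ts is not None
    )
    doomed = set()
    # Rule 1: over maxsize -> clean up the oldest finished overflow.
    overflow = len(tasks) - maxsize
    if overflow > 0:
        doomed.update(tid for _, tid in finished[:overflow])
    # Rule 2: over reapsize -> reap finished tasks older than auto_reap_age.
    if len(tasks) > reapsize:
        doomed.update(tid for ts, tid in finished if now - ts > auto_reap_age)
    return doomed
```

Note that under both rules only finished tasks are ever collected; running tasks (no `finished_at`) are left alone.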
I also added a `gc` method that forces garbage collection and allows you to tweak GC settings:

- `task_ids` parameter, defaults to None and is a list of task IDs to garbage collect directly
- `before` parameter, defaults to None and is a datestamp. All tasks that finished before the datestamp will be garbage collected
- `settings` parameter, defaults to None and accepts an object with the below schema. This schema sets the internal GC tuning values.

GCSettings schema: