
Feature Request: --fail-fast argument for dbt run and dbt test #1649

Closed
JasonGluck opened this issue Jul 31, 2019 · 11 comments · Fixed by #2224
Labels
enhancement New feature or request good_first_issue Straightforward + self-contained changes, good for new contributors!

Comments

@JasonGluck
Contributor

Feature

--fail-fast argument for dbt run and dbt test

Feature description

An optional argument, --fail-fast, for dbt run and dbt test which will stop model builds or test runs upon a single failure. It may also be worth adding a parameter which defines the number of failures to allow before stopping.

Who will this benefit?

All dbt users. It will be useful during development and can also help reduce the number of queries sent to the database.

Questions

How should this work with test severity?

@drewbanin
Contributor

Thanks @JasonGluck! I think we might have built this at a hackathon once, but I don't think the code ever made it into prod :/

I think the operative line of code is right here. I imagine we could add some logic there which consulted the value of the --fail-fast flag. If --fail-fast is provided, then dbt should "raise" regardless of the value supplied in raise_on_first_error().
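A rough sketch of the logic drewbanin describes might look like the following. The names (args.fail_fast, result.failed, _raise_next_tick) loosely mirror dbt's runner code but are illustrative here, not the actual implementation:

```python
# Hypothetical sketch: with --fail-fast, flag the run for raising on
# the first failed result, regardless of raise_on_first_error().
class GraphRunnableTask:
    def __init__(self, args):
        self.args = args               # parsed CLI args (assumes args.fail_fast)
        self._raise_next_tick = False  # checked on the next queue tick

    def raise_on_first_error(self):
        # dbt's per-task default; False for `dbt run` / `dbt test`
        return False

    def _mark_failed(self, result):
        if result.failed and (self.args.fail_fast or self.raise_on_first_error()):
            self._raise_next_tick = True
```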

Is this something you'd be interested in contributing a PR for?

@drewbanin drewbanin added enhancement New feature or request good_first_issue Straightforward + self-contained changes, good for new contributors! labels Jul 31, 2019
@drewbanin
Contributor

Dupe of #871 - closing that one.

@jars

jars commented Sep 10, 2019

This would save our team a lot of time!

@Raalsky
Contributor

Raalsky commented Mar 15, 2020

I'm currently thinking about the behavior of GraphRunnableTask.call_runner in the case of dbt test, and about modifying it to support fail-fast based on the discussion in #1675 (setting self._raise_next_tick based on result.fail). Let me know if I'm wrong, but it seems that during testing the job_queue structure is flat (as expected), with no dependencies between nodes. The problem is that self._raise_next_tick is only consulted (via _raise_set_error()) while tasks are being submitted to the thread pool (before the tests execute) and again after all tasks (tests) have finished. So right now the change does nothing beyond suppressing the summary list printed after execution.

Now I'm looking for a place where we can gently raise in between test executions. Of course, I'd really appreciate any help with that 🍺

My current state of work can be found at: raalsky/fail-fast

I'm also thinking about how to test these parts of the codebase, perhaps by feeding a deliberately ordered list of nodes to job_queue (which is admittedly rather brutal 🌵).

Of course, if @sjwhitworth is still working on this, just let me know 😉

@drewbanin
Contributor

drewbanin commented Mar 18, 2020

hey @Raalsky - this is a really good point! Tagging @beckjake to advise -- I'm definitely seeing the behavior you're describing when I run your branch locally :)

Jake, what's the best way to bubble up test failures to fail-fast? Or, should we reconsider how we feed test nodes into the runner?

@beckjake
Contributor

What a great point! That's definitely a hole in the process. I think the key issue is using job_queue.join() which ultimately does this on a queue.PriorityQueue:

with self.all_tasks_done:
    while self.unfinished_tasks:
        self.all_tasks_done.wait()

We could do this (but probably abstract it so we aren't reaching so far into internals):

with self.job_queue.inner.all_tasks_done:
    while self.job_queue.inner.unfinished_tasks:
        self._raise_next_tick()
        self.job_queue.inner.all_tasks_done.wait()

I would probably do that by writing a join_with(self, func) method inside the graph queue code and calling it like self.job_queue.join_with(self._raise_next_tick). It's going to be a bit messy no matter what, I suspect.
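A hedged sketch of that join_with idea might look like this. GraphQueue here is a stand-in for dbt's graph queue wrapper; the only assumption is that it holds a queue.PriorityQueue in self.inner, whose all_tasks_done condition and unfinished_tasks counter are internals of the stdlib Queue:

```python
import queue

class GraphQueue:
    """Minimal stand-in for dbt's graph queue wrapper."""
    def __init__(self):
        self.inner = queue.PriorityQueue()

    def join_with(self, func):
        # Like inner.join(), but calls func() each time the waiting
        # thread wakes up, so an error check (e.g. _raise_next_tick)
        # can fire mid-run instead of only after the queue drains.
        with self.inner.all_tasks_done:
            while self.inner.unfinished_tasks:
                func()
                self.inner.all_tasks_done.wait()
        func()  # one final check after everything has drained
```

Calling it as self.job_queue.join_with(self._raise_next_tick) keeps the reach into Queue internals in one place.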

@Raalsky
Contributor

Raalsky commented Mar 18, 2020

Hi @drewbanin and @beckjake
I very much appreciate your suggestions 🚀. I'll be back in a few days with updates.

@Raalsky
Contributor

Raalsky commented Mar 19, 2020

@beckjake
Based on the implementation of PriorityQueue (Queue), I think reworking job_queue.join() may be insufficient. In the following:

with self.job_queue.inner.all_tasks_done:
    while self.job_queue.inner.unfinished_tasks:
        tell_the_world_is_ending()
        self.job_queue.inner.all_tasks_done.wait()

I think tell_the_world_is_ending() will only be called once during dbt test. The reason lies in the implementation of job_queue.task_done(), which is called at the end of every task (test):

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

We're blocking on self.job_queue.inner.all_tasks_done.wait(), which is only woken by task_done() via self.all_tasks_done.notify_all() (it's a simple condition variable), and that notify only happens when there is nothing left to process — so basically after all tests have finished executing.

I think we should also consider rewriting task_done() with some kind of notify_after_something_was_done mechanism in addition to .join().
OR
Mixing the task-submitting loop with some sort of wait (maybe based on the number of threads). Of course, this is only a direction:

    while not nothing_to_be_processed:
        node = self.job_queue.get()
        self._raise_set_error()
        runner = self.get_runner(node)
        ...
        self._submit(pool, args, callback)
        # Only an approximation of the queue size!
        if self.job_queue.qsize() >= self.config.threads:
            self.job_queue.wait_until_something_was_done()
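The first direction (notify after every completed task) could be sketched as a queue.PriorityQueue subclass whose task_done() wakes waiters on each completion instead of only when the queue fully drains. EagerNotifyQueue is a made-up name for illustration:

```python
import queue

class EagerNotifyQueue(queue.PriorityQueue):
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.unfinished_tasks = unfinished
            # Wake waiters on every completion, so a join_with()-style
            # loop can run its error check between tasks, not just
            # after the last one.
            self.all_tasks_done.notify_all()
```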

@Raalsky
Contributor

Raalsky commented Mar 19, 2020

Keep in mind this isn't only about dbt test: it also applies to dbt run when we have a bunch of independent tasks (a large "layer"), for example when transforming sources.

@Raalsky
Contributor

Raalsky commented Mar 19, 2020

I've pushed an MVP to Raalsky/fail-fast (it still needs a lot of care around concurrency 😭), but it works pretty close to what I expected.

@beckjake
Contributor

Oh, I didn't realize that task_done only calls notify_all when there are no unfinished tasks, I thought it was every time. That's a bummer! I looked through your branch and I like your changes, that seems like a solid solution.
