Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: replace Start() and Stop() with Run(context.Context) #120

Closed
db7 opened this issue Apr 4, 2018 · 11 comments
Closed

Proposal: replace Start() and Stop() with Run(context.Context) #120

db7 opened this issue Apr 4, 2018 · 11 comments
Assignees

Comments

@db7
Copy link
Collaborator

db7 commented Apr 4, 2018

Currently the Start() method of processors and views use errgroup to create goroutines for each partition and passes a context.Context to them. The Stop() method simply cancels the context to stop all goroutines.

I think it would be nice to use the same mechanism to control multiple processors and views running on the same program. For that I'd propose to replace the Start() and Stop() methods with a Run(context.Context) method. The usage would be something like this:

// create the context and cancel function
ctx, cancel := context.WithCancel(context.Background())

// create error group
g, ctx := errgroup.WithContext(ctx)

// create processors and views
p, _ := goka.NewProcessor(brokers, graph)
...

// start processor and views passing the context   
g.Go(func() error { return p.Run(ctx) })

// catch signals
go func() {
  wait := make(chan os.Signal, 1)
  signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
  <-wait   // wait for SIGINT/SIGTERM
  cancel() // gracefully stop processors and views
}()

if err := g.Wait(); err != nil {
  log.Fatalln(err)
}

Perhaps we could still support Start() and Stop() via some wrapper... something like this:

        p, err := goka.StartStopper(goka.NewProcessor(brokers, graph))
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	go func() {
		if err = p.Start(); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	p.Stop() // gracefully stop processor

Any opinions?

@db7 db7 added the enhancement label Apr 4, 2018
@db7 db7 self-assigned this Apr 4, 2018
@db7
Copy link
Collaborator Author

db7 commented Apr 30, 2018

Hey guys, what do you think of this change in the interface? Would it make the library easier to us? @SamiHiltunen @j0hnsmith @frairon @burdiyan @andrewmunro

I'd like to either create a PR for this or close the ticket.

(cc: @edganiukov we are now using errgroup and context libraries to do most of the goroutine management. It took long, but it's now implemented.)

@SamiHiltunen
Copy link
Contributor

I support this. It would simplify the lifecycle management.

@j0hnsmith
Copy link
Contributor

I like the idea of using a single context to stop multiple processors.

I actually have some time this week, I'll take a look maybe tomorrow if there's a general 👍

@andrewmunro
Copy link

I like it. Looks a lot cleaner than p1.stop(), p2.stop() etc etc. Also means there's less danger in you forgetting to stop one of your emitters/processors/views gracefully.

I wonder if you could even abstract this further for beginners using the framework, maybe that patterns module you were talking about elsewhere... 🤔

@burdiyan
Copy link
Contributor

burdiyan commented Apr 30, 2018

I actually used to wrap Start and Stop methods exactly like this using context.Context. But I found out that when you want to control the order in which multiple processors should stop, then you'll have a problem.

So if you have processor A and B, and you need to stop B before A using a single context doesn't give you that control.

I guess that's why standard library have io.Closer interface that many things like http.Server implement providing Close method.

It's also handy that defer statements are evaluated in LIFO order, so you close things in the revers order you "opened" them.

So, I personally would not implement Run with context, unless what I described above doesn't make sense for anybody :)

@edganiukov
Copy link

edganiukov commented Apr 30, 2018

@burdiyan if I am not mistaken, you can have such logic with context as well - using either different contexts for each processor or child-parent context pattern (child context will be canceled after parent).

@db7
Copy link
Collaborator Author

db7 commented Apr 30, 2018

@burdiyan that is an interesting point! AFAIK, we don't have that issue, but I can imagine having it at some point. So either we require the user to apply child contexts or we provide aStartStopper wrapper as described in the issue. Perhaps I just need a better name for it. I will think of something and propose a PR.

Thanks for the quick feedback from everybody! That's awesome.

@db7
Copy link
Collaborator Author

db7 commented Apr 30, 2018

PR #127 replaces Start()/Stop() with Run(context). If you got some spare time, I'd be happy to have reviews.

My feeling is that for simple examples, things get more complicated because one has to create a context and a cancel function. But I like the result in examples/3-messaging.

Next step to close this issue would be to come up with some simple wrapper to still provide the Start()/Stop() pattern (or something similar) that does not require context and cancel function.

@db7
Copy link
Collaborator Author

db7 commented May 1, 2018

PR #128 introduce Runset objects to start and stop sets of processors/views together. It internally creates a context and uses error groups to start goroutines.

Processors and views are "runnables" because they implement Run(context.Context) error with #127. A new function goka.Start() can start multiple runnables together, returning a Runset. For example, rs := goka.Start(proc1, proc2, view1, view2, proc3)

rs provides a Stop() method to stop all runnables together. If one of the runnables returns, all others are stopped. There are other two methods in Runset:

  • Wait() blocks until all runnables have terminated, returns a multi-error (if any).
  • Done() can be used to signal that the runset is going to stop.

Here is a complete example of how to use runsets (other variants are possible):

// create processors and views (they are runnables because have Run(ctx) method)
p, _ := goka.NewProcessor(brokers, DefineGroup(group, Input(topic, codec, cb)))
v, _ := goka.NewView(brokers, topic2, codec)

// start runnables creating a Runset.
rs := goka.Start(p,v)

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rs.Done(): // wait for one of the runnables to return
case <-wait:      // wait for SIGINT/SIGTERM
	rs.Stop() // gracefully stop runnables
}

// wait for all runnables to return and collect error messages
if err := rs.Wait(); err != nil {
	log.Fatalln(err)
}

If the order of stopping the runnables is relevant, one can create multiple runsets (eg, one for each processors/view) and stop them accordingly.

@burdiyan do you think that this would be helpful and sufficient for your use cases? Do you have suggestions how to improve it?

@burdiyan
Copy link
Contributor

burdiyan commented May 2, 2018

I normally do signal trapping as part of top level errgroup, like this:

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error {
    done := make(chan os.Signal, 1)
    signal.Notify(done, syscall.SIGTERM, syscall.SIGINT)

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-done:
        signal.Stop(done)
        return errors.New("signal received")
    }
})

g.Go(func() error {
    // Start monitoring HTTP server
})

g.Go(func() error {
    // Start goka processor.
})

g.Go(func() error {
    // Start another goka processor.
})

// Separate goroutine for shutdown logic.
g.Go(func() error {
    <-ctx.Done()

    // Stop processor 1.
    // Stop processor 2.
    // Shutdown the HTTP server.
    // Do other cleanups.
})

g.Wait() // plus error handling for errgroup.

This way the program ends cleanly if one of the "actors" in the errgroup returns error or signal is trapped.

I'm concerned about hiding context in goka.Start, because you normally need context if you need more flexibility or have other goroutines to handle.

I'd be fine with just having Run(context.Context) in processors and views and handle multiple processors manually.

The ordering problem for shutdown could be handled with derived context as discussed elsewhere.

@db7
Copy link
Collaborator Author

db7 commented May 15, 2018

The change has been implemented and merged.
The helper wrapper seems to be an overkill and I closed the PR for now.

Thanks for the feedback from everybody.

@db7 db7 closed this as completed May 15, 2018
This issue was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants