Skip to content

Commit

Permalink
lang: interfaces, funcs: Port Func API to new Stream signature
Browse files Browse the repository at this point in the history
This removes the `Close() error` and replaces it with a more modern
Stream api that takes a context. This removes boilerplate and makes
integration with concurrent code easier. The only downside is that there
isn't an explicit cleanup step, but only one function was even using
that and it was possible to switch it to a defer in Stream.
  • Loading branch information
purpleidea committed Jun 27, 2023
1 parent b10b1e5 commit a9a0880
Show file tree
Hide file tree
Showing 43 changed files with 209 additions and 504 deletions.
36 changes: 7 additions & 29 deletions docs/function-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,51 +239,31 @@ use in the other methods.
// Init runs some startup code for this function.
func (obj *FooFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{}) // shutdown signal
return nil
}
```

### Close

```golang
Close() error
```

This is called to cleanup the function. It usually causes the stream to
shutdown. Even if `Stream()` decided to shutdown early, it might still get
called. It is usually called by the engine to tell the function to shutdown.

#### Example

```golang
// Close runs some shutdown code for this function and turns off the stream.
func (obj *FooFunc) Close() error {
close(obj.closeChan) // send a signal to tell the stream to close
return nil
}
```

### Stream

```golang
Stream() error
Stream(context.Context) error
```

`Stream` is where the real _work_ is done. This method is started by the
language function engine. It will run this function while simultaneously sending
it values on the `input` channel. It will only send a complete set of input
it values on the `Input` channel. It will only send a complete set of input
values. You should send a value to the output channel when you have decided that
one should be produced. Make sure to only use input values of the expected type
as declared in the `Info` struct, and send values of the similarly declared
appropriate return type. Failure to do so will may result in a panic and
sadness.
sadness. You must shutdown if the input context cancels. You must close the
`Output` channel if you are done generating new values and/or when you shutdown.

#### Example

```golang
// Stream returns the single value that was generated and then closes.
func (obj *FooFunc) Stream() error {
func (obj *FooFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
var result string
for {
Expand All @@ -300,7 +280,7 @@ func (obj *FooFunc) Stream() error {

result = fmt.Sprintf("the input is: %d", ix)

case <-obj.closeChan:
case <-ctx.Done():
return nil
}

Expand All @@ -309,7 +289,7 @@ func (obj *FooFunc) Stream() error {
V: result,
}:

case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
Expand Down Expand Up @@ -340,8 +320,6 @@ type FooFunc struct {
init *interfaces.Init

// this space can be used if needed

closeChan chan struct{} // shutdown signal
}
```

Expand Down
21 changes: 3 additions & 18 deletions docs/language-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ Please see the example functions in
### Stream

```golang
Stream() error
Stream(context.Context) error
```

Stream is called by the function engine when it is ready for your function to
Expand All @@ -691,23 +691,8 @@ value. Failure to produce at least one value will probably cause the function
engine to hang waiting for your output. This function must close the `Output`
channel when it has no more values to send. The engine will close the `Input`
channel when it has no more values to send. This may or may not influence
whether or not you close the `Output` channel.

#### Example

```golang
Please see the example functions in
[lang/funcs/core/](https://github.com/purpleidea/mgmt/tree/master/lang/funcs/core/).
```

### Close

```golang
Close() error
```

Close asks the particular function to shutdown its `Stream()` function and
return.
whether or not you close the `Output` channel. You must shutdown if the input
context cancels.

#### Example

Expand Down
16 changes: 4 additions & 12 deletions lang/funcs/contains_polyfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package funcs

import (
"context"
"fmt"

"github.com/purpleidea/mgmt/lang/interfaces"
Expand All @@ -44,8 +45,6 @@ type ContainsPolyFunc struct {
last types.Value // last value received to use for diff

result types.Value // last calculated output

closeChan chan struct{}
}

// String returns a simple name for this function. This is needed so this struct
Expand Down Expand Up @@ -357,12 +356,11 @@ func (obj *ContainsPolyFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *ContainsPolyFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}

// Stream returns the changing values that this func has over time.
func (obj *ContainsPolyFunc) Stream() error {
func (obj *ContainsPolyFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
Expand Down Expand Up @@ -393,20 +391,14 @@ func (obj *ContainsPolyFunc) Stream() error {
}
obj.result = result // store new result

case <-obj.closeChan:
case <-ctx.Done():
return nil
}

select {
case obj.init.Output <- obj.result: // send
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}

// Close runs some shutdown code for this function and turns off the stream.
func (obj *ContainsPolyFunc) Close() error {
close(obj.closeChan)
return nil
}
13 changes: 6 additions & 7 deletions lang/funcs/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ func TestLiveFuncExec0(t *testing.T) {
valueptrch := make(chan int) // which Nth value are we at?
killTimeline := make(chan struct{}) // ask timeline to exit

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// wait for close signals
wg.Add(1)
go func() {
Expand Down Expand Up @@ -617,7 +620,7 @@ func TestLiveFuncExec0(t *testing.T) {
if debug {
logf("Running func")
}
err := handle.Stream() // sends to output chan
err := handle.Stream(ctx) // sends to output chan
t.Logf("test #%d: stream exited with: %+v", index, err)
if debug {
logf("Exiting func")
Expand Down Expand Up @@ -740,12 +743,8 @@ func TestLiveFuncExec0(t *testing.T) {
t.Logf("test #%d: timeline finished", index)
close(argch)

t.Logf("test #%d: running Close", index)
if err := handle.Close(); err != nil {
t.Errorf("test #%d: FAIL", index)
t.Errorf("test #%d: could not close func: %+v", index, err)
return
}
t.Logf("test #%d: running cancel", index)
cancel()
}()

// read everything
Expand Down
17 changes: 5 additions & 12 deletions lang/funcs/core/datetime/now_fact.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package coredatetime

import (
"context"
"time"

"github.com/purpleidea/mgmt/lang/funcs/facts"
Expand All @@ -36,8 +37,7 @@ func init() {

// DateTimeFact is a fact which returns the current date and time.
type DateTimeFact struct {
init *facts.Init
closeChan chan struct{}
init *facts.Init
}

// String returns a simple name for this fact. This is needed so this struct can
Expand All @@ -62,12 +62,11 @@ func (obj *DateTimeFact) Info() *facts.Info {
// Init runs some startup code for this fact.
func (obj *DateTimeFact) Init(init *facts.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
return nil
}

// Stream returns the changing values that this fact has over time.
func (obj *DateTimeFact) Stream() error {
func (obj *DateTimeFact) Stream(ctx context.Context) error {
defer close(obj.init.Output) // always signal when we're done
// XXX: this might be an interesting fact to write because:
// 1) will the sleeps from the ticker be in sync with the second ticker?
Expand All @@ -87,22 +86,16 @@ func (obj *DateTimeFact) Stream() error {
startChan = nil // disable
case <-ticker.C: // received the timer event
// pass
case <-obj.closeChan:
case <-ctx.Done():
return nil
}

select {
case obj.init.Output <- &types.IntValue{ // seconds since 1970...
V: time.Now().Unix(), // .UTC() not necessary
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}

// Close runs some shutdown code for this fact and turns off the stream.
func (obj *DateTimeFact) Close() error {
close(obj.closeChan)
return nil
}
16 changes: 4 additions & 12 deletions lang/funcs/core/deploy/abspath_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package coredeploy

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -48,8 +49,6 @@ type AbsPathFunc struct {

path *string // the active path
result *string // last calculated output

closeChan chan struct{}
}

// String returns a simple name for this function. This is needed so this struct
Expand Down Expand Up @@ -90,7 +89,6 @@ func (obj *AbsPathFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *AbsPathFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
if obj.data == nil {
// programming error
return fmt.Errorf("missing function data")
Expand All @@ -99,7 +97,7 @@ func (obj *AbsPathFunc) Init(init *interfaces.Init) error {
}

// Stream returns the changing values that this func has over time.
func (obj *AbsPathFunc) Stream() error {
func (obj *AbsPathFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
Expand Down Expand Up @@ -144,22 +142,16 @@ func (obj *AbsPathFunc) Stream() error {
}
obj.result = &result // store new result

case <-obj.closeChan:
case <-ctx.Done():
return nil
}

select {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}

// Close runs some shutdown code for this function and turns off the stream.
func (obj *AbsPathFunc) Close() error {
close(obj.closeChan)
return nil
}
16 changes: 4 additions & 12 deletions lang/funcs/core/deploy/readfile_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package coredeploy

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -47,8 +48,6 @@ type ReadFileFunc struct {

filename *string // the active filename
result *string // last calculated output

closeChan chan struct{}
}

// String returns a simple name for this function. This is needed so this struct
Expand Down Expand Up @@ -89,7 +88,6 @@ func (obj *ReadFileFunc) Info() *interfaces.Info {
// Init runs some startup code for this function.
func (obj *ReadFileFunc) Init(init *interfaces.Init) error {
obj.init = init
obj.closeChan = make(chan struct{})
if obj.data == nil {
// programming error
return fmt.Errorf("missing function data")
Expand All @@ -98,7 +96,7 @@ func (obj *ReadFileFunc) Init(init *interfaces.Init) error {
}

// Stream returns the changing values that this func has over time.
func (obj *ReadFileFunc) Stream() error {
func (obj *ReadFileFunc) Stream(ctx context.Context) error {
defer close(obj.init.Output) // the sender closes
for {
select {
Expand Down Expand Up @@ -156,22 +154,16 @@ func (obj *ReadFileFunc) Stream() error {
}
obj.result = &result // store new result

case <-obj.closeChan:
case <-ctx.Done():
return nil
}

select {
case obj.init.Output <- &types.StrValue{
V: *obj.result,
}:
case <-obj.closeChan:
case <-ctx.Done():
return nil
}
}
}

// Close runs some shutdown code for this function and turns off the stream.
func (obj *ReadFileFunc) Close() error {
close(obj.closeChan)
return nil
}
Loading

0 comments on commit a9a0880

Please sign in to comment.