Skip to content

Commit

Permalink
Refactor and drop the complexity of the multi-interpreter runtime.
Browse files Browse the repository at this point in the history
Removes the "main" go routine being used to protect the main
interpreter now that the Python lifecycle is better understood.

Cleans up the single interpreter implementation as well to bring
them closer to one another in terms of implementation.
  • Loading branch information
voutilad committed Aug 8, 2024
1 parent 6704a1c commit 754af1c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 209 deletions.
236 changes: 49 additions & 187 deletions internal/impl/python/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,11 @@ import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"

"github.com/redpanda-data/benthos/v4/public/service"
py "github.com/voutilad/gogopython"
)

// Request the Python runtime Go routine perform an action.
type request int

const (
pythonStart request = iota // Start the runtime.
pythonStatus // Are you alive?
pythonStop // Stop and shutdown sub-interpreters.
pythonSpawn // Span a new sub-interpreter.
)

// Reply from the Python runtime Go routine in response to a Request.
type reply struct {
err error
interpreter *subInterpreter
main py.PyThreadStatePtr
}

// MultiInterpreterRuntime creates and manages multiple Python sub-interpreters.
type MultiInterpreterRuntime struct {
exe string // Python exe (binary).
Expand All @@ -38,11 +19,8 @@ type MultiInterpreterRuntime struct {
interpreters []*subInterpreter // Sub-interpreters.
tickets chan *InterpreterTicket // Tickets for sub-interpreters.

from chan reply // protected by mtx.
to chan request // protected by mtx.
chanMtx *ContextAwareMutex // Mutex to write protect the from and to channel ordering.

started atomic.Bool // Flag to signal if the Go routine is running.
mtx *ContextAwareMutex // Mutex to write protect the runtime state.
started bool
legacyMode bool // Running in legacy mode?
logger *service.Logger // Redpanda Connect logger service.
}
Expand All @@ -64,9 +42,7 @@ func NewMultiInterpreterRuntime(exe string, cnt int, legacyMode bool, logger *se
exe: exe,
home: home,
paths: paths,
from: make(chan reply),
to: make(chan request),
chanMtx: NewContextAwareMutex(),
mtx: NewContextAwareMutex(),
interpreters: make([]*subInterpreter, cnt),
tickets: make(chan *InterpreterTicket, cnt),
legacyMode: legacyMode,
Expand All @@ -84,37 +60,31 @@ func (r *MultiInterpreterRuntime) Start(ctx context.Context) error {
}
defer globalMtx.Unlock()

// We don't use CAS here as we're serialized via the global lock.
if !r.started.Load() {
// Launch our main interpreter go routine.
go r.mainPython()
if !r.started {
runtime.LockOSThread()
defer runtime.UnlockOSThread()

// Ask the main interpreter to start things up.
err = r.chanMtx.LockWithContext(ctx)
r.thread, err = loadPython(r.exe, r.home, r.paths)
if err != nil {
r.logger.Errorf("Failed to start Python interpreter.")
return err
}
defer r.chanMtx.Unlock()
r.to <- pythonStart
response := <-r.from
if response.err != nil {
return response.err
}
r.started.Store(true)
r.logger.Debug("Python interpreter started.")

// Start up sub-interpreters.
for idx := range len(r.interpreters) {
r.to <- pythonSpawn
response = <-r.from
if response.err != nil {
return response.err
sub, err := r.initSubInterpreter()
if err != nil {
r.logger.Error("Failed to create new sub-interpreter.")
return err
}

// Populate our ticket booth and interpreter list.
r.interpreters[idx] = response.interpreter
r.tickets <- &InterpreterTicket{idx: idx, id: response.interpreter.id}
r.interpreters[idx] = sub
r.tickets <- &InterpreterTicket{idx: idx, id: sub.id}
r.logger.Tracef("Initialized sub-interpreter %d.\n", sub.id)
}
r.started = true
r.logger.Debugf("Started %d sub-interpreters.", len(r.tickets))
}
return nil
Expand All @@ -128,31 +98,48 @@ func (r *MultiInterpreterRuntime) Stop(ctx context.Context) error {
}
defer globalMtx.Unlock()

if !r.started.Load() {
if !r.started {
return errors.New("not started")
}

// Ask main go routine to stop.
err = r.chanMtx.LockWithContext(ctx)
runtime.LockOSThread()
defer runtime.UnlockOSThread()

// Collect all the tickets before stopping the sub-interpreters.
tickets := make([]*InterpreterTicket, len(r.tickets))
for idx := range tickets {
ticket, err := r.Acquire(ctx)
if err != nil {
panic("cannot acquire ticket while stopping")
}
tickets[idx] = ticket
}

// We have all the tickets. Time to kill the sub-interpreters.
for _, ticket := range tickets {
sub := r.interpreters[ticket.idx]
// Restore the sub-interpreter thread state.
py.PyEval_RestoreThread(sub.thread)
py.PyThreadState_Clear(sub.thread)

// Clean up the ThreadState. Clear *must* be called before Delete.
py.PyInterpreterState_Clear(sub.state)
py.PyInterpreterState_Delete(sub.state)
r.logger.Tracef("Stopped sub-interpreter %d.\n", sub.id)
}

// Tear down the runtime.
err = unloadPython(r.thread)
if err != nil {
return err
}
defer r.chanMtx.Unlock()
r.to <- pythonStop
response := <-r.from
if response.err != nil {
return response.err
}

r.logger.Debug("Python interpreter stopped.")
return nil
}

func (r *MultiInterpreterRuntime) Acquire(ctx context.Context) (*InterpreterTicket, error) {
if !r.started.Load() {
return nil, errors.New("not started")
}

// Take a ticket from the pool.
select {
case ticket := <-r.tickets:
return ticket, nil
Expand All @@ -162,16 +149,13 @@ func (r *MultiInterpreterRuntime) Acquire(ctx context.Context) (*InterpreterTick
}

func (r *MultiInterpreterRuntime) Release(token *InterpreterTicket) error {
if !r.started.Load() {
return errors.New("not started")
}

// Double-check the token is valid.
if token.idx < 0 || token.idx > len(r.interpreters) {
return errors.New("invalid token: bad index")
}

// Return the ticket to the pool.
// Return the ticket to the pool. This should not block as the channel is
// buffered.
r.tickets <- token

return nil
Expand Down Expand Up @@ -204,10 +188,6 @@ func (r *MultiInterpreterRuntime) Apply(token *InterpreterTicket, _ context.Cont
// Map a function fn over all the interpreters, one at a time. Useful for
// initializing all interpreters to a given state.
func (r *MultiInterpreterRuntime) Map(ctx context.Context, f func(t *InterpreterTicket) error) error {
if !r.started.Load() {
return errors.New("not started")
}

// Acquire all tickets so we have sole control of the interpreter. Makes it
// easier to know if we applied the function to all sub-interpreters.
tickets := make([]*InterpreterTicket, len(r.tickets))
Expand Down Expand Up @@ -243,126 +223,11 @@ func (r *MultiInterpreterRuntime) Map(ctx context.Context, f func(t *Interpreter
return nil
}

// Teardown a Sub-Interpreter and delete its state. This will probably trigger a lot of Python
// cleanup under the hood.
//
// Note: This returns void because most of these calls are fatal.
func stopSubInterpreter(s *subInterpreter) {
// We should be running from the main Go routine. Load the original
// Python ThreadState so we can clean up.
py.PyEval_RestoreThread(s.thread)
py.PyThreadState_Clear(s.thread)

// Clean up the ThreadState. Clear *must* be called before Delete.
py.PyInterpreterState_Clear(s.state)
py.PyInterpreterState_Delete(s.state)
}

// Primary "run loop" for main Python interpreter.
//
// Responsible for managing the Python runtime, spawning new sub-interpreters,
// and cleaning up the mess.
//
// All communication to the main interpreter should be done via the to and from channels.
func (r *MultiInterpreterRuntime) mainPython() {
pythonStarted := false
keepGoing := true

// Just a guard rail.
initPythonOnce := sync.OnceValues(func() (py.PyThreadStatePtr, error) {
return loadPython(r.exe, r.home, r.paths)
})

// We need to stay pinned to the same OS thread as Python's C API heavily
// makes use of thread local storage. If we let Go reschedule us to a
// different OS thread, we can corrupt our state and crash.
runtime.LockOSThread()
defer runtime.UnlockOSThread()

for keepGoing {
msg := <-r.to
switch msg {
case pythonStart:
if !pythonStarted {
r.logger.Info("Starting Python interpreter.")
var err error
r.thread, err = initPythonOnce()
if err != nil {
keepGoing = false
r.logger.Errorf("Failed to start Python interpreter: %s", err)
}
pythonStarted = true
r.from <- reply{err: err, main: r.thread}
} else {
r.logger.Warn("Interpreter already started")
r.from <- reply{err: errors.New("main interpreter already started")}
}

case pythonStop:
if pythonStarted {
// No more run loop.
keepGoing = false

// Do we have any sub-interpreters running? If so, we need to stop them.
ctx := context.Background()
tickets := make([]*InterpreterTicket, len(r.tickets))
for idx := range tickets {
ticket, err := r.Acquire(ctx)
if err != nil {
panic("cannot acquire ticket while stopping")
}
tickets[idx] = ticket
}
for _, ticket := range tickets {
sub := r.interpreters[ticket.idx]
stopSubInterpreter(sub)
r.logger.Tracef("Stopped sub-interpreter %d.\n", sub.id)
}

// Reload the main thread state so we can exit Python. This
// requires taking and holding the big lock.
err := unloadPython(r.thread)
if err != nil {
// The chance we get here *without* an explosion is slim, but why not.
r.from <- reply{err: errors.New("failed to shutdown python")}
} else {
r.logger.Trace("Python stopped")
r.from <- reply{}
}

} else {
r.logger.Warn("Interpreter not running")
r.from <- reply{err: errors.New("main interpreter not running")}
}

case pythonSpawn:
if pythonStarted {
r.logger.Trace("Spawning a new sub-interpreter.")
sub, err := r.initSubInterpreter(r.logger)
if err != nil {
keepGoing = false
r.logger.Warn("Failed to create sub-interpreter.")
r.from <- reply{err: err}
} else {
r.from <- reply{interpreter: sub}
}
} else {
r.logger.Warn("Interpreter not running.")
r.from <- reply{err: errors.New("interpreter not running")}
}

case pythonStatus:
r.logger.Debug("Interpreter Go routine is alive.")
r.from <- reply{}
}
}
}

// Initialize a Sub-interpreter.
//
// Relies on global Python state and being run only from the OS thread that
// manages the runtime. Will potentially panic otherwise.
func (r *MultiInterpreterRuntime) initSubInterpreter(logger *service.Logger) (*subInterpreter, error) {
func (r *MultiInterpreterRuntime) initSubInterpreter() (*subInterpreter, error) {
// Some of these args are required if we want to use Numpy, etc.
var ts py.PyThreadStatePtr
interpreterConfig := py.PyInterpreterConfig{}
Expand All @@ -385,7 +250,6 @@ func (r *MultiInterpreterRuntime) initSubInterpreter(logger *service.Logger) (*s
status := py.Py_NewInterpreterFromConfig(&ts, &interpreterConfig)
if status.Type != 0 {
msg, _ := py.WCharToString(status.ErrMsg)
logger.Errorf("Failed to create new sub-interpreter: %s", msg)
return nil, errors.New(msg)
}

Expand All @@ -394,8 +258,6 @@ func (r *MultiInterpreterRuntime) initSubInterpreter(logger *service.Logger) (*s
id := py.PyInterpreterState_GetID(state)
py.PyEval_SaveThread()

logger.Tracef("Initialized sub-interpreter %d.\n", id)

return &subInterpreter{
state: state,
thread: ts,
Expand Down
Loading

0 comments on commit 754af1c

Please sign in to comment.