Skip to content

Commit

Permalink
add files
Browse files Browse the repository at this point in the history
  • Loading branch information
pat-s committed Oct 23, 2024
1 parent 1b5ee05 commit 027939f
Show file tree
Hide file tree
Showing 17 changed files with 162 additions and 135 deletions.
11 changes: 6 additions & 5 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"time"

"github.com/rs/zerolog/log"
"google.golang.org/grpc/metadata"
grpc_metadata "google.golang.org/grpc/metadata"

"go.woodpecker-ci.org/woodpecker/v2/pipeline"
backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/shared/constant"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
Expand All @@ -50,11 +51,11 @@ func NewRunner(workEngine rpc.Peer, f rpc.Filter, h string, state *State, backen
}
}

func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:contextcheck
func (r *Runner) Run(runnerCtx, shutdownCtx context.Context, metadata *metadata.Metadata) error { //nolint:contextcheck
log.Debug().Msg("request next execution")

meta, _ := metadata.FromOutgoingContext(runnerCtx)
ctxMeta := metadata.NewOutgoingContext(context.Background(), meta)
meta, _ := grpc_metadata.FromOutgoingContext(runnerCtx)
ctxMeta := grpc_metadata.NewOutgoingContext(context.Background(), meta)

// get the next workflow from the queue
workflow, err := r.client.Next(runnerCtx, r.filter)
Expand Down Expand Up @@ -150,7 +151,7 @@ func (r *Runner) Run(runnerCtx, shutdownCtx context.Context) error { //nolint:co
"repo": repoName,
"pipeline_number": pipelineNumber,
}),
).Run(runnerCtx)
).Run(runnerCtx, metadata)

state.Finished = time.Now().Unix()

Expand Down
2 changes: 1 addition & 1 deletion cli/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func execWithAxis(ctx context.Context, c *cli.Command, file, repoPath string, ax
pipeline.WithDescription(map[string]string{
"CLI": "exec",
}),
).Run(ctx)
).Run(ctx, metadata)
}

// convertPathForWindows converts a path to use slash separators
Expand Down
13 changes: 7 additions & 6 deletions cmd/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ import (
grpc_credentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
grpc_metadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.woodpecker-ci.org/woodpecker/v2/agent"
agent_rpc "go.woodpecker-ci.org/woodpecker/v2/agent/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/shared/logger"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
Expand All @@ -63,7 +64,7 @@ var (
shutdownCtx = context.Background()
)

func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
func run(ctx context.Context, c *cli.Command, backends []types.Backend, metadata *metadata.Metadata) error {
agentCtx, ctxCancel := context.WithCancelCause(ctx)
stopAgentFunc = func(err error) {
msg := "shutdown of whole agent"
Expand Down Expand Up @@ -160,7 +161,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
client := agent_rpc.NewGrpcClient(ctx, conn)
agentConfigPersisted := atomic.Bool{}

grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname))
grpcCtx := grpc_metadata.NewOutgoingContext(grpcClientCtx, grpc_metadata.Pairs("hostname", hostname))

// check if grpc server version is compatible with agent
grpcServerVersion, err := client.Version(grpcCtx) //nolint:contextcheck
Expand Down Expand Up @@ -290,7 +291,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
}

log.Debug().Msg("polling new steps")
if err := runner.Run(agentCtx, shutdownCtx); err != nil {
if err := runner.Run(agentCtx, shutdownCtx, metadata); err != nil {
log.Error().Err(err).Msg("runner done with error")
return err
}
Expand All @@ -305,7 +306,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
return serviceWaitingGroup.Wait()
}

func runWithRetry(backendEngines []types.Backend) func(ctx context.Context, c *cli.Command) error {
func runWithRetry(backendEngines []types.Backend, metadata *metadata.Metadata) func(ctx context.Context, c *cli.Command) error {
return func(ctx context.Context, c *cli.Command) error {
if err := logger.SetupGlobalLogger(ctx, c, true); err != nil {
return err
Expand All @@ -317,7 +318,7 @@ func runWithRetry(backendEngines []types.Backend) func(ctx context.Context, c *c
retryDelay := c.Duration("connect-retry-delay")
var err error
for i := 0; i < retryCount; i++ {
if err = run(ctx, c, backendEngines); status.Code(err) == codes.Unavailable {
if err = run(ctx, c, backendEngines, metadata); status.Code(err) == codes.Unavailable {
log.Warn().Err(err).Msg(fmt.Sprintf("cannot connect to server, retrying in %v", retryDelay))
time.Sleep(retryDelay)
} else {
Expand Down
5 changes: 3 additions & 2 deletions cmd/agent/core/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import (
"github.com/urfave/cli/v3"

backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/shared/logger"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
"go.woodpecker-ci.org/woodpecker/v2/version"
)

func RunAgent(ctx context.Context, backends []backend.Backend) {
func RunAgent(ctx context.Context, backends []backend.Backend, metadata *metadata.Metadata) {
app := &cli.Command{}
app.Name = "woodpecker-agent"
app.Version = version.String()
app.Usage = "woodpecker agent"
app.Action = runWithRetry(backends)
app.Action = runWithRetry(backends, metadata)
app.Commands = []*cli.Command{
{
Name: "ping",
Expand Down
5 changes: 4 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/kubernetes"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/local"
backendTypes "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
)

Expand All @@ -33,9 +34,11 @@ var backends = []backendTypes.Backend{
local.New(),
}

var metadataList = &metadata.Metadata{}

func main() {
ctx := utils.WithContextSigtermCallback(context.Background(), func() {
log.Info().Msg("termination signal is received, shutting down agent")
})
core.RunAgent(ctx, backends)
core.RunAgent(ctx, backends, metadataList)
}
11 changes: 6 additions & 5 deletions pipeline/backend/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/urfave/cli/v3"

backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
"go.woodpecker-ci.org/woodpecker/v2/shared/utils"
)

Expand Down Expand Up @@ -169,7 +170,7 @@ func (e *docker) SetupWorkflow(ctx context.Context, conf *backend.Config, taskUU
return nil
}

func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID string) error {
func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)

config := e.toConfig(step)
Expand Down Expand Up @@ -247,7 +248,7 @@ func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID str
return e.client.ContainerStart(ctx, containerName, container.StartOptions{})
}

func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)

containerName := toContainerName(step)
Expand All @@ -270,7 +271,7 @@ func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID stri
}, nil
}

func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) {
func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)

logs, err := e.client.ContainerLogs(ctx, toContainerName(step), container.LogsOptions{
Expand All @@ -294,7 +295,7 @@ func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID stri
return rc, nil
}

func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID string) error {
func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)

containerName := toContainerName(step)
Expand All @@ -310,7 +311,7 @@ func (e *docker) DestroyStep(ctx context.Context, step *backend.Step, taskUUID s
return nil
}

func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string) error {
func (e *docker) DestroyWorkflow(ctx context.Context, conf *backend.Config, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")

for _, stage := range conf.Stages {
Expand Down
11 changes: 6 additions & 5 deletions pipeline/backend/dummy/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/urfave/cli/v3"

backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/metadata"
)

type dummy struct {
Expand Down Expand Up @@ -87,7 +88,7 @@ func (e *dummy) SetupWorkflow(_ context.Context, _ *backend.Config, taskUUID str
return nil
}

func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID string) error {
func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)

// internal state checks
Expand All @@ -114,7 +115,7 @@ func (e *dummy) StartStep(_ context.Context, step *backend.Step, taskUUID string
return nil
}

func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)

_, exist := e.kv.Load("task_" + taskUUID)
Expand Down Expand Up @@ -172,7 +173,7 @@ func (e *dummy) WaitStep(ctx context.Context, step *backend.Step, taskUUID strin
}, nil
}

func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) {
func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)

_, exist := e.kv.Load("task_" + taskUUID)
Expand All @@ -196,7 +197,7 @@ func (e *dummy) TailStep(_ context.Context, step *backend.Step, taskUUID string)
return io.NopCloser(strings.NewReader(dummyExecStepOutput(step))), nil
}

func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID string) error {
func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("stop step %s", step.Name)

_, exist := e.kv.Load("task_" + taskUUID)
Expand All @@ -217,7 +218,7 @@ func (e *dummy) DestroyStep(_ context.Context, step *backend.Step, taskUUID stri
return nil
}

func (e *dummy) DestroyWorkflow(_ context.Context, _ *backend.Config, taskUUID string) error {
func (e *dummy) DestroyWorkflow(_ context.Context, _ *backend.Config, taskUUID string, metadata *metadata.Metadata) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")

_, exist := e.kv.Load("task_" + taskUUID)
Expand Down
Loading

0 comments on commit 027939f

Please sign in to comment.