ISSUE[581] Add Built-in Execution Context Variables #654

Merged
merged 18 commits into from
Aug 28, 2024

halalala222
Contributor

Pass built-in execution context variables as command arguments.
Example:

handlerOn:
  success:
    command: "echo ${DAGU_REQUEST_ID} succeed"

Collaborator

@yohamta yohamta left a comment

Thanks! Looking good! I thought we should replace those system environment variables in the DAG definition with an intermediate representation during the DAG building stage, and restore the originals when running the node (step), but the code change does not seem to include that part. Wouldn't that be necessary?


return os.Setenv(dag.RequestIDEnvKey, dagCtx.DaguRequestID)
}

Collaborator

Shouldn't we set the environment variables only for the child process rather than the entire agent process?
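
For illustration, here is a minimal Go sketch of the distinction raised in this question: passing a variable only to a child process through `exec.Cmd.Env` instead of calling `os.Setenv` in the parent. It assumes a Unix-like system with `sh` available. The helper name `runWithEnv` and the value `req-123` are hypothetical and are not taken from the PR.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"strings"
)

// runWithEnv is a hypothetical helper: it runs a shell command with extra
// environment variables that are visible only to the child process.
func runWithEnv(command string, extra ...string) (string, error) {
	cmd := exec.Command("sh", "-c", command)
	// Start from a copy of the parent's environment, then append the extras.
	cmd.Env = append(os.Environ(), extra...)
	out, err := cmd.Output()
	return strings.TrimSpace(string(out)), err
}

func main() {
	out, err := runWithEnv(`echo "$DAGU_REQUEST_ID"`, "DAGU_REQUEST_ID=req-123")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("child saw:", out)

	// The parent's own environment is not modified by the call above.
	_, set := os.LookupEnv("DAGU_REQUEST_ID")
	fmt.Println("set in parent:", set)
}
```

Because nothing is written to the parent's environment, concurrent steps in the same agent process cannot overwrite each other's values this way.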

Contributor Author

Q: I thought we should replace those system environment variables in the DAG definition with an intermediate representation during the DAG building stage, and restore the originals when running the node (step), but the code change does not seem to include that part. Wouldn't that be necessary?

A: I tried to implement this feature this afternoon, but I ran into a problem. All environment variables, such as ${DAG_REQUEST_ID} and ${GOPATH} as well as other configured environment variables, are converted to an intermediate form like {{DAG_REQUEST_ID}}. In the executor, {{DAG_REQUEST_ID}} is converted back into an environment variable reference such as ${DAG_REQUEST_ID}. However, when the executor command is echo requestId: ${DAG_REQUEST_ID}, the logs show the result in the screenshot below. I found that, to handle environment variables in echo, the transformation needs to be done within step.commandWithArgs.
[screenshot: log output]
Therefore, I do not perform the conversion and instead add the environment variables directly using os.Setenv before util.SplitCommandWithParse.
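
As a point of comparison for the substitution problem described above, the sketch below expands `${NAME}` references in a command string with the standard library's `os.Expand`, looking up per-run built-in values first and falling back to the process environment. This is not dagu's `util.SplitCommandWithParse`; the helper `expandCommand` and the `builtins` map are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// expandCommand is a hypothetical helper: it replaces ${NAME} and $NAME in a
// command string, preferring per-run built-in values over the process env.
func expandCommand(command string, builtins map[string]string) string {
	return os.Expand(command, func(name string) string {
		if v, ok := builtins[name]; ok {
			return v
		}
		return os.Getenv(name)
	})
}

func main() {
	builtins := map[string]string{"DAGU_REQUEST_ID": "req-123"}
	fmt.Println(expandCommand("echo requestId: ${DAGU_REQUEST_ID}", builtins))
}
```

This kind of substitution only affects the command string itself; a program that reads the variable at runtime still needs it in its environment.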

Contributor Author

Q: Shouldn't we set the environment variables only for the child process rather than the entire agent process?

A: I am very sorry. I misunderstood, so I will rethink this and adjust my code.

Collaborator

Oh no worries! Please keep up the great work. FYI, here's the code to set the environment variables: command.go

Contributor Author

Thank you for the reminder. I forgot about this piece of code this afternoon. I now understand how to adjust it.

Contributor Author

@halalala222 halalala222 Aug 9, 2024

@yohamta hi! Your provided approach is quite good, but I've noticed it may lead to some problems:

  1. The step ID needs to keep incrementing across DAGs, because if multiple DAGs run simultaneously, ${STEP_1_DAG_SCHEDULER_LOG_PATH} could lead to conflicts.
  2. When spawning the child process, we should make sure to pass these variables using their original keys (such as DAG_SCHEDULER_LOG_PATH).
    There could be conflicts in environment variables if multiple steps are running.
    For example:
    A: DAG test, running step1, passes ${STEP_1_DAG_SCHEDULER_LOG_PATH} as DAG_SCHEDULER_LOG_PATH
    B: DAG anotherOneTest, running step2, passes ${STEP_2_DAG_SCHEDULER_LOG_PATH} as DAG_SCHEDULER_LOG_PATH
    Running the steps of the two DAGs (A and B) simultaneously could result in conflicts on DAG_SCHEDULER_LOG_PATH.

I believe converting each step's DAG_SCHEDULER_LOG_PATH to an internal, step-specific version is a good approach. Perhaps we can set the ${STEP_1_DAG_SCHEDULER_LOG_PATH} environment variable before calling util.SplitCommandWithParse().
util.SplitCommandWithParse() can then substitute ${STEP_1_DAG_SCHEDULER_LOG_PATH} with the step's own value.

  3. If we convert DAG_SCHEDULER_LOG_PATH to ${STEP_ID_DAG_SCHEDULER_LOG_PATH}, we need to check specifically whether a variable is one of the designated environment variables, such as DAG_SCHEDULER_LOG_PATH. If the only use case is echoing the built-in context variables in the command, we can directly replace the specific ${DAG_SCHEDULER_LOG_PATH}. Are there other scenarios to consider?

I'm not sure if my explanation is clear, but if you have any new ideas, we can continue the discussion. Thank you for providing such a great approach.

Collaborator

@yohamta yohamta Aug 10, 2024

Hi @halalala222, thank you for thinking about this so thoroughly. I have a few thoughts on your points.

The step in multiple DAGs needs to be incremented because if there are multiple DAGs running simultaneously, ${STEP_1_DAG_SCHEDULER_LOG_PATH} could lead to conflicts.
When spawning the child process, we should make sure to pass these variables using their original keys (like DAG_SCHEDULER_LOG_PATH etc)
There could be conflicts in environment variables if multiple steps are running.
for example :
A: Dag test , running step1 , pass ${STEP_1_DAG_SCHEDULER_LOG_PATH} to DAG_SCHEDULER_LOG_PATH
B: Dag anotherOneTest, running step2 , pass ${STEP_2_DAG_SCHEDULER_LOG_PATH} to DAG_SCHEDULER_LOG_PATH
Running the steps of the two DAGs(A and B) simultaneously could result in conflicts with the DAG_SCHEDULER_LOG_PATH.

You're absolutely right that if we use some global value, it would lead to conflicts. However, I think this should be local. Setting an environment variable inside a process does not affect other processes. We can just assign a new internal id field to each step, starting from 1, every time, during the DAG building step. This value would be strictly scoped within the agent process, so we don't have to worry about conflicts with other concurrent workflow executions.

Regarding your example:

A: Dag test, running step1, pass ${STEP_1_DAG_SCHEDULER_LOG_PATH} to DAG_SCHEDULER_LOG_PATH
B: Dag anotherOneTest, running step2, pass ${STEP_2_DAG_SCHEDULER_LOG_PATH} to DAG_SCHEDULER_LOG_PATH

Running the steps of the two DAGs (A and B) simultaneously wouldn't result in conflicts with the DAG_SCHEDULER_LOG_PATH because each would be in its own process.
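
The step-scoped key idea discussed here can be sketched roughly as follows. The key format `STEP_<id>_DAG_SCHEDULER_LOG_PATH` follows the example in the thread; the function names `stepKey` and `childEnvForStep`, the log paths, and the use of an in-memory map instead of the agent's process environment are assumptions made for this illustration.

```go
package main

import "fmt"

const schedulerLogPathKey = "DAG_SCHEDULER_LOG_PATH"

// stepKey builds the internal, step-specific key,
// for example STEP_1_DAG_SCHEDULER_LOG_PATH.
func stepKey(stepID int, key string) string {
	return fmt.Sprintf("STEP_%d_%s", stepID, key)
}

// childEnvForStep maps the step-specific value back to the original key,
// so the child process would see DAG_SCHEDULER_LOG_PATH=<value>.
func childEnvForStep(internal map[string]string, stepID int, key string) (string, bool) {
	v, ok := internal[stepKey(stepID, key)]
	if !ok {
		return "", false
	}
	return key + "=" + v, true
}

func main() {
	// Values live inside one agent process, keyed per step (hypothetical paths).
	internal := map[string]string{
		stepKey(1, schedulerLogPathKey): "/tmp/logs/step1.log",
		stepKey(2, schedulerLogPathKey): "/tmp/logs/step2.log",
	}
	for id := 1; id <= 2; id++ {
		if env, ok := childEnvForStep(internal, id, schedulerLogPathKey); ok {
			fmt.Println(env)
		}
	}
}
```

Since step IDs restart at 1 inside each agent process, two DAGs running at the same time each hold their own copy of these keys and do not collide.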

If we are to convert DAG_SCHEDULER_LOG_PATH to ${STEP_ID_DAG_SCHEDULER_LOG_PATH}, we need to specifically determine whether it is the designated environment variable, such as DAG_SCHEDULER_LOG_PATH. In this case, if the scenario is only used for echoing the corresponding build context variables in the command, we can directly replace the specific ${DAG_SCHEDULER_LOG_PATH}. Are there any other considerations for different scenarios?

Yes, there are other scenarios to consider. For example, users might want to use the environment variable inside a Python script, where we can't replace the string. So I think just replacing the command arguments doesn't work for all use cases.

Thank you for bringing up these important considerations. Let me know if you have any thoughts on this or if there's anything else we should discuss.

However, I've been thinking that this might be a more complex implementation than we initially thought, with potential impacts in various areas of the codebase. Sorry that the issue description was not clear on the details. Given this complexity, it might be better to put this aside for now and focus on other issues, for example paging parameters for the DAG list API. What do you think?

Thank you for your hard work and deep thought on this PR. It has helped me significantly in clarifying the problem and the potential solutions. Your contributions are truly valuable.

Contributor Author

Hi @yohamta, thank you for your reply! I agree; it might be better to put this aside for now and focus on DAG list API pagination.

Contributor Author

@yohamta Hi! I will start working on this issue again!

Collaborator

Hi @halalala222, sure! Thank you very much! That would be awesome!

@halalala222
Contributor Author

Command: go run main.go

steps:
  - name: test5_steps1
    dir: /home/liooooo/draft/cmdTest
    command: go run main.go

// main.go
package main

import (
	"fmt"
	"os"
)

// main prints each built-in variable, or reports that it is empty.
func main() {
	requestID := os.Getenv("DAGU_REQUEST_ID")
	executionLogPath := os.Getenv("DAGU_EXECUTION_LOG_PATH")
	schedulerLogPath := os.Getenv("DAGU_SCHEDULER_LOG_PATH")

	if requestID == "" {
		fmt.Println("Request ID is empty")
	} else {
		fmt.Printf("Request ID is %s\n", requestID)
	}

	if executionLogPath == "" {
		fmt.Println("Execution log path is empty")
	} else {
		fmt.Printf("Execution log path is %s\n", executionLogPath)
	}

	if schedulerLogPath == "" {
		fmt.Println("Scheduler log path is empty")
	} else {
		fmt.Printf("Scheduler log path is %s\n", schedulerLogPath)
	}
}

1. Concurrent execution of multiple DAGs.

[Screenshots of the results for DAGs test2, test10, test11, test3, test4, test5, test6, test7, test8, and test9]

2. Concurrent execution of multiple steps within the same DAG.

[Screenshots of the results for step1 through step11]

@halalala222
Contributor Author

Command: echo

steps:
  - name: step1
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}
  - name: step2
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}
  - name: step3
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}
  - name: step4
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}
  - name: step5
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}
  - name: step6
    command: echo ${DAGU_REQUEST_ID} , ${DAGU_EXECUTION_LOG_PATH} , ${DAGU_SCHEDULER_LOG_PATH}

[Screenshots of the output for step1 through step6]

@halalala222
Contributor Author

@yohamta Hi, I've updated my PR.

Collaborator

@yohamta yohamta left a comment

Thank you very much for the amazing work! Overall looking good! However, there're some changes I would like to suggest.

cmd.Dir = step.Dir
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, step.Variables...)
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, os.Getenv(stepSpecialExecutionLogPathKey)))
Collaborator

Can we change this to something like this?

Suggested change
- cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, os.Getenv(stepSpecialExecutionLogPathKey)))
+ // Set environment variables from the DAG context.
+ dagContext, err := dag.GetContext(ctx)
+ if err != nil {
+ 	return nil, err
+ }
+ cmd.Env = append(cmd.Env, dagContext.Envs...)
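
The order of these appends matters: according to the Go `os/exec` documentation, if `Cmd.Env` contains duplicate keys, only the last value in the slice for each key is used. The sketch below mimics that rule so the effective environment can be inspected. The helper `effectiveEnv` and the sample values are hypothetical; the three appends merely stand in for `os.Environ()`, `step.Variables`, and `dagContext.Envs`.

```go
package main

import (
	"fmt"
	"strings"
)

// effectiveEnv is a hypothetical helper that mimics the os/exec rule for
// duplicate keys: the last entry for each key wins. Entries without "=" are
// skipped.
func effectiveEnv(env []string) map[string]string {
	out := make(map[string]string, len(env))
	for _, kv := range env {
		k, v, ok := strings.Cut(kv, "=")
		if !ok {
			continue
		}
		out[k] = v
	}
	return out
}

func main() {
	var env []string
	env = append(env, "DAGU_REQUEST_ID=from-parent")  // stands in for os.Environ()
	env = append(env, "FOO=step-variable")            // stands in for step.Variables
	env = append(env, "DAGU_REQUEST_ID=from-context") // stands in for dagContext.Envs
	fmt.Println(effectiveEnv(env)["DAGU_REQUEST_ID"])
}
```

Appending the context values last therefore lets them take precedence over anything inherited from the agent process.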

Comment on lines 364 to 366
if err = os.Setenv(util.GenerateStepSpecialExecutionLogPathKey(n.id), n.data.State.Log); err != nil {
	return err
}
Collaborator

Can we use DAG context to pass down the special environment variables?

Suggested change
- if err = os.Setenv(util.GenerateStepSpecialExecutionLogPathKey(n.id), n.data.State.Log); err != nil {
- 	return err
- }
+ // Execute runs the command synchronously and returns error if any.
+ func (n *Node) Execute(ctx context.Context) error {
+ 	dagContext, err := dag.GetContext(ctx)
+ 	if err != nil {
+ 		return err
+ 	}
+ 	// set special environment variables
+ 	globalStepLogEnvKey := dag.GenGlobalStepLogEnvKey(n.Step.ID)
+ 	if err = os.Setenv(globalStepLogEnvKey, n.data.State.Log); err != nil {
+ 		return err
+ 	}
+ 	dagContext.Envs = append(
+ 		dagContext.Envs,
+ 		dag.StepLogPathEnvKey+"="+n.data.State.Log,
+ 	)
+ 	ctx = dag.WithContext(ctx, dagContext)
+ 	// ......

Contributor Author

yes!!!!! Thank you!

@@ -225,3 +228,21 @@ func AddYamlExtension(file string) string {
}
return file
}

func GenerateStepSpecialExecutionLogPathKey(stepID int) string {
Collaborator

Can we define this function within the dag package? I wonder whether it could be scoped more narrowly. What do you think?

Contributor Author

I think so. Thank you!
I will modify my code during the day tomorrow.

Contributor Author

@yohamta Hi! I've updated my PR.

Collaborator

@yohamta yohamta left a comment

Looking good!👍✨✨✨🚀🚀🚀 Thank you very much!

@yohamta yohamta merged commit f1bfcdd into dagu-org:main Aug 28, 2024
4 of 5 checks passed

codecov bot commented Aug 28, 2024

Codecov Report

Attention: Patch coverage is 41.66667% with 21 lines in your changes missing coverage. Please review.

Project coverage is 64.72%. Comparing base (765fa19) to head (2e737f7).
Report is 3 commits behind head on main.

Files                            Patch %   Lines
internal/dag/context.go          0.00%     13 Missing ⚠️
internal/agent/agent.go          63.63%    2 Missing and 2 partials ⚠️
internal/dag/scheduler/node.go   66.66%    2 Missing and 2 partials ⚠️

@@            Coverage Diff             @@
##             main     #654      +/-   ##
==========================================
- Coverage   65.12%   64.72%   -0.41%     
==========================================
  Files          53       53              
  Lines        4278     4306      +28     
==========================================
+ Hits         2786     2787       +1     
  Misses       1263     1263              
- Partials      229      256      +27     
Files                            Coverage Δ
internal/agent/agent.go          72.94% <63.63%> (-1.47%) ⬇️
internal/dag/scheduler/node.go   84.25% <66.66%> (-0.69%) ⬇️
internal/dag/context.go          0.00% <0.00%> (ø)

... and 5 files with indirect coverage changes

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
