Resync Theia Manager at startup #150
Conversation
3f7dc0c to 397f39c
/theia-test-e2e

1 similar comment

/theia-test-e2e
pkg/util/clickhouse/clickhouse.go
Outdated
```diff
 )
 
 var (
 	openSql         = sql.Open
 	createK8sClient = k8s.CreateK8sClient
 )
 
-func SetupConnection() (connect *sql.DB, err error) {
-	url, err := getClickHouseURL()
+func SetupConnection(client *kubernetes.Interface) (connect *sql.DB, err error) {
```
Suggested change:
```diff
-func SetupConnection(client *kubernetes.Interface) (connect *sql.DB, err error) {
+func SetupConnection(client kubernetes.Interface) (connect *sql.DB, err error) {
```
Go interface values already act like pointers, so we should avoid using pointers to interfaces.
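To illustrate the point, a minimal, hypothetical sketch (not this PR's code): a kubernetes.Interface is already a small reference to the concrete client, so tests can inject a fake clientset without any pointer-to-interface plumbing.

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// countNamespaces takes the interface by value, as the review suggests.
func countNamespaces(client kubernetes.Interface) (int, error) {
	nsList, err := client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return 0, err
	}
	return len(nsList.Items), nil
}

func main() {
	// The fake clientset satisfies kubernetes.Interface directly.
	client := fake.NewSimpleClientset()
	n, _ := countNamespaces(client)
	fmt.Println("namespaces:", n)
}
```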
Thanks Shawn for the review! Updated.
pkg/util/clickhouse/clickhouse.go
Outdated
```diff
@@ -91,17 +91,22 @@ func GetSecret(client kubernetes.Interface, namespace string) (username string,
 	return username, password, nil
 }
 
-func getClickHouseURL() (url string, err error) {
+func getClickHouseURL(clientPtr *kubernetes.Interface) (url string, err error) {
```
same here
Updated, thanks!
```diff
@@ -194,6 +199,10 @@ func (c *NPRecommendationController) Run(stopCh <-chan struct{}) {
 		return
 	}
 
+	// The key can be anything as we only have single item.
+	c.gcQueue.Add("key")
```
Maybe create a const for the key in this package and use that?
Updated the key to a dedicated gcKey constant to keep it separate, thanks!
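For reference, a small sketch of what the named constant could look like; the name, value, and queue wiring below are illustrative, not necessarily what the PR ended up with.

```go
package main

import "k8s.io/client-go/util/workqueue"

// gcKey is the single, well-known key used for the startup garbage-collection
// item, replacing the bare "key" literal from the diff above.
const gcKey = "gc"

func main() {
	gcQueue := workqueue.NewNamed("networkPolicyRecommendationGC")
	defer gcQueue.ShutDown()

	// The queue only ever holds this one item, so a named constant keeps the
	// producer and the worker in sync.
	gcQueue.Add(gcKey)
}
```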
```go
	}

	// Add scheduled/running NPR back to resync list
	nprList, err := c.ListNetworkPolicyRecommendation(env.GetTheiaNamespace())
```
Could there be a race when a job is deleted between ListNetworkPolicyRecommendation and the call to c.addPeriodicSync? It shouldn't be much of an issue though: even if that happens, the cleanup can be done without issue during the next resync.
I think it is fine: PeriodicSync just adds the NPR key to the main workqueue, and syncNPRecommendation will return nil if it does not find the NPR. As long as the Theia Manager is running, the deletion process will take care of the SparkApplication, the db entries, and also the PeriodicSyncSet.
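A minimal standalone sketch of that "not found is not an error" behavior (the helper below is illustrative, not the controller's actual syncNPRecommendation):

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// syncItem sketches the pattern: a NotFound error from the lookup means the
// object was deleted after it was queued, so the sync returns nil instead of
// requeuing.
func syncItem(lookup func() error) error {
	if err := lookup(); err != nil {
		if apierrors.IsNotFound(err) {
			return nil // object already gone; nothing to do
		}
		return err // real error; caller will requeue
	}
	// ... normal sync work would go here ...
	return nil
}

func main() {
	notFound := apierrors.NewNotFound(
		schema.GroupResource{Group: "crd.theia.antrea.io", Resource: "networkpolicyrecommendations"}, "pr-123")
	err := syncItem(func() error { return notFound })
	fmt.Println(err) // <nil>: the deleted NPR is simply skipped
}
```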
```go
// handleStaleResources handles the stale Spark Applications and database entries.
// It will delete the dangling resources without a matching NetworkPolicyRecommendation
// and add the running NetworkPolicyRecommendation back to the periodical watch list.
func (c *NPRecommendationController) handleStaleResources() error {
```
I kind of think we should separate the sync/retry of the two things we're doing here. Currently, populating the resync list won't be done if errors occur when deleting stale SA or db entries.
Currently handleStaleResources will be retried, as I use a gcworker to run it and add the key back upon error. But it's true that if deleting stale SA or db entries never succeeds, populating the resync list will not be executed. Would you prefer adding another worker to take care of populating the resync list?
Or we can persist which parts of the GC have successfully finished in gcworker, and only retry the parts that have not. Also, if everything is done, should gcworker quit rather than blocking on the queue (returning false on L299)?
Sounds great, updated!
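A rough sketch of that idea, with field names guessed from this thread (the final PR code may differ): the gc item carries flags for the parts of the cleanup that still need to run, and a retry only repeats the parts that failed.

```go
package main

import "fmt"

// gcTasks records which parts of the startup garbage collection remain.
type gcTasks struct {
	removeStaleDbEntries bool
	removeStaleSparkApp  bool
	addResync            bool
}

// handleStaleResources runs only the outstanding tasks and clears each flag
// once that task succeeds, so a retry skips the parts already finished.
func handleStaleResources(t *gcTasks) error {
	if t.removeStaleDbEntries {
		// ... delete dangling ClickHouse entries here ...
		t.removeStaleDbEntries = false
	}
	if t.removeStaleSparkApp {
		// ... delete dangling SparkApplications here ...
		t.removeStaleSparkApp = false
	}
	if t.addResync {
		// ... add running NPRs back to the periodic sync list here ...
		t.addResync = false
	}
	return nil
}

func main() {
	tasks := &gcTasks{removeStaleDbEntries: true, removeStaleSparkApp: true, addResync: true}
	if err := handleStaleResources(tasks); err != nil {
		fmt.Println("will retry the remaining tasks:", err)
	}
}
```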
```go
	require.NoError(t, err)

	// Check the SparkApplication and database entries of jobName1 do not exist
	cmd = fmt.Sprintf("kubectl get sparkapplication %s -n flow-visibility", jobName1)
```
Maybe wrap this and L215 in some retries, just in case the theia manager gets slow in removing them?
Updated, thanks! Also added some retries for NPR deletion as I found some failures.
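A minimal sketch of the kind of retry discussed here, assuming a kubectl-based check like the one in the diff above (helper name and timeouts are illustrative):

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForSparkAppGone polls "kubectl get" until the named SparkApplication is
// no longer found in the flow-visibility namespace, or the timeout expires.
func waitForSparkAppGone(name string) error {
	return wait.PollImmediate(2*time.Second, 30*time.Second, func() (bool, error) {
		out, err := exec.Command("kubectl", "get", "sparkapplication", name, "-n", "flow-visibility").CombinedOutput()
		if err != nil && strings.Contains(string(out), "not found") {
			return true, nil // the resource has been removed
		}
		return false, nil // still present (or a transient error); keep polling
	})
}

func main() {
	if err := waitForSparkAppGone("pr-job-1"); err != nil {
		fmt.Println("SparkApplication was not removed in time:", err)
	}
}
```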
LGTM.
```go
		removeStaleSparkApp: true,
		addResync:           true,
	})
	go wait.Until(c.gcworker, time.Second, stopCh)
```
I think we only add gcKey to gcQueue when theia-manager is started? I'm wondering why we use wait.Until. Won't processNextGcWorkItem get blocked at line #333?
As the key will be added back to the queue when there is an error in handleStaleResources, I apply the same logic as the other work queues. And since processNextGcWorkItem will return false when handleStaleResources succeeds, the goroutine will finish at that point and won't be blocked. It should be possible to replace this with a simpler loop, but I'm not sure it would be a better solution.
Thanks YunTang, you're right. Using wait.Until keeps the goroutine running until stopCh is closed. Updated!
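To make the agreed-upon behavior concrete, a minimal sketch (method names follow this thread, not necessarily the PR's final code): the gc worker is started once, requeues the key on failure, and exits once processNextGcWorkItem reports there is nothing left to do.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const gcKey = "gc"

type controller struct {
	gcQueue workqueue.RateLimitingInterface
}

func (c *controller) handleStaleResources() error {
	// ... delete stale SparkApplications / db entries, populate resync ...
	return nil
}

// processNextGcWorkItem returns false once the gc work has succeeded (or the
// queue is shut down), which lets gcworker exit instead of blocking forever.
func (c *controller) processNextGcWorkItem() bool {
	key, quit := c.gcQueue.Get()
	if quit {
		return false
	}
	defer c.gcQueue.Done(key)
	if err := c.handleStaleResources(); err != nil {
		c.gcQueue.AddRateLimited(key) // retry later
		return true
	}
	c.gcQueue.Forget(key)
	return false
}

func (c *controller) gcworker() {
	for c.processNextGcWorkItem() {
	}
}

func main() {
	c := &controller{gcQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
	defer c.gcQueue.ShutDown()
	c.gcQueue.Add(gcKey)
	c.gcworker() // runs until the gc key has been handled successfully, then returns
	fmt.Println("startup GC finished")
}
```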
LGTM
Signed-off-by: Yanjun Zhou <zhouya@vmware.com>
8a134e0 to 8bdcffe
/theia-test-e2e
Theia Manager needs to be resynchronized with the SparkApplications and db entries when it recovers from downtime. This commit resyncs them at startup.

Fixes #135

Signed-off-by: Yanjun Zhou zhouya@vmware.com