-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[New Scheduler] Add DataManagementService #5063
Changes from all commits
31e25a0
0d24039
b5fad68
75e200a
30f62dc
6a18eb8
56a4f2c
c7ad646
ec5f93f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,328 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.openwhisk.core.service | ||
|
||
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} | ||
import akka.util.Timeout | ||
import io.grpc.StatusRuntimeException | ||
import org.apache.openwhisk.common.Logging | ||
import org.apache.openwhisk.core.ConfigKeys | ||
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader} | ||
import org.apache.openwhisk.core.service.DataManagementService.retryInterval | ||
import pureconfig.loadConfigOrThrow | ||
|
||
import scala.collection.concurrent.TrieMap | ||
import scala.collection.mutable.{Map, Queue} | ||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.duration._ | ||
import scala.util.Success | ||
|
||
// messages received by the actor | ||
// it is required to specify a recipient directly for the retryable message processing | ||
case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true) | ||
case class RegisterInitialData(key: String, | ||
value: String, | ||
failoverEnabled: Boolean = true, | ||
recipient: Option[ActorRef] = None) | ||
|
||
case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true) | ||
case class UnregisterData(key: String) | ||
case class UpdateDataOnChange(key: String, value: String) | ||
|
||
// messages sent by the actor | ||
case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader]) | ||
case class FinishWork(key: String) | ||
case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done]) | ||
case class Done() | ||
case class AlreadyExist() | ||
|
||
/** | ||
* This service is in charge of storing given data to ETCD. | ||
* In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD. | ||
* So it guarantees the data is eventually stored. | ||
*/ | ||
class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)( | ||
implicit logging: Logging, | ||
actorSystem: ActorSystem) | ||
extends Actor { | ||
private implicit val ec = context.dispatcher | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class is used by both schedulers and invokers to store data to ETCD.
Dependent modules are Queue, ContainerProxy, CreationJobManager, etc. |
||
implicit val requestTimeout: Timeout = Timeout(5.seconds) | ||
private[service] val dataCache = TrieMap[String, String]() | ||
private val operations = Map.empty[String, Queue[Any]] | ||
private var inProgressKeys = Set.empty[String] | ||
private val watcherName = "data-management-service" | ||
|
||
private val worker = workerFactory(context) | ||
|
||
override def receive: Receive = { | ||
case FinishWork(key) => | ||
// send waiting operation to worker if there is any, else update the inProgressKeys | ||
val ops = operations.get(key) | ||
if (ops.nonEmpty && ops.get.nonEmpty) { | ||
val operation = ops.get.dequeue() | ||
worker ! operation | ||
} else { | ||
inProgressKeys = inProgressKeys - key | ||
operations.remove(key) // remove empty queue from the map to free memory | ||
} | ||
|
||
// normally these messages will be sent when queues are created. | ||
case request: ElectLeader => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leader election happens when a queue is created. |
||
if (inProgressKeys.contains(request.key)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the retry nature of this component, if there is a precedent request(being retried), it would store the new request to a buffer. |
||
logging.info(this, s"save a request $request into a buffer") | ||
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request) | ||
} else { | ||
worker ! request | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actual works would be delegated to ETCDWorker. |
||
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
case request: RegisterInitialData => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actions under the same namespace share some data such as namespace throttling data. |
||
// send WatchEndpoint first as the put operation will be retried until success if failed | ||
if (request.failoverEnabled) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the failover is enabled, it would watch the key and if the key is deleted for some reason, it would try to restore it. |
||
watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent)) | ||
if (inProgressKeys.contains(request.key)) { | ||
logging.info(this, s"save request $request into a buffer") | ||
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request) | ||
} else { | ||
worker ! request | ||
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
case request: RegisterData => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will overwrite the existing data in ETCD. |
||
// send WatchEndpoint first as the put operation will be retried until success if failed | ||
if (request.failoverEnabled) | ||
watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent)) | ||
if (inProgressKeys.contains(request.key)) { | ||
// the new put|delete operation will erase influences made by older operations like put&delete | ||
// so we can remove these old operations | ||
logging.info(this, s"save request $request into a buffer") | ||
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value => | ||
value match { | ||
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false | ||
case _ => true | ||
} | ||
} | ||
queue.enqueue(request) | ||
operations.update(request.key, queue) | ||
} else { | ||
worker ! request | ||
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
case request: WatcherClosed => | ||
if (inProgressKeys.contains(request.key)) { | ||
// The put|delete operations against the same key will overwrite the previous results. | ||
// For example, if we put a value, delete it and put a new value again, the final result will be the new value. | ||
// So we can remove these old operations | ||
logging.info(this, s"save request $request into a buffer") | ||
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value => | ||
value match { | ||
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false | ||
case _ => true | ||
} | ||
} | ||
queue.enqueue(request) | ||
operations.update(request.key, queue) | ||
} else { | ||
worker ! request | ||
inProgressKeys = inProgressKeys + request.key | ||
} | ||
|
||
// It is required to close the watcher first before deleting etcd data | ||
// It is supposed to receive the WatcherClosed message after the watcher is stopped. | ||
case msg: UnregisterData => | ||
watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true) | ||
|
||
case WatchEndpointRemoved(_, key, value, false) => | ||
self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup | ||
|
||
// It should not receive "prefixed" data | ||
case WatchEndpointRemoved(_, key, value, true) => | ||
logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}") | ||
|
||
case msg: UpdateDataOnChange => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To reduce the loads against ETCD, it does not store data if there is no change in the value. |
||
dataCache.get(msg.key) match { | ||
case Some(cached) if cached == msg.value => | ||
logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.") | ||
|
||
case Some(cached) if cached != msg.value => | ||
dataCache.update(msg.key, msg.value) | ||
self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup | ||
|
||
case None => | ||
dataCache.put(msg.key, msg.value) | ||
self ! RegisterData(msg.key, msg.value) | ||
|
||
} | ||
} | ||
} | ||
|
||
object DataManagementService { | ||
val retryInterval: FiniteDuration = loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval) | ||
|
||
def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging, | ||
actorSystem: ActorSystem): Props = { | ||
Props(new DataManagementService(watcherService, workerFactory)) | ||
} | ||
} | ||
|
||
private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext, | ||
actorSystem: ActorSystem, | ||
logging: Logging) | ||
extends Actor { | ||
|
||
private val dataManagementService = context.parent | ||
private var lease: Option[Lease] = None | ||
leaseService ! GetLease | ||
|
||
override def receive: Receive = { | ||
case msg: Lease => | ||
lease = Some(msg) | ||
|
||
// leader election + endpoint management | ||
case request: ElectLeader => | ||
lease match { | ||
case Some(l) => | ||
etcdClient | ||
.electLeader(request.key, request.value, l) | ||
.andThen { | ||
case Success(msg) => | ||
request.recipient ! ElectionResult(msg) | ||
dataManagementService ! FinishWork(request.key) | ||
} | ||
.recover { | ||
// if there is no lease, reissue it and retry immediately | ||
case t: StatusRuntimeException => | ||
logging.warn(this, s"a lease is expired while leader election, reissue it: $t") | ||
lease = None | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
|
||
// it should retry forever until the data is stored | ||
case t: Throwable => | ||
logging.warn(this, s"unexpected error happened: $t, retry storing data") | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
case None => | ||
logging.warn(this, s"lease not found, retry storing data") | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
|
||
// only endpoint management | ||
case request: RegisterData => | ||
lease match { | ||
case Some(l) => | ||
etcdClient | ||
.put(request.key, request.value, l.id) | ||
.andThen { | ||
case Success(_) => | ||
dataManagementService ! FinishWork(request.key) | ||
} | ||
.recover { | ||
// if there is no lease, reissue it and retry immediately | ||
case t: StatusRuntimeException => | ||
logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t") | ||
lease = None | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
|
||
// it should retry forever until the data is stored | ||
case t: Throwable => | ||
logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}") | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
case None => | ||
logging.warn(this, s"lease not found, retry storing data ${request.key}") | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
|
||
// it stores the data iif there is no such one | ||
case request: RegisterInitialData => | ||
lease match { | ||
case Some(l) => | ||
etcdClient | ||
.putTxn(request.key, request.value, 0, l.id) | ||
.map { res => | ||
dataManagementService ! FinishWork(request.key) | ||
if (res.getSucceeded) { | ||
logging.info(this, s"initial data storing succeeds for ${request.key}") | ||
request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done()))) | ||
} else { | ||
logging.info(this, s"data is already stored for: $request, cancel the initial data storing") | ||
request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist()))) | ||
} | ||
} | ||
.recover { | ||
// if there is no lease, reissue it and retry immediately | ||
case t: StatusRuntimeException => | ||
logging.warn( | ||
this, | ||
s"a lease is expired while registering an initial data ${request.key}, reissue it: $t") | ||
lease = None | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
|
||
// it should retry forever until the data is stored | ||
case t: Throwable => | ||
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}") | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
case None => | ||
logging.warn(this, s"lease not found, retry storing data for ${request.key}") | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(request, retryInterval) | ||
} | ||
|
||
case msg: WatcherClosed => | ||
etcdClient | ||
.del(msg.key) | ||
.andThen { | ||
case Success(_) => | ||
dataManagementService ! FinishWork(msg.key) | ||
} | ||
.recover { | ||
// if there is no lease, reissue it and retry immediately | ||
case t: StatusRuntimeException => | ||
logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t") | ||
lease = None | ||
leaseService ! GetLease | ||
sendMessageToSelfAfter(msg, retryInterval) | ||
|
||
// it should retry forever until the data is stored | ||
case t: Throwable => | ||
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${msg.key}") | ||
sendMessageToSelfAfter(msg, retryInterval) | ||
} | ||
|
||
} | ||
|
||
private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = { | ||
actorSystem.scheduler.scheduleOnce(retryInterval, self, msg) | ||
} | ||
} | ||
|
||
object EtcdWorker { | ||
def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext, | ||
actorSystem: ActorSystem, | ||
logging: Logging): Props = { | ||
Props(new EtcdWorker(etcdClient, leaseService)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will include test cases into this PR after setting up the CI pipeline for scheduler components.