diff --git a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/description/DeployAppengineDescription.groovy b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/description/DeployAppengineDescription.groovy
index 11c1958e117..572629739ce 100644
--- a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/description/DeployAppengineDescription.groovy
+++ b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/description/DeployAppengineDescription.groovy
@@ -37,4 +37,5 @@ class DeployAppengineDescription extends AbstractAppengineCredentialsDescription
   List<String> configFiles
   Boolean promote
   Boolean stopPreviousVersion
+  String containerImageUrl // app engine flex only
 }
diff --git a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/ops/DeployAppengineAtomicOperation.groovy b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/ops/DeployAppengineAtomicOperation.groovy
index ee5320e23da..269c244aa92 100644
--- a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/ops/DeployAppengineAtomicOperation.groovy
+++ b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/ops/DeployAppengineAtomicOperation.groovy
@@ -53,21 +53,32 @@ class DeployAppengineAtomicOperation implements AtomicOperation<DeploymentResult
 
   DeployAppengineDescription description
   boolean usesGcs
+  boolean containerDeployment
 
   DeployAppengineAtomicOperation(DeployAppengineDescription description) {
     this.description = description
-    this.usesGcs = description.repositoryUrl.startsWith("gs://")
+    this.containerDeployment = description.containerImageUrl?.trim()
+    this.usesGcs = !this.containerDeployment && description.repositoryUrl.startsWith("gs://")
   }
 
   /**
    * curl -X POST -H "Content-Type: application/json" -d '[ { "createServerGroup": { "application": "myapp", "stack": "stack", "freeFormDetails": "details", "repositoryUrl": "https://github.com/organization/project.git", "branch": "feature-branch", "credentials": "my-appengine-account", "configFilepaths": ["app.yaml"] } } ]' "http://localhost:7002/appengine/ops"
    * curl -X POST -H "Content-Type: application/json" -d '[ { "createServerGroup": { "application": "myapp", "stack": "stack", "freeFormDetails": "details", "repositoryUrl": "https://github.com/organization/project.git", "branch": "feature-branch", "credentials": "my-appengine-account", "configFilepaths": ["app.yaml"], "promote": true, "stopPreviousVersion": true } } ]' "http://localhost:7002/appengine/ops"
    * curl -X POST -H "Content-Type: application/json" -d '[ { "createServerGroup": { "application": "myapp", "stack": "stack", "freeFormDetails": "details", "repositoryUrl": "https://github.com/organization/project.git", "branch": "feature-branch", "credentials": "my-appengine-account", "configFilepaths": ["runtime: python27\napi_version: 1\nthreadsafe: true\nmanual_scaling:\n  instances: 5\ninbound_services:\n - warmup\nhandlers:\n - url: /.*\n   script: main.app"],} } ]' "http://localhost:7002/appengine/ops"
+   * curl -X POST -H "Content-Type: application/json" -d '[ { "createServerGroup": { "application": "myapp", "stack": "stack", "freeFormDetails": "details", "credentials": "my-appengine-account", "containerImageUrl": "gcr.io/my-project/my-image:my-tag", "configFiles": ["env: flex\nruntime: custom\nmanual_scaling:\n  instances: 1\nresources:\n  cpu: 1\n  memory_gb: 0.5\n  disk_size_gb: 10"] } } ]' "http://localhost:7002/appengine/ops"
    */
   @Override
   DeploymentResult operate(List priorOutputs) {
     def  baseDir = description.credentials.localRepositoryDirectory
-    def  directoryPath = getFullDirectoryPath(baseDir, description.repositoryUrl)
+
+    String directoryId
+    if (containerDeployment) {
+      directoryId = description.containerImageUrl
+    } else {
+      directoryId = description.repositoryUrl
+    }
+
+    def directoryPath = getFullDirectoryPath(baseDir, directoryId)
 
     /*
     * We can't allow concurrent deploy operations on the same local repository.
@@ -77,7 +88,13 @@ class DeployAppengineAtomicOperation implements AtomicOperation<DeploymentResult
     return AppengineMutexRepository.atomicWrapper(directoryPath, {
       task.updateStatus BASE_PHASE, "Initializing creation of version..."
       def result = new DeploymentResult()
-      def newVersionName = deploy(cloneOrUpdateLocalRepository(directoryPath, 1))
+      String newVersionName
+      if (containerDeployment) {
+        createEmptyDirectory(directoryPath)
+        newVersionName = deploy(directoryPath)
+      } else {
+        newVersionName = deploy(cloneOrUpdateLocalRepository(directoryPath, 1))
+      }
       def region = description.credentials.region
       result.serverGroupNames = Arrays.asList("$region:$newVersionName".toString())
       result.serverGroupNameByRegion[region] = newVersionName
@@ -85,6 +102,16 @@ class DeployAppengineAtomicOperation implements AtomicOperation<DeploymentResult
     })
   }
 
+  void createEmptyDirectory(String path) {
+    File directory = new File(path)
+    if (directory.exists()) {
+      directory.deleteDir()
+    }
+    if (!directory.mkdirs()) {
+      throw new AppengineOperationException("Failed to create directory: $path")
+    }
+  }
+
   String cloneOrUpdateLocalRepository(String directoryPath, Integer retryCount) {
     def repositoryUrl = description.repositoryUrl
     def directory = new File(directoryPath)
@@ -145,7 +172,9 @@ class DeployAppengineAtomicOperation implements AtomicOperation<DeploymentResult
                                                                          description.stack,
                                                                          description.freeFormDetails,
                                                                          false)
-    def writtenFullConfigFilePaths = writeConfigFiles(description.configFiles, repositoryPath, applicationDirectoryRoot)
+    def imageUrl = description.containerImageUrl
+    def configFiles = description.configFiles
+    def writtenFullConfigFilePaths = writeConfigFiles(configFiles, repositoryPath, applicationDirectoryRoot)
     def repositoryFullConfigFilePaths =
       (description.configFilepaths?.collect { Paths.get(repositoryPath, applicationDirectoryRoot ?: '.', it).toString() } ?: []) as List<String>
     def deployCommand = ["gcloud"]
@@ -158,6 +187,9 @@ class DeployAppengineAtomicOperation implements AtomicOperation<DeploymentResult
     deployCommand << (description.stopPreviousVersion ? "--stop-previous-version": "--no-stop-previous-version")
     deployCommand << "--project=$project"
     deployCommand << "--account=$accountEmail"
+    if (containerDeployment) {
+      deployCommand << "--image-url=$imageUrl"
+    }
 
     task.updateStatus BASE_PHASE, "Deploying version $versionName..."
     try {
diff --git a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/validators/DeployAppengineDescriptionValidator.groovy b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/validators/DeployAppengineDescriptionValidator.groovy
index bb1491f7c01..7fea1b86f37 100644
--- a/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/validators/DeployAppengineDescriptionValidator.groovy
+++ b/clouddriver-appengine/src/main/groovy/com/netflix/spinnaker/clouddriver/appengine/deploy/validators/DeployAppengineDescriptionValidator.groovy
@@ -39,7 +39,9 @@ class DeployAppengineDescriptionValidator extends DescriptionValidator<DeployApp
       return
     }
 
-    if (!description.repositoryUrl.startsWith("gs://")) {
+    boolean isContainerDeployment = description.containerImageUrl?.trim()
+
+    if (!isContainerDeployment && !description.repositoryUrl.startsWith("gs://")) {
       if (!helper.validateGitCredentials(description.credentials.gitCredentials,
                                          description.gitCredentialType,
                                          description.credentials.name,
@@ -49,10 +51,15 @@ class DeployAppengineDescriptionValidator extends DescriptionValidator<DeployApp
       helper.validateNotEmpty(description.branch, "branch")
     }
 
+    if (isContainerDeployment) {
+      helper.validateNotEmpty(description.containerImageUrl, "containerImageUrl")
+    } else {
+      helper.validateNotEmpty(description.repositoryUrl, "repositoryUrl")
+    }
+
     helper.validateApplication(description.application, "application")
     helper.validateStack(description.stack, "stack")
     helper.validateDetails(description.freeFormDetails, "freeFormDetails")
-    helper.validateNotEmpty(description.repositoryUrl, "repositoryUrl")
 
     if (!(description.configFilepaths || description.configFiles)) {
       helper.validateNotEmpty(description.configFilepaths, "configFilepaths")
diff --git a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicator.groovy b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicator.groovy
index e6c2af16391..45f10119cd9 100644
--- a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicator.groovy
+++ b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicator.groovy
@@ -18,6 +18,9 @@ package com.netflix.spinnaker.clouddriver.aws.health
 
 import com.amazonaws.AmazonClientException
 import com.amazonaws.AmazonServiceException
+import com.amazonaws.services.ec2.model.AmazonEC2Exception
+import com.netflix.spectator.api.Counter
+import com.netflix.spectator.api.Registry
 import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider
 import com.netflix.spinnaker.clouddriver.aws.security.NetflixAmazonCredentials
 import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider
@@ -32,6 +35,7 @@ import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Component
 import org.springframework.web.bind.annotation.ResponseStatus
 
+import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.atomic.AtomicReference
 
 @Component
@@ -39,22 +43,37 @@ class AmazonHealthIndicator implements HealthIndicator {
 
   private static final Logger LOG = LoggerFactory.getLogger(AmazonHealthIndicator)
 
-  @Autowired
-  AccountCredentialsProvider accountCredentialsProvider
+  private final AccountCredentialsProvider accountCredentialsProvider
+  private final AmazonClientProvider amazonClientProvider
+
+  private final AtomicReference<Exception> lastException = new AtomicReference<>(null)
+  private final AtomicReference<Boolean> hasInitialized = new AtomicReference<>(null)
+
+  private final AtomicLong errors;
 
   @Autowired
-  AmazonClientProvider amazonClientProvider
+  AmazonHealthIndicator(AccountCredentialsProvider accountCredentialsProvider,
+                        AmazonClientProvider amazonClientProvider,
+                        Registry registry) {
+    this.accountCredentialsProvider = accountCredentialsProvider
+    this.amazonClientProvider = amazonClientProvider
 
-  private final AtomicReference<Exception> lastException = new AtomicReference<>(null)
+    this.errors = registry.gauge("health.amazon.errors", new AtomicLong(0))
+  }
 
   @Override
   Health health() {
+    if (hasInitialized.get() == Boolean.TRUE) {
+      // avoid being marked unhealthy once connectivity to all accounts has been verified at least once
+      return new Health.Builder().up().build()
+    }
+
     def ex = lastException.get()
     if (ex) {
       throw ex
     }
 
-    new Health.Builder().up().build()
+    return new Health.Builder().unknown().build()
   }
 
   @Scheduled(fixedDelay = 120000L)
@@ -71,13 +90,16 @@ class AmazonHealthIndicator implements HealthIndicator {
           }
           ec2.describeAccountAttributes()
         } catch (AmazonServiceException e) {
-          throw new AmazonUnreachableException(e)
+          throw new AmazonUnreachableException("Failed to describe account attributes for '${credentials.name}'",  e)
         }
       }
+      hasInitialized.set(Boolean.TRUE)
       lastException.set(null)
+      errors.set(0)
     } catch (Exception ex) {
       LOG.error "Unhealthy", ex
       lastException.set(ex)
+      errors.set(1)
     }
   }
 
diff --git a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicatorSpec.groovy b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicatorSpec.groovy
index 3decf083f07..5d1c745e0ef 100644
--- a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicatorSpec.groovy
+++ b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/health/AmazonHealthIndicatorSpec.groovy
@@ -19,12 +19,16 @@ package com.netflix.spinnaker.clouddriver.aws.health
 import com.amazonaws.AmazonServiceException
 import com.amazonaws.services.ec2.AmazonEC2
 import com.amazonaws.services.ec2.model.DescribeAccountAttributesResult
+import com.netflix.spectator.api.Counter
+import com.netflix.spectator.api.Registry
 import com.netflix.spinnaker.clouddriver.aws.TestCredential
 import com.netflix.spinnaker.clouddriver.aws.security.AmazonClientProvider
 import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider
 import org.springframework.boot.actuate.health.Status
 import spock.lang.Specification
 
+import java.util.concurrent.atomic.AtomicLong
+
 class AmazonHealthIndicatorSpec extends Specification {
 
   def "health fails when amazon appears unreachable"() {
@@ -40,7 +44,12 @@ class AmazonHealthIndicatorSpec extends Specification {
     def mockAmazonClientProvider = Stub(AmazonClientProvider) {
       getAmazonEC2(*_) >> mockEc2
     }
-    def indicator = new AmazonHealthIndicator(accountCredentialsProvider: holder, amazonClientProvider: mockAmazonClientProvider)
+    def counter = new AtomicLong(0)
+    def mockRegistry = Stub(Registry) {
+      gauge(_, _) >> counter
+    }
+
+    def indicator = new AmazonHealthIndicator(holder, mockAmazonClientProvider, mockRegistry)
 
     when:
     indicator.checkHealth()
@@ -48,6 +57,7 @@ class AmazonHealthIndicatorSpec extends Specification {
 
     then:
     thrown AmazonHealthIndicator.AmazonUnreachableException
+    counter.get() == 1
   }
 
   def "health succeeds when amazon is reachable"() {
@@ -63,7 +73,13 @@ class AmazonHealthIndicatorSpec extends Specification {
     def mockAmazonClientProvider = Stub(AmazonClientProvider) {
       getAmazonEC2(*_) >> mockEc2
     }
-    def indicator = new AmazonHealthIndicator(accountCredentialsProvider: holder, amazonClientProvider: mockAmazonClientProvider)
+
+    def counter = new AtomicLong(0)
+    def mockRegistry = Stub(Registry) {
+      gauge(_, _) >> counter
+    }
+
+    def indicator = new AmazonHealthIndicator(holder, mockAmazonClientProvider, mockRegistry)
 
     when:
     indicator.checkHealth()
@@ -71,5 +87,6 @@ class AmazonHealthIndicatorSpec extends Specification {
 
     then:
     health.status == Status.UP
+    counter.get() == 0
   }
 }