Skip to content

Commit

Permalink
fix: Use Docker event listener to detect microservices (#1499)
Browse files Browse the repository at this point in the history
Signed-off-by: Rastislav Szabo <raszabo@cisco.com>
  • Loading branch information
rastislavs authored and ondrej-fabry committed Oct 7, 2019
1 parent f387a16 commit f15298a
Showing 1 changed file with 77 additions and 126 deletions.
203 changes: 77 additions & 126 deletions plugins/linux/nsplugin/descriptor/microservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ const (
// MicroserviceDescriptorName is the name of the descriptor for microservices.
MicroserviceDescriptorName = "microservice"

// how often in seconds to refresh the microservice state data
dockerRefreshPeriod = 3 * time.Second
dockerRetryPeriod = 5 * time.Second
// docker API keywords
dockerTypeContainer = "container"
dockerStateRunning = "running"
dockerActionStart = "start"
dockerActionStop = "stop"
)

// MicroserviceDescriptor watches Docker and notifies KVScheduler about newly
Expand Down Expand Up @@ -168,91 +170,6 @@ func (d *MicroserviceDescriptor) GetMicroserviceStateData(msLabel string) (ms *M
return ms, found
}

// handleMicroservices handles microservice changes.
func (d *MicroserviceDescriptor) handleMicroservices(ctx *microserviceCtx) {
var err error
var newest int64
var containers []docker.APIContainers
var nextCreated []string

// First check if any microservice has terminated.
d.msStateLock.Lock()
for container := range d.microServiceByID {
details, err := d.dockerClient.InspectContainer(container)
if err != nil || !details.State.Running {
d.processTerminatedMicroservice(container)
}
}
d.msStateLock.Unlock()

// Now check if previously created containers have transitioned to the state "running".
for _, container := range ctx.created {
details, err := d.dockerClient.InspectContainer(container)
if err == nil {
if details.State.Running {
d.detectMicroservice(details)
} else if details.State.Status == "created" {
nextCreated = append(nextCreated, container)
}
} else {
d.log.Debugf("Inspect container ID %v failed: %v", container, err)
}
}
ctx.created = nextCreated

// Inspect newly created containers
listOpts := docker.ListContainersOptions{
All: true,
Filters: map[string][]string{},
}
// List containers and filter all older than 'since' ID
if ctx.since != "" {
listOpts.Filters["since"] = []string{ctx.since}
}
containers, err = d.dockerClient.ListContainers(listOpts)
if err != nil {
// If 'since' container was not found, list all containers (404 is required to support older docker version)
if dockerErr, ok := err.(*docker.Error); ok && (dockerErr.Status == 500 || dockerErr.Status == 404) {
// Reset filter and list containers again
d.log.Debugf("clearing 'since' %s", ctx.since)
ctx.since = ""
delete(listOpts.Filters, "since")
containers, err = d.dockerClient.ListContainers(listOpts)
}
if err != nil {
// If there is other error, return it
d.log.Errorf("Error listing docker containers: %v", err)
return
}
}

for _, container := range containers {
if ctx.lastInspected != 0 {
d.log.Debugf("processing new container %v with state %v", container.ID, container.State)
}
if container.State == "running" && container.Created > ctx.lastInspected {
// Inspect the container to get the list of defined environment variables.
details, err := d.dockerClient.InspectContainer(container.ID)
if err != nil {
d.log.Debugf("Inspect container %v failed: %v", container.ID, err)
continue
}
d.detectMicroservice(details)
}
if container.State == "created" {
ctx.created = append(ctx.created, container.ID)
}
if container.Created > newest {
newest = container.Created
ctx.since = container.ID
}
}

if newest > ctx.lastInspected {
ctx.lastInspected = newest
}
}

// detectMicroservice inspects container to see if it is a microservice.
// If microservice is detected, processNewMicroservice() is called to process it.
func (d *MicroserviceDescriptor) detectMicroservice(container *docker.Container) {
Expand Down Expand Up @@ -301,7 +218,6 @@ func (d *MicroserviceDescriptor) processNewMicroservice(microserviceLabel string
Key: nsmodel.MicroserviceKey(ms.Label),
Value: &prototypes.Empty{},
Metadata: nil,

})
}
}
Expand Down Expand Up @@ -331,6 +247,36 @@ func (d *MicroserviceDescriptor) processTerminatedMicroservice(id string) {
}
}

// setStateInSync sets internal state to "in sync" and signals the state transition.
func (d *MicroserviceDescriptor) setStateInSync() {
d.msStateLock.Lock()
d.msStateInSync = true
d.msStateLock.Unlock()
d.msStateInSyncCond.Broadcast()
}

// processStartedContainer processes a started Docker container - inspects whether it is a microservice.
// If it is, notifies scheduler about a new microservice.
func (d *MicroserviceDescriptor) processStartedContainer(id string) {
container, err := d.dockerClient.InspectContainer(id)
if err != nil {
d.log.Warnf("Error by inspecting container %s: %v", id, err)
return
}
d.detectMicroservice(container)
}

// processStoppedContainer processes a stopped Docker container - if it is a microservice,
// notifies scheduler about its termination.
func (d *MicroserviceDescriptor) processStoppedContainer(id string) {
d.msStateLock.Lock()
defer d.msStateLock.Unlock()

if _, found := d.microServiceByID[id]; found {
d.processTerminatedMicroservice(id)
}
}

// trackMicroservices is running in the background and maintains a map of microservice labels to container info.
func (d *MicroserviceDescriptor) trackMicroservices(ctx context.Context) {
d.wg.Add(1)
Expand All @@ -339,51 +285,56 @@ func (d *MicroserviceDescriptor) trackMicroservices(ctx context.Context) {
d.log.Debugf("Microservice tracking ended")
}()

msCtx := &microserviceCtx{}
// subscribe to Docker events
listener := make(chan *docker.APIEvents, 10)
err := d.dockerClient.AddEventListener(listener)
if err != nil {
d.log.Warnf("Failed to add Docker event listener: %v", err)
d.setStateInSync() // empty set of microservices is considered
return
}

// list currently running containers
listOpts := docker.ListContainersOptions{
All: true,
}
containers, err := d.dockerClient.ListContainers(listOpts)
if err != nil {
d.log.Warnf("Failed to list Docker containers: %v", err)
d.setStateInSync() // empty set of microservices is considered
return
}
for _, container := range containers {
if container.State == dockerStateRunning {
details, err := d.dockerClient.InspectContainer(container.ID)
if err != nil {
d.log.Warnf("Error by inspecting container %s: %v", container.ID, err)
continue
}
d.detectMicroservice(details)
}
}

var clientOk bool
// mark state data as in-sync
d.setStateInSync()

timer := time.NewTimer(0)
// process Docker events
for {
select {
case <-timer.C:
if err := d.dockerClient.Ping(); err != nil {
if clientOk {
d.log.Errorf("Docker ping check failed: %v", err)
}
clientOk = false

// Sleep before another retry.
timer.Reset(dockerRetryPeriod)
break
case ev, ok := <-listener:
if !ok {
return
}

if !clientOk {
d.log.Infof("Docker ping check OK")
/*if info, err := d.dockerClient.Info(); err != nil {
d.log.Errorf("Retrieving docker info failed: %v", err)
timer.Reset(dockerRetryPeriod)
continue
} else {
d.log.Infof("Docker connection established: server version: %v (%v %v %v)",
info.ServerVersion, info.OperatingSystem, info.Architecture, info.KernelVersion)
}*/
if ev.Type == dockerTypeContainer {
if ev.Action == dockerActionStart {
d.processStartedContainer(ev.Actor.ID)
}
if ev.Action == dockerActionStop {
d.processStoppedContainer(ev.Actor.ID)
}
}
clientOk = true

d.handleMicroservices(msCtx)

// Sleep before another refresh.
timer.Reset(dockerRefreshPeriod)
case <-d.ctx.Done():
return
}

// mark state data as in-sync - if connection to docker is failing,
// empty set of microservices is considered
d.msStateLock.Lock()
d.msStateInSync = true
d.msStateLock.Unlock()
d.msStateInSyncCond.Broadcast()
}
}

0 comments on commit f15298a

Please sign in to comment.