WIP
wk8 committed Oct 6, 2018
1 parent 7dc549e commit 88d7613
Showing 4 changed files with 211 additions and 54 deletions.
97 changes: 57 additions & 40 deletions manager/deallocator/deallocator.go
@@ -21,6 +21,8 @@ func registerServiceLevelResource(resourceDeallocator resourceDeallocator) {
// the Deallocator waits for services to fully shut down (i.e. no containers left)
// and then proceeds to deallocate service-level resources (e.g. networks),
// and finally services themselves
+// in particular, the Deallocator should be the only place where services, or
+// service-level resources, are ever deleted!
type Deallocator struct {
store *store.MemoryStore

@@ -65,48 +67,36 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
i++
}

+var allServices []*api.Service
+resourceLists := make(map[reflect.Type][]serviceLevelResource)

eventsChan, eventsChanCancel, err := store.ViewAndWatch(deallocator.store,
-func(readTx store.ReadTx) error {
+func(readTx store.ReadTx) (err error) {
// look for services that are marked for deletion
// there's no index on the `PendingDelete` field,
// so we just iterate over all of them and filter manually
// this is okay since we only do this at leadership change
-allServices, err := store.FindServices(readTx, store.All)
+allServices, err = store.FindServices(readTx, store.All)

if err != nil {
log.G(ctx).WithError(err).Error("failed to list services in deallocator init")
return err
}

-// let's populate our internal taskCounts
-for _, service := range allServices {
-if service.PendingDelete {
-if err = deallocator.processPendingDeleteService(ctx, readTx, service); err != nil {
-return err
-}
-}
-}

// now we also need to look at all existing service-level resources
// that are marked for deletion
-for _, resourceDeallocator := range resourceDeallocators {
+for key, resourceDeallocator := range resourceDeallocators {
allResources, err := resourceDeallocator.enumerator(readTx)

if err != nil {
log.G(ctx).WithError(err).Errorf("failed to list %vs in deallocator init", resourceDeallocator.name)
return err
}

-for _, resource := range allResources {
-if resource.isPendingDelete() {
-if err := deallocator.maybeDeallocateResource(ctx, nil, resourceDeallocator, resource.id(), nil); err != nil {
-return err
-}
-}
-}
+resourceLists[key] = allResources
}

-return nil
+return
},
events...)

@@ -117,10 +107,29 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
}

defer func() {
-close(deallocator.doneChan)
eventsChanCancel()
+// TODO wkpo: uh, we don't actually use this here!
+close(deallocator.doneChan)
}()

+// now let's populate our internal taskCounts
+for _, service := range allServices {
+if service.PendingDelete {
+deallocator.processPendingDeleteService(ctx, service)
+}
+}

+// and deallocate resources that are pending deletion and aren't used anymore
+for key, resourceList := range resourceLists {
+resourceDeallocator := resourceDeallocators[key]
+
+for _, resource := range resourceList {
+if resource.isPendingDelete() {
+deallocator.maybeDeallocateResource(ctx, nil, resourceDeallocator, resource.id(), nil)
+}
+}
+}

// now we just need to wait for events
for {
select {
@@ -134,21 +143,21 @@ func (deallocator *Deallocator) Run(ctx context.Context) error {
}
}

-func (deallocator *Deallocator) processPendingDeleteService(ctx context.Context, readTx store.ReadTx, service *api.Service) error {
+// TODO wkpo see the comment on TaskReaper.Stop()
+// and TODO wkpo where is this used???
+func (deallocator *Deallocator) Stop() {
+close(deallocator.stopChan)
+}

+func (deallocator *Deallocator) processPendingDeleteService(ctx context.Context, service *api.Service) error {
var (
tasks []*api.Task
err error
)

-viewFunc := func(r store.ReadTx) {
-tasks, err = store.FindTasks(r, store.ByServiceID(service.ID))
-}

-if readTx == nil {
-deallocator.store.View(viewFunc)
-} else {
-viewFunc(readTx)
-}
+deallocator.store.View(func(tx store.ReadTx) {
+tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
+})

if err != nil {
log.G(ctx).WithError(err).Errorf("failed to retrieve the list of tasks for service %v", service.ID)
@@ -176,8 +185,10 @@ func (deallocator *Deallocator) deallocateService(ctx context.Context, service *

// then all of its service-level resources, provided no other service uses them
for _, resourceDeallocator := range resourceDeallocators {
-for _, resourceID := range resourceDeallocator.resourceIDsExtractor(service) {
-deallocator.maybeDeallocateResource(ctx, tx, resourceDeallocator, resourceID, ignoreServiceID)
+for _, resource := range resourceDeallocator.resourcesExtractor(tx, service) {
+if resource.isPendingDelete() {
+deallocator.maybeDeallocateResource(ctx, tx, resourceDeallocator, resource.id(), ignoreServiceID)
+}
}
}

@@ -192,9 +203,9 @@ func (deallocator *Deallocator) deallocateService(ctx context.Context, service *

// proceeds to deallocate a resource if there are no longer any
// services using it
-func (deallocator *Deallocator) maybeDeallocateResource(ctx context.Context, tx store.Tx, resourceDeallocator resourceDeallocator, resourceID string, ignoreServiceID *string) error {
+func (deallocator *Deallocator) maybeDeallocateResource(ctx context.Context, tx store.Tx, resourceDeallocator resourceDeallocator, resourceID string, ignoreServiceID *string) (err error) {
updateFunc := func(t store.Tx) error {
-services, err := resourceDeallocator.servicesLocator(tx, resourceID)
+services, err := resourceDeallocator.servicesLocator(t, resourceID)

if err != nil {
log.G(ctx).WithError(err).Errorf("could not fetch services using %v ID %v", resourceDeallocator.name, resourceID)
@@ -205,15 +216,21 @@ func (deallocator *Deallocator) maybeDeallocateResource(ctx context.Context, tx
len(services) == 1 && ignoreServiceID != nil && services[0].ID == *ignoreServiceID

if noMoreServices {
-return resourceDeallocator.deleter(tx, resourceID)
+return resourceDeallocator.deleter(t, resourceID)
}
return nil
}

if tx == nil {
-return deallocator.store.Update(updateFunc)
+err = deallocator.store.Update(updateFunc)
+} else {
+err = updateFunc(tx)
}
-return updateFunc(tx)
+
+if err != nil {
+log.G(ctx).WithError(err).Errorf("DB error when deallocating %v ID %v", resourceDeallocator.name, resourceID)
+}
+return
}

func (deallocator *Deallocator) processNewEvent(ctx context.Context, event events.Event) error {
@@ -224,13 +241,13 @@ func (deallocator *Deallocator) processNewEvent(ctx context.Context, event event
if serviceWithCount, present := deallocator.services[serviceID]; present {
if serviceWithCount.taskCount <= 1 {
delete(deallocator.services, serviceID)
-return deallocator.processPendingDeleteService(ctx, nil, serviceWithCount.service)
+return deallocator.processPendingDeleteService(ctx, serviceWithCount.service)
}
serviceWithCount.taskCount--
}
return nil
case api.EventUpdateService:
-return deallocator.processPendingDeleteService(ctx, nil, typedEvent.Service)
+return deallocator.processPendingDeleteService(ctx, typedEvent.Service)
default:
// must be an event handled by a resource deallocator
resourceDeallocator, ok := resourceDeallocators[reflect.TypeOf(event)]
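For orientation, here's roughly how a caller drives the new lifecycle — a minimal sketch based only on the exported API in this diff (New, Run, Stop) and on the test added below; the surrounding manager wiring is assumed:

s := store.NewMemoryStore(nil)
defer s.Close()

deallocator := New(s)

done := make(chan error, 1)
go func() {
	// Run first performs the init pass (deallocating anything already
	// pending deletion), then blocks consuming store events until stopped
	done <- deallocator.Run(context.Background())
}()

// ... later, e.g. when losing leadership:
deallocator.Stop()
err := <-done // Run's return value, nil on a clean shutdown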
142 changes: 142 additions & 0 deletions manager/deallocator/deallocator_test.go
@@ -0,0 +1,142 @@
package deallocator

import (
"context"
"testing"
"time"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDeallocatorInit(t *testing.T) {
// start up the memory store
s := store.NewMemoryStore(nil)
require.NotNil(t, s)
defer s.Close()

// create a service that's pending deletion, with no tasks remaining
// this one should be deleted by the deallocator
// additionally, that service is using a network that's also marked for
// deletion, and another that's not
service1ID := "service1"
network1ID := "network1"
network2ID := "network2"
service1 := newService(service1ID, true, network1ID, network2ID)
network1 := newNetwork(network1ID, true)
network2 := newNetwork(network2ID, false)

// now let's create another service that's also pending deletion, but still
// has one task associated with it (in any state) - and also uses a network
// that's also marked for deletion
// none of those should get deleted
service2ID := "service2"
network3ID := "network3"
service2 := newService(service2ID, true, network3ID)
network3 := newNetwork(network3ID, true)
task1 := &api.Task{
ID: "task1",
ServiceID: service2ID,
}

// let's also have a network that's pending deletion,
// but isn't used by any existing service
// this one should be gone after the init
network4ID := "network4"
network4 := newNetwork(network4ID, true)

// and finally a network that's not pending deletion, not
// used by any service
network5ID := "network5"
network5 := newNetwork(network5ID, false)

err := s.Update(func(tx store.Tx) error {
for _, service := range []*api.Service{service1, service2} {
require.NoError(t, store.CreateService(tx, service))
}
for _, network := range []*api.Network{network1, network2, network3, network4, network5} {
require.NoError(t, store.CreateNetwork(tx, network))
}
require.NoError(t, store.CreateTask(tx, task1))
return nil
})
require.NoError(t, err, "Error setting up test fixtures")

// create and start the deallocator
deallocator := New(s)

completed := make(chan struct{})
var returnValue error
go func() {
returnValue = deallocator.Run(context.Background())
// allows checking that `Run` does return after we've stopped
close(completed)
}()

// and then stop it immediately - we're just interested in the init
// phase for this test
deallocator.Stop()

// let's wait for it to stop - shouldn't take too long
timeout := time.NewTimer(5 * time.Second) // TODO wkpo
select {
case <-completed:
timeout.Stop()
case <-timeout.C:
t.Error("Waited for too long for the deallocator to stop, error from run")
}
require.NoError(t, returnValue)

// now let's check that the DB is in the state we expect
s.View(func(tx store.ReadTx) {
assert.Nil(t, store.GetService(tx, service1ID))
assert.Nil(t, store.GetNetwork(tx, network1ID))
assert.NotNil(t, store.GetNetwork(tx, network2ID))

assert.NotNil(t, store.GetService(tx, service2ID))
assert.NotNil(t, store.GetNetwork(tx, network3ID))
})
}

func newService(id string, pendingDelete bool, networkIDs ...string) *api.Service {
return &api.Service{
ID: id,
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: id,
},
Task: api.TaskSpec{
Networks: newNetworkConfigs(networkIDs...),
},
},
PendingDelete: pendingDelete,
}
}

func newNetwork(id string, pendingDelete bool) *api.Network {
return &api.Network{
ID: id,
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: id,
},
},
PendingDelete: pendingDelete,
}
}

func newNetworkConfigs(networkIDs ...string) []*api.NetworkAttachmentConfig {
networks := make([]*api.NetworkAttachmentConfig, len(networkIDs))

for i := 0; i < len(networkIDs); i++ {
networks[i] = &api.NetworkAttachmentConfig{
Target: networkIDs[i],
}
}

return networks
}

// TODO wkpo: a test for the old networks syntax?
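As for the TODO above, a sketch of what such a fixture might look like — this assumes the deprecated service-level Networks field on api.ServiceSpec (the one the specs.proto comment referenced in network.go points at):

func newServiceWithOldNetworksSyntax(id string, pendingDelete bool, networkIDs ...string) *api.Service {
	return &api.Service{
		ID: id,
		Spec: api.ServiceSpec{
			Annotations: api.Annotations{
				Name: id,
			},
			// deprecated service-level field, as opposed to Task.Networks
			Networks: newNetworkConfigs(networkIDs...),
		},
		PendingDelete: pendingDelete,
	}
}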
20 changes: 9 additions & 11 deletions manager/deallocator/network.go
@@ -29,20 +29,18 @@ func init() {
}

wrappedNetworks = make([]serviceLevelResource, len(networks))
-i := 0
-for _, network := range networks {
-wrappedNetworks[i] = networkWrapper{network}
-i++
+for i := 0; i < len(networks); i++ {
+wrappedNetworks[i] = networkWrapper{networks[i]}
}
return
},
factory: func(event events.Event) serviceLevelResource {
return networkWrapper{event.(api.EventUpdateNetwork).Network}
},
-servicesLocator: func(tx store.Tx, networkID string) ([]*api.Service, error) {
+servicesLocator: func(tx store.ReadTx, networkID string) ([]*api.Service, error) {
return store.FindServices(tx, store.ByReferencedNetworkID(networkID))
},
-resourceIDsExtractor: func(service *api.Service) (networkIDs []string) {
+resourcesExtractor: func(tx store.ReadTx, service *api.Service) []serviceLevelResource {
spec := service.Spec
// see https://github.com/docker/swarmkit/blob/e2aafdd3453d2ab103dd97364f79ea6b857f9446/api/specs.proto#L80-L84
// we really should have a helper function on services to do this...
@@ -51,13 +49,13 @@ func init() {
networkConfigs = spec.Networks
}

-networkIDs = make([]string, len(networkConfigs))
-i := 0
+networks := make([]serviceLevelResource, 0, len(networkConfigs))
for _, networkConfig := range networkConfigs {
-networkIDs[i] = networkConfig.Target
-i++
+if network := store.GetNetwork(tx, networkConfig.Target); network != nil {
+networks = append(networks, networkWrapper{network})
+}
}
-return
+return networks
},
deleter: func(tx store.Tx, networkID string) error {
return store.DeleteNetwork(tx, networkID)
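The helper function the comment above wishes for could look like the following sketch; the precedence is an assumption based on the usual swarmkit pattern (task-level networks win, with the deprecated service-level field as a fallback):

func serviceNetworkConfigs(spec *api.ServiceSpec) []*api.NetworkAttachmentConfig {
	// prefer the task-level attachments, fall back to the deprecated
	// service-level ones (see the specs.proto comment linked above)
	if len(spec.Task.Networks) != 0 {
		return spec.Task.Networks
	}
	return spec.Networks
}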
6 changes: 3 additions & 3 deletions manager/deallocator/types.go
@@ -31,10 +31,10 @@ type resourceDeallocator struct {

// should return all services currently using the resource
// with the given ID regardless of their deletion status
-servicesLocator func(tx store.Tx, resourceID string) ([]*api.Service, error)
+servicesLocator func(tx store.ReadTx, resourceID string) ([]*api.Service, error)

-// should extract the IDs of the resources contained in a service
-resourceIDsExtractor func(service *api.Service) (resourceIDs []string)
+// should extract the resources contained in a service
+resourcesExtractor func(tx store.ReadTx, service *api.Service) (resources []serviceLevelResource)

// should delete the resource with the given ID
deleter func(tx store.Tx, resourceID string) error
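Taken together, supporting a new service-level resource type means filling in this struct and registering it, as network.go does for networks. A purely hypothetical sketch for a volume-like resource — api.Volume, api.EventUpdateVolume, and every store.*Volume* call below are illustrative assumptions, not existing swarmkit APIs:

type volumeWrapper struct{ volume *api.Volume } // hypothetical type

func (w volumeWrapper) id() string            { return w.volume.ID }
func (w volumeWrapper) isPendingDelete() bool { return w.volume.PendingDelete }

func init() {
	registerServiceLevelResource(resourceDeallocator{
		name: "volume",
		enumerator: func(tx store.ReadTx) ([]serviceLevelResource, error) {
			volumes, err := store.FindVolumes(tx, store.All) // hypothetical helper
			if err != nil {
				return nil, err
			}
			wrapped := make([]serviceLevelResource, len(volumes))
			for i := 0; i < len(volumes); i++ {
				wrapped[i] = volumeWrapper{volumes[i]}
			}
			return wrapped, nil
		},
		factory: func(event events.Event) serviceLevelResource {
			return volumeWrapper{event.(api.EventUpdateVolume).Volume} // hypothetical event
		},
		servicesLocator: func(tx store.ReadTx, volumeID string) ([]*api.Service, error) {
			return store.FindServices(tx, store.ByReferencedVolumeID(volumeID)) // hypothetical index
		},
		resourcesExtractor: func(tx store.ReadTx, service *api.Service) []serviceLevelResource {
			// would walk the service spec for volume references; elided here
			return nil
		},
		deleter: func(tx store.Tx, volumeID string) error {
			return store.DeleteVolume(tx, volumeID) // hypothetical helper
		},
	})
}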
