Skip to content

Commit

Permalink
[Code] add NodeRepositoriesService to watch new repositories on local…
Browse files Browse the repository at this point in the history
… node (#44677) (#44966)

* add NodeRepositoriesService to watch new repositories on local node.
* catch exceptions from cluster state listener
* add TODO for repo clean ups
* do not start reclone scheduler in cluster mode
  • Loading branch information
Yang Yang authored Sep 6, 2019
1 parent cffc08a commit 5d6ab10
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import { ClusterNodeEndpoint } from './cluster_node_endpoint';
* - serve request locally if the requested resource is on the local node, otherwise reject it
*/
export class ClusterNodeAdapter implements ServiceHandlerAdapter {
private readonly clusterService: ClusterService;
private readonly clusterMembershipService: ClusterMembershipService;
readonly clusterService: ClusterService;
readonly clusterMembershipService: ClusterMembershipService;
private readonly schedulerService: ResourceSchedulerService;
private readonly handlers: Map<any, any> = new Map<any, any>();
// used to forward requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class ClusterResourceLocator implements ResourceLocator {
constructor(
private readonly clusterService: ClusterService,
private readonly clusterMembershipService: ClusterMembershipService,
// @ts-ignore
private readonly schedulerService: ResourceSchedulerService
) {}

Expand Down Expand Up @@ -54,12 +55,12 @@ export class ClusterResourceLocator implements ResourceLocator {
);
}

/**
* Return undefined to let NodeRepositoriesService enqueue the clone job in cluster mode.
*/
async allocate(req: Request, resource: string): Promise<Endpoint | undefined> {
// make the cluster service synchronize the meta data and allocate new resources to nodes
await this.clusterService.pollClusterState();
// allocate the repository to nodes
await this.schedulerService.allocateUnassigned();
// the resource should be assigned to a node for now, if possible
return this.locate(req, resource);
return undefined;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import util from 'util';
import { ClusterMetadata } from './cluster_meta';
import { EsClient } from '../../lib/esqueue';
import { RepositoryObjectClient } from '../../search';
Expand Down Expand Up @@ -79,7 +80,11 @@ export class ClusterService {

private async callClusterStateListeners(event: ClusterStateEvent) {
for (const applier of this.clusterStateListeners) {
await applier.onClusterStateChanged(event);
try {
await applier.onClusterStateChanged(event);
} catch (e) {
this.logger.error(`Failed to apply cluster state ${util.inspect(event)}`);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { Logger } from '../../log';
import { ConsoleLoggerFactory } from '../../utils/console_logger_factory';
import { NodeRepositoriesService } from './node_repositories_service';
import { ClusterService } from './cluster_service';
import { ClusterMembershipService } from './cluster_membership_service';
import { CodeNode, CodeNodes } from './code_nodes';
import { emptyAsyncFunc } from '../../test_utils';
import { CloneWorker } from '../../queue';
import { ClusterStateEvent } from './cluster_state_event';
import { ClusterState } from './cluster_state';
import { ClusterMetadata } from './cluster_meta';
import { Repository } from '../../../model';
import { ResourceAssignment, RoutingTable } from './routing_table';

const log: Logger = new ConsoleLoggerFactory().getLogger(['test']);

afterEach(() => {
sinon.restore();
});

const cloneWorker = ({
enqueueJob: emptyAsyncFunc,
} as any) as CloneWorker;

const clusterService = {} as ClusterService;

const testNodes = [
{ id: 'node1', address: 'http://node1' } as CodeNode,
{ id: 'node2', address: 'http://node2' } as CodeNode,
];

const testRepos = [
{ uri: 'test1', url: 'http://test1' } as Repository,
{ uri: 'test2', url: 'http://test2' } as Repository,
];

test('Enqueue clone job after new repository is added to the local node', async () => {
const enqueueJobSpy = sinon.spy(cloneWorker, 'enqueueJob');

const clusterMembershipService = {
localNode: testNodes[0],
} as ClusterMembershipService;

const nodeService = new NodeRepositoriesService(
log,
clusterService,
clusterMembershipService,
cloneWorker
);

// event with no new repositories
let event = new ClusterStateEvent(ClusterState.empty(), ClusterState.empty());
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.called).toBeFalsy();
expect(nodeService.localRepos.size).toBe(0);

// event with a new repository
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.calledOnce).toBeTruthy();
expect(nodeService.localRepos.size).toBe(1);

// event with removed repository
event = new ClusterStateEvent(ClusterState.empty(), event.current);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.calledOnce).toBeTruthy();
expect(nodeService.localRepos.size).toBe(0);

// event with two added repositories
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0], testRepos[1]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
{ nodeId: testNodes[0].id, resource: testRepos[1].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(3);
expect(nodeService.localRepos.size).toBe(2);

// event with removed repository
event = new ClusterStateEvent(ClusterState.empty(), event.current);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(3);
expect(nodeService.localRepos.size).toBe(0);

// event with two added repositories, one for the other node
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0], testRepos[1]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
{ nodeId: testNodes[1].id, resource: testRepos[1].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(4);
expect(nodeService.localRepos.size).toBe(1);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import util from 'util';
import { ClusterService, ClusterStateListener } from './cluster_service';
import { ClusterStateEvent } from './cluster_state_event';
import { ClusterMembershipService } from './cluster_membership_service';
import { CloneWorker } from '../../queue';
import { Repository, RepositoryUri } from '../../../model';
import { Logger } from '../../log';
import { RepoState } from '../../../public/actions';

export class NodeRepositoriesService implements ClusterStateListener {
// visible for test
readonly localRepos = new Map<RepositoryUri, LocalRepository>();
private readonly localNodeId = this.clusterMembershipService.localNode.id;

constructor(
private readonly log: Logger,
private readonly clusterService: ClusterService,
private readonly clusterMembershipService: ClusterMembershipService,
private readonly cloneWorker: CloneWorker
) {}

public async start() {
/**
* we can add locally exists repositories to localRepos when the service is started to avoid unnecessarily add clone
* tasks for them, but for now it's OK because clone job is idempotent.
*/
this.clusterService.addClusterStateListener(this);
}

public async stop() {}

async onClusterStateChanged(event: ClusterStateEvent): Promise<void> {
// compare repositories in the cluster state with repositories in the local node, and remove
const repos = event.current.getNodeRepositories(this.clusterMembershipService.localNode.id);
const localNewRepos = repos.filter(repo => !this.localRepos.has(repo.uri));
const localRemovedRepos = Array.from(this.localRepos.values()).filter(
repo =>
event.current.routingTable.getNodeIdByRepositoryURI(repo.metadata.uri) !== this.localNodeId
);
for (const localNewRepo of localNewRepos) {
this.log.info(
`Repository added to node [${this.localNodeId}]: ${util.inspect(localNewRepo)}`
);
await this.cloneWorker.enqueueJob({ url: localNewRepo.url }, {});
this.localRepos.set(localNewRepo.uri, {
metadata: localNewRepo,
currentState: RepoState.CLONING,
});
}
// TODO remove the stale local repo after the Kibana HA is ready
for (const localRemovedRepo of localRemovedRepos) {
this.log.info(
`Repository removed from node [${this.localNodeId}]: ${util.inspect(
localRemovedRepo.metadata
)}`
);
this.localRepos.delete(localRemovedRepo.metadata.uri);
}
}
}

interface LocalRepository {
metadata: Repository;
currentState: RepoState;
}
9 changes: 6 additions & 3 deletions x-pack/legacy/plugins/code/server/init_workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ export function initWorkers(
);

// Initialize schedulers.
const cloneScheduler = new CloneScheduler(cloneWorker, serverOptions, esClient, log);
const updateScheduler = new UpdateScheduler(updateWorker, serverOptions, esClient, log);
const indexScheduler = new IndexScheduler(indexWorker, serverOptions, esClient, log);
updateScheduler.start();
indexScheduler.start();
// Check if the repository is local on the file system.
// This should be executed once at the startup time of Kibana.
cloneScheduler.schedule();
return { indexScheduler, updateScheduler };
// Ignored in cluster mode, leave it to the node level control loop
if (!serverOptions.clusterEnabled) {
const cloneScheduler = new CloneScheduler(cloneWorker, serverOptions, esClient, log);
cloneScheduler.schedule();
}
return { indexScheduler, updateScheduler, cloneWorker, deleteWorker, indexWorker, updateWorker };
}
24 changes: 21 additions & 3 deletions x-pack/legacy/plugins/code/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import { initLocalService } from './init_local';
import { initQueue } from './init_queue';
import { initWorkers } from './init_workers';
import { ClusterNodeAdapter } from './distributed/cluster/cluster_node_adapter';
import { NodeRepositoriesService } from './distributed/cluster/node_repositories_service';

export class CodePlugin {
private isCodeNode = false;
Expand All @@ -66,6 +67,7 @@ export class CodePlugin {
private updateScheduler: UpdateScheduler | null = null;
private lspService: LspService | null = null;
private codeServices: CodeServices | null = null;
private nodeService: NodeRepositoriesService | null = null;

constructor(initializerContext: PluginInitializerContext) {
this.log = {} as Logger;
Expand Down Expand Up @@ -153,10 +155,15 @@ export class CodePlugin {
server,
this.log
);
const codeServices = new CodeServices(
new ClusterNodeAdapter(codeServerRouter, this.log, this.serverOptions, esClient)
const clusterNodeAdapter = new ClusterNodeAdapter(
codeServerRouter,
this.log,
this.serverOptions,
esClient
);

const codeServices = new CodeServices(clusterNodeAdapter);

this.queue = initQueue(server, this.log, esClient);

const { gitOps, lspService } = initLocalService(
Expand All @@ -169,7 +176,7 @@ export class CodePlugin {
);
this.lspService = lspService;
this.gitOps = gitOps;
const { indexScheduler, updateScheduler } = initWorkers(
const { indexScheduler, updateScheduler, cloneWorker } = initWorkers(
server,
this.log,
esClient,
Expand All @@ -182,6 +189,14 @@ export class CodePlugin {
this.indexScheduler = indexScheduler;
this.updateScheduler = updateScheduler;

this.nodeService = new NodeRepositoriesService(
this.log,
clusterNodeAdapter.clusterService,
clusterNodeAdapter.clusterMembershipService,
cloneWorker
);
await this.nodeService.start();

// Execute index version checking and try to migrate index data if necessary.
await tryMigrateIndices(esClient, this.log);

Expand Down Expand Up @@ -240,6 +255,9 @@ export class CodePlugin {
if (this.codeServices) {
await this.codeServices.stop();
}
if (this.nodeService) {
await this.nodeService.stop();
}
}

private async initNonCodeNode(url: string, core: CoreSetup) {
Expand Down

0 comments on commit 5d6ab10

Please sign in to comment.