Skip to content

Commit

Permalink
Add subscriptions support to makeRemoteExectuableSchema (#563)
Browse files Browse the repository at this point in the history
Added subscriptions support to makeRemoteExectuableSchema
  • Loading branch information
schickling authored and freiksenet committed Jan 3, 2018
1 parent 8e8e348 commit 342bece
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 26 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"editor.insertSpaces": true,
"editor.rulers": [110],
"editor.wordWrapColumn": 110,
"prettier.semi": true,
"files.trimTrailingWhitespace": true,
"files.insertFinalNewline": true,
"prettier.singleQuote": true,
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

### vNEXT

* ...
* Added GraphQL Subscriptions support for schema stitching and `makeRemoteExecutableSchema` [PR #563](https://github.com/apollographql/graphql-tools/pull/563)

### v2.15.0

Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@
"dependencies": {
"apollo-utilities": "^1.0.1",
"deprecated-decorator": "^0.1.6",
"graphql-subscriptions": "^0.5.6",
"uuid": "^3.1.0"
},
"peerDependencies": {
"graphql": "^0.11.0 || ^0.12.0"
},
"devDependencies": {
"@types/chai": "4.0.4",
"@types/chai": "4.0.10",
"@types/graphql": "0.11.7",
"@types/mocha": "^2.2.44",
"@types/node": "^8.0.47",
Expand All @@ -77,6 +78,6 @@
"rimraf": "^2.6.2",
"source-map-support": "^0.5.0",
"tslint": "^5.8.0",
"typescript": "2.6.1"
"typescript": "2.6.2"
}
}
2 changes: 1 addition & 1 deletion src/stitching/linkToFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function makePromise<R>(observable: Observable<R>): Promise<R> {
});
}

function execute(
export function execute(
link: ApolloLink,
operation: GraphQLRequest,
): Observable<FetchResult> {
Expand Down
83 changes: 77 additions & 6 deletions src/stitching/makeRemoteExecutableSchema.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { printSchema, Kind, ValueNode } from 'graphql';
import linkToFetcher from './linkToFetcher';

// This import doesn't actually import code - only the types.
// Don't use ApolloLink to actually construct a link here.
import { ApolloLink } from 'apollo-link';
Expand All @@ -20,13 +17,26 @@ import {
ExecutionResult,
print,
buildSchema,
printSchema,
Kind,
ValueNode,
GraphQLResolveInfo,
} from 'graphql';
import linkToFetcher, { execute } from './linkToFetcher';
import isEmptyObject from '../isEmptyObject';
import { IResolvers, IResolverObject } from '../Interfaces';
import { makeExecutableSchema } from '../schemaGenerator';
import resolveParentFromTypename from './resolveFromParentTypename';
import defaultMergedResolver from './defaultMergedResolver';
import { checkResultAndHandleErrors } from './errors';
import { PubSub, PubSubEngine } from 'graphql-subscriptions';

export type ResolverFn = (
rootValue?: any,
args?: any,
context?: any,
info?: GraphQLResolveInfo,
) => AsyncIterator<any>;

export type Fetcher = (operation: FetcherOperation) => Promise<ExecutionResult>;

Expand All @@ -41,10 +51,12 @@ export default function makeRemoteExecutableSchema({
schema,
link,
fetcher,
createPubSub,
}: {
schema: GraphQLSchema | string;
link?: ApolloLink;
fetcher?: Fetcher;
createPubSub?: () => PubSubEngine;
}): GraphQLSchema {
if (!fetcher && link) {
fetcher = linkToFetcher(link);
Expand All @@ -59,13 +71,16 @@ export default function makeRemoteExecutableSchema({
typeDefs = printSchema(schema);
}

// prepare query resolvers
const queryResolvers: IResolverObject = {};
const queryType = schema.getQueryType();
const queries = queryType.getFields();
const queryResolvers: IResolverObject = {};
Object.keys(queries).forEach(key => {
queryResolvers[key] = createResolver(fetcher);
});
let mutationResolvers: IResolverObject = {};

// prepare mutation resolvers
const mutationResolvers: IResolverObject = {};
const mutationType = schema.getMutationType();
if (mutationType) {
const mutations = mutationType.getFields();
Expand All @@ -74,12 +89,31 @@ export default function makeRemoteExecutableSchema({
});
}

// prepare subscription resolvers
const subscriptionResolvers: IResolverObject = {};
const subscriptionType = schema.getSubscriptionType();
if (subscriptionType) {
const pubSub = createPubSub ? createPubSub() : new PubSub();
const subscriptions = subscriptionType.getFields();
Object.keys(subscriptions).forEach(key => {
subscriptionResolvers[key] = {
subscribe: createSubscriptionResolver(key, link, pubSub),
};
});
}

// merge resolvers into resolver map
const resolvers: IResolvers = { [queryType.name]: queryResolvers };

if (!isEmptyObject(mutationResolvers)) {
resolvers[mutationType.name] = mutationResolvers;
}

if (!isEmptyObject(subscriptionResolvers)) {
resolvers[subscriptionType.name] = subscriptionResolvers;
}

// add missing abstract resolvers (scalar, unions, interfaces)
const typeMap = schema.getTypeMap();
const types = Object.keys(typeMap).map(name => typeMap[name]);
for (const type of types) {
Expand Down Expand Up @@ -108,7 +142,8 @@ export default function makeRemoteExecutableSchema({
type instanceof GraphQLObjectType &&
type.name.slice(0, 2) !== '__' &&
type !== queryType &&
type !== mutationType
type !== mutationType &&
type !== subscriptionType
) {
const resolver = {};
Object.keys(type.getFields()).forEach(field => {
Expand Down Expand Up @@ -142,6 +177,42 @@ function createResolver(fetcher: Fetcher): GraphQLFieldResolver<any, any> {
};
}

function createSubscriptionResolver(
name: string,
link: ApolloLink,
pubSub: PubSubEngine,
): ResolverFn {
return (root, args, context, info) => {
const fragments = Object.keys(info.fragments).map(
fragment => info.fragments[fragment],
);
const document = {
kind: Kind.DOCUMENT,
definitions: [info.operation, ...fragments],
};

const operation = {
query: document,
variables: info.variableValues,
context: { graphqlContext: context },
};
const observable = execute(link, operation);

const observer = {
next(value: any) {
pubSub.publish(`remote-schema-${name}`, value.data);
},
error(err: Error) {
pubSub.publish(`remote-schema-${name}`, { errors: [err] });
},
};

observable.subscribe(observer);

return pubSub.asyncIterator(`remote-schema-${name}`);
};
}

function createPassThroughScalar({
name,
description,
Expand Down
48 changes: 48 additions & 0 deletions src/test/testMakeRemoteExecutableSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* tslint:disable:no-unused-expression */

import { expect } from 'chai';
import { forAwaitEach } from 'iterall';
import { GraphQLSchema, ExecutionResult, subscribe, parse } from 'graphql';
import {
subscriptionSchema,
subscriptionPubSubTrigger,
subscriptionPubSub,
makeSchemaRemoteFromLink,
} from '../test/testingSchemas';

describe('remote subscriptions', () => {
let schema: GraphQLSchema;
before(async () => {
schema = await makeSchemaRemoteFromLink(subscriptionSchema);
});

it('should work', done => {
const mockNotification = {
notifications: {
text: 'Hello world',
},
};

const subscription = parse(`
subscription Subscription {
notifications {
text
}
}
`);

let notificationCnt = 0;
subscribe(schema, subscription).then(results =>
forAwaitEach(
results as AsyncIterable<ExecutionResult>,
(result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
!notificationCnt++ ? done() : null;
},
),
);

subscriptionPubSub.publish(subscriptionPubSubTrigger, mockNotification);
});
});
78 changes: 62 additions & 16 deletions src/test/testingSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {
GraphQLSchema,
graphql,
print,
subscribe,
Kind,
GraphQLScalarType,
ValueNode,
ExecutionResult,
} from 'graphql';
import { ApolloLink, Observable } from 'apollo-link';
import { makeExecutableSchema } from '../schemaGenerator';
Expand Down Expand Up @@ -559,25 +561,69 @@ export const subscriptionSchema: GraphQLSchema = makeExecutableSchema({
resolvers: subscriptionResolvers,
});

const hasSubscriptionOperation = ({ query }: { query: any }): boolean => {
for (let definition of query.definitions) {
if (definition.kind === 'OperationDefinition') {
const operation = definition.operation;
if (operation === 'subscription') {
return true;
}
}
}
return false;
};

// Pretend this schema is remote
async function makeSchemaRemoteFromLink(schema: GraphQLSchema) {
export async function makeSchemaRemoteFromLink(schema: GraphQLSchema) {
const link = new ApolloLink(operation => {
return new Observable(observer => {
const { query, operationName, variables } = operation;
const { graphqlContext } = operation.getContext();
graphql(
schema,
print(query),
null,
graphqlContext,
variables,
operationName,
)
.then(result => {
observer.next(result);
observer.complete();
})
.catch(observer.error.bind(observer));
(async () => {
const { query, operationName, variables } = operation;
const { graphqlContext } = operation.getContext();
try {
if (!hasSubscriptionOperation(operation)) {
const result = await graphql(
schema,
print(query),
null,
graphqlContext,
variables,
operationName,
);
observer.next(result);
observer.complete();
} else {
const result = await subscribe(
schema,
query,
null,
graphqlContext,
variables,
operationName,
);
if (
typeof (<AsyncIterator<ExecutionResult>>result).next ===
'function'
) {
while (true) {
const next = await (<AsyncIterator<
ExecutionResult
>>result).next();
observer.next(next.value);
if (next.done) {
observer.complete();
break;
}
}
} else {
observer.next(result as ExecutionResult);
observer.complete();
}
}
} catch (error) {
observer.error.bind(observer);
}
})();
});
});

Expand Down
1 change: 1 addition & 0 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import './testSchemaGenerator';
import './testLogger';
import './testMocking';
import './testResolution';
import './testMakeRemoteExecutableSchema';
import './testMergeSchemas';

0 comments on commit 342bece

Please sign in to comment.