Skip to content

Commit

Permalink
feat: per cube dataSource support
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Nov 17, 2019
1 parent 27cfc96 commit 6dc3fef
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 59 deletions.
31 changes: 5 additions & 26 deletions packages/cubejs-api-gateway/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,6 @@ const prepareAnnotation = (metaConfig, query) => {
};
};

const prepareAliasToMemberNameMap = (metaConfig, sqlQuery, query) => {
const configMap = toConfigMap(metaConfig);

const lookupAlias = (memberType) => (member) => {
const path = member.split('.');
const config = configMap[path[0]][memberType].find(m => m.name === member);
if (!config) {
return undefined;
}
return [config.aliasName, member];
};

return R.fromPairs(
(query.measures || []).map(lookupAlias('measures'))
.concat((query.dimensions || []).map(lookupAlias('dimensions')))
.concat((query.segments || []).map(lookupAlias('segments')))
.concat((query.timeDimensions || []).map(td => lookupAlias('dimensions')(td.dimension)))
.concat(sqlQuery.timeDimensionAlias ? [[sqlQuery.timeDimensionAlias, sqlQuery.timeDimensionField]] : [])
.filter(a => !!a)
);
};

const transformValue = (value, type) => {
if (value && type === 'time') {
return (value instanceof Date ? moment(value) : moment.utc(value)).format(moment.HTML5_FMT.DATETIME_LOCAL_MS);
Expand Down Expand Up @@ -320,17 +298,18 @@ class ApiGateway {
this.getCompilerApi(context).metaConfig()
]);
const sqlQuery = compilerSqlResult;
const metaConfig = metaConfigResult;
const annotation = prepareAnnotation(metaConfig, normalizedQuery);
const aliasToMemberNameMap = prepareAliasToMemberNameMap(metaConfig, sqlQuery, normalizedQuery);
const annotation = prepareAnnotation(metaConfigResult, normalizedQuery);
const aliasToMemberNameMap = sqlQuery.aliasNameToMember;
const toExecute = {
...sqlQuery,
query: sqlQuery.sql[0],
values: sqlQuery.sql[1],
continueWait: true,
renewQuery: normalizedQuery.renewQuery
};
const response = await this.getAdapterApi(context).executeQuery(toExecute);
const response = await this.getAdapterApi({
...context, dataSource: sqlQuery.dataSource
}).executeQuery(toExecute);
this.log(context, {
type: 'Load Request Success',
query,
Expand Down
8 changes: 6 additions & 2 deletions packages/cubejs-schema-compiler/adapter/BaseMeasure.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ class BaseMeasure {
}

aliasName() {
return this.query.escapeColumnName(this.unescapedAliasName());
}

unescapedAliasName() {
if (this.expression) {
return this.query.escapeColumnName(this.query.aliasName(this.expressionName));
return this.query.aliasName(this.expressionName);
}
return this.query.escapeColumnName(this.query.aliasName(this.measure));
return this.query.aliasName(this.measure);
}

isCumulative() {
Expand Down
27 changes: 26 additions & 1 deletion packages/cubejs-schema-compiler/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BaseQuery {
return dimension;
}).filter(R.identity).map(this.newTimeDimension.bind(this));
this.allFilters = this.timeDimensions.concat(this.segments).concat(this.filters);
this.join = this.joinGraph.buildJoin(this.collectCubeNames());
this.join = this.joinGraph.buildJoin(this.allCubeNames);
this.cubeAliasPrefix = this.options.cubeAliasPrefix;
this.preAggregationsSchemaOption =
this.options.preAggregationsSchema != null ? this.options.preAggregationsSchema : DEFAULT_PREAGGREGATIONS_SCHEMA;
Expand All @@ -81,6 +81,31 @@ class BaseQuery {
this.initUngrouped();
}

get allCubeNames() {
if (!this.collectedCubeNames) {
this.collectedCubeNames = this.collectCubeNames();
}
return this.collectedCubeNames;
}

get dataSource() {
const dataSources = R.uniq(this.allCubeNames.map(c => this.cubeEvaluator.cubeFromPath(c).dataSource || 'default'));
if (dataSources.length > 1) {
throw new UserError(`Joins across data sources aren't supported in community edition. Found data sources: ${dataSources.join(', ')}`);
}
return dataSources[0];
}

get aliasNameToMember() {
return R.fromPairs(
this.measures.map(m => [m.unescapedAliasName(), m.measure]).concat(
this.dimensions.map(m => [m.unescapedAliasName(), m.dimension])
).concat(
this.timeDimensions.map(m => [m.unescapedAliasName(), m.dimension])
)
);
}

initUngrouped() {
this.ungrouped = this.options.ungrouped;
if (this.ungrouped) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ const BaseMeasure = require('../adapter/BaseMeasure');
const UserError = require('./UserError');

class CubeToMetaTransformer {
constructor(cubeValidator, cubeEvaluator, contextEvaluator, joinGraph, adapter) {
constructor(cubeValidator, cubeEvaluator, contextEvaluator, joinGraph) {
this.cubeValidator = cubeValidator;
this.cubeSymbols = cubeEvaluator;
this.cubeEvaluator = cubeEvaluator;
this.contextEvaluator = contextEvaluator;
this.joinGraph = joinGraph;
this.adapter = adapter;
}

compile(cubes, errorReporter) {
Expand Down Expand Up @@ -39,7 +38,6 @@ class CubeToMetaTransformer {
title: this.title(cubeTitle, nameToDimension),
type: nameToDimension[1].type,
description: nameToDimension[1].description,
aliasName: this.adapter.aliasName(`${cube.name}.${nameToDimension[0]}`),
shortTitle: this.title(cubeTitle, nameToDimension, true),
suggestFilterValues:
nameToDimension[1].suggestFilterValues == null ? true : nameToDimension[1].suggestFilterValues,
Expand Down Expand Up @@ -102,7 +100,6 @@ class CubeToMetaTransformer {
description: nameToMetric[1].description,
shortTitle: this.title(cubeTitle, nameToMetric, true),
format: nameToMetric[1].format,
aliasName: this.adapter.aliasName(name),
cumulativeTotal: nameToMetric[1].cumulative || BaseMeasure.isCumulative(nameToMetric[1]),
cumulative: nameToMetric[1].cumulative || BaseMeasure.isCumulative(nameToMetric[1]),
type: 'number', // TODO
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-schema-compiler/compiler/CubeValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const cubeSchema = Joi.object().keys({
allDefinitions: Joi.func(),
title: Joi.string(),
sqlAlias: Joi.string(),
dataSource: Joi.string(),
description: Joi.string(),
joins: Joi.object().pattern(identifierRegex, Joi.object().keys({
sql: Joi.func().required(),
Expand Down
8 changes: 1 addition & 7 deletions packages/cubejs-schema-compiler/compiler/PrepareCompiler.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const CubeEvaluator = require('./CubeEvaluator');
const ContextEvaluator = require('./ContextEvaluator');
const DashboardTemplateEvaluator = require('./DashboardTemplateEvaluator');
const JoinGraph = require('./JoinGraph');
const QueryBuilder = require('../adapter/QueryBuilder');
const Funnels = require('../extensions/Funnels');
const CubeToMetaTransformer = require('./CubeToMetaTransformer');

Expand All @@ -27,12 +26,7 @@ exports.prepareCompiler = (repo, options) => {
const contextEvaluator = new ContextEvaluator(cubeEvaluator);
const joinGraph = new JoinGraph(cubeValidator, cubeEvaluator);
const dashboardTemplateEvaluator = new DashboardTemplateEvaluator(cubeEvaluator);
const query = QueryBuilder.query(
{ cubeEvaluator, joinGraph },
options.adapter,
{}
);
const metaTransformer = new CubeToMetaTransformer(cubeValidator, cubeEvaluator, contextEvaluator, joinGraph, query);
const metaTransformer = new CubeToMetaTransformer(cubeValidator, cubeEvaluator, contextEvaluator, joinGraph);
const compiler = new DataSchemaCompiler(repo, Object.assign({}, {
cubeNameCompilers: [cubeDictionary],
preTranspileCubeCompilers: [cubeSymbols, cubeValidator],
Expand Down
15 changes: 15 additions & 0 deletions packages/cubejs-schema-compiler/test/SQLGenerationTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ describe('SQL Generation', function test() {
\`,
sqlAlias: 'cube_with_long_name',
dataSource: 'oracle',
measures: {
count: {
Expand Down Expand Up @@ -1426,4 +1428,17 @@ describe('SQL Generation', function test() {
{ "cube_with_long_name__count": '3' }
])
);

it('data source', () => compiler.compile().then(() => {
const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, {
measures: ['CubeWithVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongName.count'],
dimensions: [],
timeDimensions: [],
timezone: 'America/Los_Angeles',
filters: [],
order: []
});

query.dataSource.should.be.deepEqual('oracle');
}));
});
46 changes: 34 additions & 12 deletions packages/cubejs-server-core/core/CompilerApi.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,57 @@ class CompilerApi {
this.logger(this.compilers ? 'Recompiling schema' : 'Compiling schema', { version: compilerVersion });
// TODO check if saving this promise can produce memory leak?
this.compilers = PrepareCompiler.compile(this.repository, {
adapter: this.dbType,
allowNodeRequire: this.allowNodeRequire
});
this.compilerVersion = compilerVersion;
}
return this.compilers;
}

getDbType(dataSource) {
if (typeof this.dbType === 'function') {
return this.dbType({ dataSource: dataSource || 'default' });
}
return this.dbType;
}

async getSql(query) {
const sqlGenerator = QueryBuilder.query(
await this.getCompilers(),
this.dbType, {
...query,
externalDbType: this.options.externalDbType,
preAggregationsSchema: this.preAggregationsSchema,
allowUngroupedWithoutPrimaryKey: this.allowUngroupedWithoutPrimaryKey
}
);
return (await this.getCompilers()).compiler.withQuery(sqlGenerator, () => ({
const dbType = this.getDbType('default');
const compilers = await this.getCompilers();
let sqlGenerator = this.createQuery(compilers, dbType, query);

const dataSource = compilers.compiler.withQuery(sqlGenerator, () => sqlGenerator.dataSource);

if (dataSource !== 'default' && dbType !== this.getDbType(dataSource)) {
// TODO consider more efficient way than instantiating query
sqlGenerator = this.createQuery(compilers, this.getDbType(dataSource), query);
}

return compilers.compiler.withQuery(sqlGenerator, () => ({
external: sqlGenerator.externalPreAggregationQuery(),
sql: sqlGenerator.buildSqlAndParams(),
timeDimensionAlias: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].unescapedAliasName(),
timeDimensionField: sqlGenerator.timeDimensions[0] && sqlGenerator.timeDimensions[0].dimension,
order: sqlGenerator.order,
cacheKeyQueries: sqlGenerator.cacheKeyQueries(),
preAggregations: sqlGenerator.preAggregations.preAggregationsDescription()
preAggregations: sqlGenerator.preAggregations.preAggregationsDescription(),
dataSource: sqlGenerator.dataSource,
aliasNameToMember: sqlGenerator.aliasNameToMember
}));
}

createQuery(compilers, dbType, query) {
return QueryBuilder.query(
compilers,
dbType, {
...query,
externalDbType: this.options.externalDbType,
preAggregationsSchema: this.preAggregationsSchema,
allowUngroupedWithoutPrimaryKey: this.allowUngroupedWithoutPrimaryKey
}
);
}

async metaConfig() {
return (await this.getCompilers()).metaTransformer.cubes;
}
Expand Down
18 changes: 11 additions & 7 deletions packages/cubejs-server-core/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class CubejsServerCore {
this.preAggregationsSchema =
typeof options.preAggregationsSchema === 'function' ? options.preAggregationsSchema : () => options.preAggregationsSchema;
this.appIdToCompilerApi = {};
this.appIdToOrchestratorApi = {};
this.dataSourceIdToOrchestratorApi = {};
this.contextToAppId = options.contextToAppId || (() => process.env.CUBEJS_APP || 'STANDALONE');
this.orchestratorOptions =
typeof options.orchestratorOptions === 'function' ?
Expand Down Expand Up @@ -270,7 +270,7 @@ class CubejsServerCore {
if (!this.appIdToCompilerApi[appId]) {
this.appIdToCompilerApi[appId] = this.createCompilerApi(
this.repositoryFactory(context), {
dbType: this.contextToDbType(context),
dbType: (dataSourceContext) => this.contextToDbType({ ...context, ...dataSourceContext }),
externalDbType: this.contextToExternalDbType(context),
schemaVersion: this.options.schemaVersion && (() => this.options.schemaVersion(context)),
preAggregationsSchema: this.preAggregationsSchema(context)
Expand All @@ -280,12 +280,16 @@ class CubejsServerCore {
return this.appIdToCompilerApi[appId];
}

contextToDataSourceId(context) {
return `${this.contextToAppId(context)}_${context.dataSource}`;
}

getOrchestratorApi(context) {
const appId = this.contextToAppId(context);
if (!this.appIdToOrchestratorApi[appId]) {
const dataSourceId = this.contextToDataSourceId(context);
if (!this.dataSourceIdToOrchestratorApi[dataSourceId]) {
let driverPromise;
let externalPreAggregationsDriverPromise;
this.appIdToOrchestratorApi[appId] = this.createOrchestratorApi({
this.dataSourceIdToOrchestratorApi[dataSourceId] = this.createOrchestratorApi({
getDriver: () => {
if (!driverPromise) {
const driver = this.driverFactory(context);
Expand All @@ -306,11 +310,11 @@ class CubejsServerCore {
}
return externalPreAggregationsDriverPromise;
},
redisPrefix: appId,
redisPrefix: dataSourceId,
orchestratorOptions: this.orchestratorOptions(context)
});
}
return this.appIdToOrchestratorApi[appId];
return this.dataSourceIdToOrchestratorApi[dataSourceId];
}

createCompilerApi(repository, options) {
Expand Down

0 comments on commit 6dc3fef

Please sign in to comment.