Skip to content

Commit

Permalink
fix(server): dataloader batching being broken (#3838)
Browse files Browse the repository at this point in the history
* fix(server): dataloader batching being broken

* feat(server): optimized stream collaborator retrieval

* test fix
  • Loading branch information
fabis94 authored Jan 17, 2025
1 parent 7b295ba commit 2e8863e
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 65 deletions.
2 changes: 1 addition & 1 deletion packages/server/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ export function buildApolloSubscriptionServer(
graphql_variables: redactSensitiveVariables(baseParams.variables),
graphql_operation_type: 'subscription'
},
'Subscription started for {graphqlOperationName}'
'Subscription started for {graphql_operation_name}'
)

baseParams.formatResponse = (val: SubscriptionResponse) => {
Expand Down
14 changes: 4 additions & 10 deletions packages/server/modules/core/domain/streams/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,11 @@ export type GetStreamCollaborators = (
}>
) => Promise<Array<LimitedUserWithStreamRole>>

export type GetUserDeletableStreams = (userId: string) => Promise<Array<string>>
export type GetStreamsCollaborators = (params: { streamIds: string[] }) => Promise<{
[streamId: string]: Array<LimitedUserWithStreamRole>
}>

export type LegacyGetStreamCollaborators = (params: { streamId: string }) => Promise<
{
role: string
id: string
name: string
company: string
avatar: string
}[]
>
export type GetUserDeletableStreams = (userId: string) => Promise<Array<string>>

export type StoreStream = (
input: StreamCreateInput | ProjectCreateArgs,
Expand Down
20 changes: 18 additions & 2 deletions packages/server/modules/core/graph/dataloaders/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
getOwnedFavoritesCountByUserIdsFactory,
getStreamRolesFactory,
getUserStreamCountsFactory,
getStreamsSourceAppsFactory
getStreamsSourceAppsFactory,
getStreamsCollaboratorsFactory
} from '@/modules/core/repositories/streams'
import { keyBy } from 'lodash'
import {
Expand Down Expand Up @@ -83,7 +84,10 @@ import {
} from '@/modules/automate/errors/executionEngine'
import { queryInvitesFactory } from '@/modules/serverinvites/repositories/serverInvites'
import { getAppScopesFactory } from '@/modules/auth/repositories'
import { StreamWithCommitId } from '@/modules/core/domain/streams/types'
import {
LimitedUserWithStreamRole,
StreamWithCommitId
} from '@/modules/core/domain/streams/types'
import {
getUsersFactory,
UserWithOptionalRole
Expand Down Expand Up @@ -139,6 +143,7 @@ const dataLoadersDefinition = defineRequestDataloaders(
const getUserStreamCounts = getUserStreamCountsFactory({ db })
const getStreamsSourceApps = getStreamsSourceAppsFactory({ db })
const getUsers = getUsersFactory({ db })
const getStreamsCollaborators = getStreamsCollaboratorsFactory({ db })

return {
streams: {
Expand Down Expand Up @@ -198,6 +203,17 @@ const dataLoadersDefinition = defineRequestDataloaders(
}
}
})(),
/**
* Get all collaborators for a stream
*/
getCollaborators: createLoader<string, Array<LimitedUserWithStreamRole>>(
async (streamIds) => {
const results = await getStreamsCollaborators({
streamIds: streamIds.slice()
})
return streamIds.map((i) => results[i] || [])
}
),

/**
* Get favorite metadata for a specific stream and user
Expand Down
5 changes: 2 additions & 3 deletions packages/server/modules/core/graph/resolvers/projects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ const getUsers = getUsersFactory({ db })
const getUser = getUserFactory({ db })
const saveActivity = saveActivityFactory({ db })
const getStream = getStreamFactory({ db })
const getStreamCollaborators = getStreamCollaboratorsFactory({ db })
const createStreamReturnRecord = createStreamReturnRecordFactory({
inviteUsersToProject: inviteUsersToProjectFactory({
createAndSendInvite: createAndSendInviteFactory({
Expand Down Expand Up @@ -394,8 +393,8 @@ export = {

return await ctx.loaders.streams.getRole.load(parent.id)
},
async team(parent) {
const users = await getStreamCollaborators(parent.id)
async team(parent, _args, ctx) {
const users = await ctx.loaders.streams.getCollaborators.load(parent.id)
return users.map((u) => ({
user: u,
role: u.streamRole,
Expand Down
13 changes: 8 additions & 5 deletions packages/server/modules/core/graph/resolvers/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
getStreamCollaboratorsFactory,
canUserFavoriteStreamFactory,
setStreamFavoritedFactory,
legacyGetStreamUsersFactory,
getUserStreamsPageFactory,
getUserStreamsCountFactory
} from '@/modules/core/repositories/streams'
Expand Down Expand Up @@ -177,7 +176,6 @@ const favoriteStream = favoriteStreamFactory({
setStreamFavorited: setStreamFavoritedFactory({ db }),
getStream
})
const getStreamUsers = legacyGetStreamUsersFactory({ db })
const getUserStreams = getUserStreamsPageFactory({ db })
const getUserStreamsCount = getUserStreamsCountFactory({ db })

Expand Down Expand Up @@ -267,9 +265,14 @@ export = {
},

Stream: {
async collaborators(parent) {
const users = await getStreamUsers({ streamId: parent.id })
return users
async collaborators(parent, _args, ctx) {
const collaborators = await ctx.loaders.streams.getCollaborators.load(parent.id)

// In this GQL return type, role actually refers to the stream role
return collaborators.map((collaborator) => ({
...collaborator,
role: collaborator.streamRole
}))
},

async pendingCollaborators(parent) {
Expand Down
9 changes: 7 additions & 2 deletions packages/server/modules/core/loaders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { graphDataloadersBuilders } from '@/modules'
import { ModularizedDataLoadersConstraint } from '@/modules/shared/helpers/graphqlHelper'
import { Knex } from 'knex'
import { isNonNullable, Optional } from '@speckle/shared'
import { flatten, noop } from 'lodash'
import { flatten, noop, isFunction } from 'lodash'
import { db } from '@/db/knex'

/**
Expand All @@ -26,7 +26,12 @@ const makeLazyDataLoader = <K, V, C = K>(
dataloader = new DataLoader<K, V, C>(...args)
}

return dataloader[prop as keyof DataLoader<K, V, C>]
const ret = dataloader[prop as keyof DataLoader<K, V, C>]
if (isFunction(ret)) {
return ret.bind(dataloader)
} else {
return ret
}
}
})
}
Expand Down
62 changes: 34 additions & 28 deletions packages/server/modules/core/repositories/streams.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _, {
clamp,
groupBy,
has,
isNaN,
isNull,
Expand Down Expand Up @@ -81,7 +82,6 @@ import {
GetFavoritedStreamsCount,
SetStreamFavorited,
CanUserFavoriteStream,
LegacyGetStreamCollaborators,
GetBatchUserFavoriteData,
GetBatchStreamFavoritesCounts,
GetOwnedFavoritesCountByUserIds,
Expand All @@ -96,7 +96,8 @@ import {
MarkBranchStreamUpdated,
MarkCommitStreamUpdated,
MarkOnboardingBaseStream,
GetUserDeletableStreams
GetUserDeletableStreams,
GetStreamsCollaborators
} from '@/modules/core/domain/streams/operations'
import { generateProjectName } from '@/modules/core/domain/projects/logic'
export type { StreamWithOptionalRole, StreamWithCommitId }
Expand Down Expand Up @@ -597,6 +598,37 @@ export const getDiscoverableStreamsPageFactory =
return await q
}

/**
* Get stream collaborators for multiple streams at a time
*/
export const getStreamsCollaboratorsFactory =
(deps: { db: Knex }): GetStreamsCollaborators =>
async ({ streamIds }) => {
if (!streamIds.length) return {}

const q = tables
.streamAcl(deps.db)
.select<Array<UserWithRole & { streamRole: StreamRoles; streamId: string }>>([
...Users.cols,
knex.raw(`(array_agg(??))[1] as "streamRole"`, [StreamAcl.col.role]),
knex.raw(`(array_agg(??))[1] as "streamId"`, [StreamAcl.col.resourceId]),
knex.raw(`(array_agg(??))[1] as "role"`, [ServerAcl.col.role])
])
.whereIn(StreamAcl.col.resourceId, streamIds)
.innerJoin(Users.name, Users.col.id, StreamAcl.col.userId)
.innerJoin(ServerAcl.name, ServerAcl.col.userId, Users.col.id)
.groupBy(StreamAcl.col.resourceId, Users.col.id)

const res = (await q).map((i) => ({
...removePrivateFields(i),
streamRole: i.streamRole,
role: i.role,
streamId: i.streamId
}))

return groupBy(res, 'streamId')
}

/**
* Get all stream collaborators. Optionally filter only specific roles.
*/
Expand Down Expand Up @@ -633,32 +665,6 @@ export const getStreamCollaboratorsFactory =
return items
}

/**
* @deprecated Use getStreamCollaborators instead
*/
export const legacyGetStreamUsersFactory =
(deps: { db: Knex }): LegacyGetStreamCollaborators =>
async ({ streamId }) => {
const query = tables
.streamAcl(deps.db)
.columns({ role: 'stream_acl.role' }, 'id', 'name', 'company', 'avatar')
.select()
.where({ resourceId: streamId })
.rightJoin('users', { 'users.id': 'stream_acl.userId' })
.select<
{
role: string
id: string
name: string
company: string
avatar: string
}[]
>('stream_acl.role', 'name', 'id', 'company', 'avatar')
.orderBy('stream_acl.role')

return await query
}

export const getProjectCollaboratorsFactory =
(deps: { db: Knex }): GetProjectCollaborators =>
async ({ projectId }) => {
Expand Down
11 changes: 9 additions & 2 deletions packages/server/modules/core/tests/graph.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,15 @@ describe('GraphQL API Core @core-api', () => {

expect(stream.name).to.equal('TS1 (u A) Private UPDATED')
expect(stream.collaborators).to.have.lengthOf(2)
expect(stream.collaborators[0].role).to.equal(Roles.Stream.Contributor)
expect(stream.collaborators[1].role).to.equal(Roles.Stream.Owner)

const d2User = stream.collaborators.find((c) => c.name === 'd2')
const testUserUpdated = stream.collaborators.find(
(c) => c.name === 'test user updated'
)
expect(d2User).to.be.ok
expect(testUserUpdated).to.be.ok
expect(d2User.role).to.equal(Roles.Stream.Contributor)
expect(testUserUpdated.role).to.equal(Roles.Stream.Owner)
})

it('Should retrieve a public stream even if not authenticated', async () => {
Expand Down
8 changes: 5 additions & 3 deletions packages/server/modules/core/tests/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import {
createStreamFactory,
deleteStreamFactory,
getStreamFactory,
getStreamsCollaboratorsFactory,
grantStreamPermissionsFactory,
legacyGetStreamUsersFactory,
markBranchStreamUpdatedFactory,
markCommitStreamUpdatedFactory,
revokeStreamPermissionsFactory,
Expand Down Expand Up @@ -198,7 +198,7 @@ const isStreamCollaborator = isStreamCollaboratorFactory({
getStream
})
const grantPermissionsStream = grantStreamPermissionsFactory({ db })
const getStreamUsers = legacyGetStreamUsersFactory({ db })
const getStreamsUsers = getStreamsCollaboratorsFactory({ db })
const createObject = createObjectFactory({
storeSingleObjectIfNotFoundFactory: storeSingleObjectIfNotFoundFactory({ db }),
storeClosuresIfNotFound: storeClosuresIfNotFoundFactory({ db })
Expand Down Expand Up @@ -338,7 +338,9 @@ describe('Streams @core-streams', () => {
})

it('Should get the users with access to a stream', async () => {
const users = await getStreamUsers({ streamId: testStream.id })
const ret = await getStreamsUsers({ streamIds: [testStream.id] })
const users = ret[testStream.id]

expect(users).to.have.lengthOf(2)
expect(users[0]).to.not.have.property('email')
expect(users[0]).to.have.property('id')
Expand Down
2 changes: 1 addition & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"cors": "^2.8.5",
"cross-fetch": "^3.1.5",
"crypto-random-string": "^3.2.0",
"dataloader": "^2.0.0",
"dataloader": "^2.2.3",
"dayjs": "^1.11.5",
"dotenv": "^8.2.0",
"ejs": "^3.1.8",
Expand Down
16 changes: 8 additions & 8 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -17301,7 +17301,7 @@ __metadata:
cross-env: "npm:^7.0.3"
cross-fetch: "npm:^3.1.5"
crypto-random-string: "npm:^3.2.0"
dataloader: "npm:^2.0.0"
dataloader: "npm:^2.2.3"
dayjs: "npm:^1.11.5"
deep-equal-in-any-order: "npm:^1.1.15"
dotenv: "npm:^8.2.0"
Expand Down Expand Up @@ -28668,20 +28668,20 @@ __metadata:
languageName: node
linkType: hard

"dataloader@npm:^2.0.0":
version: 2.1.0
resolution: "dataloader@npm:2.1.0"
checksum: 10/671b5806d4f130629dce9bdd902786a3098a47d0ee83b16ed877cc3e77efa68f618e914696b6218c8ae11db0656f81c1a3fa33aa62e56044b0a7b3f13119e19d
languageName: node
linkType: hard

"dataloader@npm:^2.2.2":
version: 2.2.2
resolution: "dataloader@npm:2.2.2"
checksum: 10/9c7a1f02cfa6391ab8bc21ebd0ef60b03832bd3beafdfecf48b111fba14090f98d33965f8e268045ba3c289f801b6a9000a9e61a41188363bdee2344811f64f1
languageName: node
linkType: hard

"dataloader@npm:^2.2.3":
version: 2.2.3
resolution: "dataloader@npm:2.2.3"
checksum: 10/83fe6259abe00ae64c5f48252ef59d8e5fcabda9fd4d26685f14a76eeca596bf6f9500d9f22a0094c50c3ea782a0977728f9367e232dfa0fdb5c9d646de279b2
languageName: node
linkType: hard

"date-fns@npm:^1.29.0":
version: 1.30.1
resolution: "date-fns@npm:1.30.1"
Expand Down

0 comments on commit 2e8863e

Please sign in to comment.