Skip to content

Commit

Permalink
EQMS-6844 Do not keep platform client open (#5539)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Onnikov <[email protected]>
  • Loading branch information
aonnikov authored May 8, 2024
1 parent bf80ba0 commit ceb1c95
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 73 deletions.
6 changes: 3 additions & 3 deletions server/collaborator/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { type DocumentId, type PlatformDocumentId } from '@hcengineering/collabo
import { WorkspaceId, generateId } from '@hcengineering/core'
import { decodeToken } from '@hcengineering/server-token'
import { onAuthenticatePayload } from '@hocuspocus/server'
import { ClientFactory, Controller, getClientFactory } from './platform'
import { ClientFactory, simpleClientFactory } from './platform'

export interface Context {
connectionId: string
Expand All @@ -36,7 +36,7 @@ export type withContext<T extends WithContext> = Omit<T, 'context'> & {
context: Context
}

export function buildContext (data: onAuthenticatePayload, controller: Controller): Context {
export function buildContext (data: onAuthenticatePayload): Context {
const context = data.context as Partial<Context>

const connectionId = context.connectionId ?? generateId()
Expand All @@ -48,7 +48,7 @@ export function buildContext (data: onAuthenticatePayload, controller: Controlle
return {
connectionId,
workspaceId: decodedToken.workspace,
clientFactory: getClientFactory(decodedToken, controller),
clientFactory: simpleClientFactory(decodedToken),
initialContentId,
platformDocumentId
}
Expand Down
23 changes: 11 additions & 12 deletions server/collaborator/src/extensions/authentication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import { Extension, onAuthenticatePayload } from '@hocuspocus/server'

import { getWorkspaceInfo } from '../account'
import { Context, buildContext } from '../context'
import { Controller } from '../platform'

export interface AuthenticationConfiguration {
ctx: MeasureContext
controller: Controller
}

export class AuthenticationExtension implements Extension {
Expand All @@ -35,20 +33,21 @@ export class AuthenticationExtension implements Extension {
}

async onAuthenticate (data: onAuthenticatePayload): Promise<Context> {
this.configuration.ctx.measure('authenticate', 1)

const ctx = this.configuration.ctx
const { workspaceUrl, collaborativeDoc } = parseDocumentId(data.documentName as DocumentId)

// verify workspace can be accessed with the token
const workspaceInfo = await getWorkspaceInfo(data.token)
return await ctx.with('authenticate', { workspace: workspaceUrl }, async () => {
// verify workspace can be accessed with the token
const workspaceInfo = await getWorkspaceInfo(data.token)

// verify workspace url in the document matches the token
if (workspaceInfo.workspace !== workspaceUrl) {
throw new Error('documentName must include workspace')
}
// verify workspace url in the document matches the token
if (workspaceInfo.workspace !== workspaceUrl) {
throw new Error('documentName must include workspace')
}

data.connection.readOnly = isReadonlyDoc(collaborativeDoc)
data.connection.readOnly = isReadonlyDoc(collaborativeDoc)

return buildContext(data, this.configuration.controller)
return buildContext(data)
})
}
}
34 changes: 17 additions & 17 deletions server/collaborator/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import client from '@hcengineering/client'
import clientResources from '@hcengineering/client-resources'
import core, { Client, Tx, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core'
import core, { Client, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import { Token, generateToken } from '@hcengineering/server-token'
import config from './config'
Expand Down Expand Up @@ -52,13 +52,25 @@ export interface ClientFactoryParams {
/**
* @public
*/
export type ClientFactory = (params: ClientFactoryParams) => Promise<TxOperations>
export type ClientFactory = (params?: ClientFactoryParams) => Promise<TxOperations>

/**
* @public
*/
export function getClientFactory (token: Token, controller: Controller): ClientFactory {
return async ({ derived }: ClientFactoryParams) => {
export function simpleClientFactory (token: Token): ClientFactory {
return async (params?: ClientFactoryParams) => {
const derived = params?.derived ?? false
const client = await connect(generateToken(token.email, token.workspace))
return await getTxOperations(client, token, derived)
}
}

/**
* @public
*/
export function reusableClientFactory (token: Token, controller: Controller): ClientFactory {
return async (params?: ClientFactoryParams) => {
const derived = params?.derived ?? false
const workspaceClient = await controller.get(token.workspace)
return await getTxOperations(workspaceClient.client, token, derived)
}
Expand Down Expand Up @@ -94,16 +106,10 @@ export class Controller {
* @public
*/
export class WorkspaceClient {
private readonly txHandlers: ((...tx: Tx[]) => Promise<void>)[] = []

private constructor (
readonly workspace: WorkspaceId,
readonly client: Client
) {
this.client.notify = (...tx: Tx[]) => {
void this.txHandler(...tx)
}
}
) {}

static async create (workspace: WorkspaceId): Promise<WorkspaceClient> {
const token = generateToken(systemAccountEmail, workspace)
Expand All @@ -114,10 +120,4 @@ export class WorkspaceClient {
async close (): Promise<void> {
await this.client.close()
}

private async txHandler (...tx: Tx[]): Promise<void> {
for (const h of this.txHandlers) {
await h(...tx)
}
}
}
23 changes: 9 additions & 14 deletions server/collaborator/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { MeasureContext, generateId, metricsAggregate } from '@hcengineering/cor
import { MinioService } from '@hcengineering/minio'
import { Token, decodeToken } from '@hcengineering/server-token'
import { ServerKit } from '@hcengineering/text'
import { Hocuspocus, onDestroyPayload } from '@hocuspocus/server'
import { Hocuspocus } from '@hocuspocus/server'
import bp from 'body-parser'
import compression from 'compression'
import cors from 'cors'
Expand All @@ -31,7 +31,7 @@ import { Config } from './config'
import { Context } from './context'
import { AuthenticationExtension } from './extensions/authentication'
import { StorageExtension } from './extensions/storage'
import { Controller, getClientFactory } from './platform'
import { simpleClientFactory } from './platform'
import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc'
import { PlatformStorageAdapter } from './storage/platform'
import { MarkupTransformer } from './transformers/markup'
Expand Down Expand Up @@ -83,8 +83,6 @@ export async function start (

const extensionsCtx = ctx.newChild('extensions', {})

const controller = new Controller()

const transformer = new MarkupTransformer(extensions)

const hocuspocus = new Hocuspocus({
Expand Down Expand Up @@ -124,18 +122,13 @@ export async function start (

extensions: [
new AuthenticationExtension({
ctx: extensionsCtx.newChild('authenticate', {}),
controller
ctx: extensionsCtx.newChild('authenticate', {})
}),
new StorageExtension({
ctx: extensionsCtx.newChild('storage', {}),
adapter: new PlatformStorageAdapter({ minio }, mongo, transformer)
})
],

async onDestroy (data: onDestroyPayload): Promise<void> {
await controller.close()
}
]
})

const rpcCtx = ctx.newChild('rpc', {})
Expand All @@ -144,7 +137,7 @@ export async function start (
return {
connectionId: generateId(),
workspaceId: token.workspace,
clientFactory: getClientFactory(token, controller)
clientFactory: simpleClientFactory(token)
}
}

Expand Down Expand Up @@ -192,9 +185,11 @@ export async function start (
}
res.status(400).send(response)
} else {
await rpcCtx.withLog('/rpc', { method: request.method }, async (ctx) => {
await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => {
try {
const response: RpcResponse = await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
const response: RpcResponse = await rpcCtx.with(request.method, {}, async (ctx) => {
return await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
})
res.status(200).send(response)
} catch (err: any) {
res.status(500).send({ error: err.message })
Expand Down
60 changes: 33 additions & 27 deletions server/collaborator/src/storage/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import core, {
CollaborativeDoc,
Doc,
MeasureContext,
TxOperations,
collaborativeDocWithLastVersion,
toWorkspaceString
} from '@hcengineering/core'
Expand Down Expand Up @@ -87,7 +88,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
const { platformDocumentId } = context
if (platformDocumentId !== undefined) {
ctx.info('load document platform content', { documentId, platformDocumentId })
const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => {
const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => {
try {
return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
} catch (err) {
Expand All @@ -112,27 +113,37 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
}

async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
let snapshot: YDocVersion | undefined
try {
ctx.info('take document snapshot', { documentId })
snapshot = await this.takeSnapshot(ctx, documentId, document, context)
} catch (err) {
ctx.error('failed to take document snapshot', { documentId, error: err })
}
const { clientFactory } = context

const client = await ctx.with('connect', {}, async () => {
return await clientFactory()
})

try {
ctx.info('save document content', { documentId })
await this.saveDocumentToStorage(ctx, documentId, document, context)
} catch (err) {
ctx.error('failed to save document', { documentId, error: err })
}
let snapshot: YDocVersion | undefined
try {
ctx.info('take document snapshot', { documentId })
snapshot = await this.takeSnapshot(ctx, client, documentId, document, context)
} catch (err) {
ctx.error('failed to take document snapshot', { documentId, error: err })
}

const { platformDocumentId } = context
if (platformDocumentId !== undefined) {
ctx.info('save document content to platform', { documentId, platformDocumentId })
await ctx.with('save-document', { storage: 'platform' }, async (ctx) => {
await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context)
})
try {
ctx.info('save document content', { documentId })
await this.saveDocumentToStorage(ctx, documentId, document, context)
} catch (err) {
ctx.error('failed to save document', { documentId, error: err })
}

const { platformDocumentId } = context
if (platformDocumentId !== undefined) {
ctx.info('save document content to platform', { documentId, platformDocumentId })
await ctx.with('save-to-platform', {}, async (ctx) => {
await this.saveDocumentToPlatform(ctx, client, documentId, platformDocumentId, document, snapshot, context)
})
}
} finally {
await client.close()
}
}

Expand Down Expand Up @@ -180,16 +191,16 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {

async takeSnapshot (
ctx: MeasureContext,
client: Omit<TxOperations, 'close'>,
documentId: DocumentId,
document: YDoc,
context: Context
): Promise<YDocVersion | undefined> {
const { storage, collaborativeDoc } = parseDocumentId(documentId)
const adapter = this.getStorageAdapter(storage)

const { clientFactory, workspaceId } = context
const { workspaceId } = context

const client = await clientFactory({ derived: false })
const timestamp = Date.now()

const yDocVersion: YDocVersion = {
Expand Down Expand Up @@ -233,6 +244,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {

async saveDocumentToPlatform (
ctx: MeasureContext,
client: Omit<TxOperations, 'close'>,
documentName: string,
platformDocumentId: PlatformDocumentId,
document: YDoc,
Expand All @@ -241,12 +253,6 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
): Promise<void> {
const { objectClass, objectId, objectAttr } = parsePlatformDocumentId(platformDocumentId)

const { clientFactory } = context

const client = await ctx.with('connect', {}, async () => {
return await clientFactory({ derived: false })
})

const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr)
if (attribute === undefined) {
ctx.info('attribute not found', { documentName, objectClass, objectAttr })
Expand Down

0 comments on commit ceb1c95

Please sign in to comment.