From ceb1c95c7cca9fe4387a35b4320637a57dc95934 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 8 May 2024 13:06:38 +0700 Subject: [PATCH] EQMS-6844 Do not keep platform client open (#5539) Signed-off-by: Alexander Onnikov --- server/collaborator/src/context.ts | 6 +- .../src/extensions/authentication.ts | 23 ++++--- server/collaborator/src/platform.ts | 34 +++++------ server/collaborator/src/server.ts | 23 +++---- server/collaborator/src/storage/platform.ts | 60 ++++++++++--------- 5 files changed, 73 insertions(+), 73 deletions(-) diff --git a/server/collaborator/src/context.ts b/server/collaborator/src/context.ts index 36d60f249bd..b0de5441efa 100644 --- a/server/collaborator/src/context.ts +++ b/server/collaborator/src/context.ts @@ -17,7 +17,7 @@ import { type DocumentId, type PlatformDocumentId } from '@hcengineering/collabo import { WorkspaceId, generateId } from '@hcengineering/core' import { decodeToken } from '@hcengineering/server-token' import { onAuthenticatePayload } from '@hocuspocus/server' -import { ClientFactory, Controller, getClientFactory } from './platform' +import { ClientFactory, simpleClientFactory } from './platform' export interface Context { connectionId: string @@ -36,7 +36,7 @@ export type withContext = Omit & { context: Context } -export function buildContext (data: onAuthenticatePayload, controller: Controller): Context { +export function buildContext (data: onAuthenticatePayload): Context { const context = data.context as Partial const connectionId = context.connectionId ?? generateId() @@ -48,7 +48,7 @@ export function buildContext (data: onAuthenticatePayload, controller: Controlle return { connectionId, workspaceId: decodedToken.workspace, - clientFactory: getClientFactory(decodedToken, controller), + clientFactory: simpleClientFactory(decodedToken), initialContentId, platformDocumentId } diff --git a/server/collaborator/src/extensions/authentication.ts b/server/collaborator/src/extensions/authentication.ts index c82b2096665..98b877084c1 100644 --- a/server/collaborator/src/extensions/authentication.ts +++ b/server/collaborator/src/extensions/authentication.ts @@ -20,11 +20,9 @@ import { Extension, onAuthenticatePayload } from '@hocuspocus/server' import { getWorkspaceInfo } from '../account' import { Context, buildContext } from '../context' -import { Controller } from '../platform' export interface AuthenticationConfiguration { ctx: MeasureContext - controller: Controller } export class AuthenticationExtension implements Extension { @@ -35,20 +33,21 @@ export class AuthenticationExtension implements Extension { } async onAuthenticate (data: onAuthenticatePayload): Promise { - this.configuration.ctx.measure('authenticate', 1) - + const ctx = this.configuration.ctx const { workspaceUrl, collaborativeDoc } = parseDocumentId(data.documentName as DocumentId) - // verify workspace can be accessed with the token - const workspaceInfo = await getWorkspaceInfo(data.token) + return await ctx.with('authenticate', { workspace: workspaceUrl }, async () => { + // verify workspace can be accessed with the token + const workspaceInfo = await getWorkspaceInfo(data.token) - // verify workspace url in the document matches the token - if (workspaceInfo.workspace !== workspaceUrl) { - throw new Error('documentName must include workspace') - } + // verify workspace url in the document matches the token + if (workspaceInfo.workspace !== workspaceUrl) { + throw new Error('documentName must include workspace') + } - data.connection.readOnly = isReadonlyDoc(collaborativeDoc) + data.connection.readOnly = isReadonlyDoc(collaborativeDoc) - return buildContext(data, this.configuration.controller) + return buildContext(data) + }) } } diff --git a/server/collaborator/src/platform.ts b/server/collaborator/src/platform.ts index b072581864c..0a9ff484800 100644 --- a/server/collaborator/src/platform.ts +++ b/server/collaborator/src/platform.ts @@ -15,7 +15,7 @@ import client from '@hcengineering/client' import clientResources from '@hcengineering/client-resources' -import core, { Client, Tx, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core' +import core, { Client, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core' import { setMetadata } from '@hcengineering/platform' import { Token, generateToken } from '@hcengineering/server-token' import config from './config' @@ -52,13 +52,25 @@ export interface ClientFactoryParams { /** * @public */ -export type ClientFactory = (params: ClientFactoryParams) => Promise +export type ClientFactory = (params?: ClientFactoryParams) => Promise /** * @public */ -export function getClientFactory (token: Token, controller: Controller): ClientFactory { - return async ({ derived }: ClientFactoryParams) => { +export function simpleClientFactory (token: Token): ClientFactory { + return async (params?: ClientFactoryParams) => { + const derived = params?.derived ?? false + const client = await connect(generateToken(token.email, token.workspace)) + return await getTxOperations(client, token, derived) + } +} + +/** + * @public + */ +export function reusableClientFactory (token: Token, controller: Controller): ClientFactory { + return async (params?: ClientFactoryParams) => { + const derived = params?.derived ?? false const workspaceClient = await controller.get(token.workspace) return await getTxOperations(workspaceClient.client, token, derived) } @@ -94,16 +106,10 @@ export class Controller { * @public */ export class WorkspaceClient { - private readonly txHandlers: ((...tx: Tx[]) => Promise)[] = [] - private constructor ( readonly workspace: WorkspaceId, readonly client: Client - ) { - this.client.notify = (...tx: Tx[]) => { - void this.txHandler(...tx) - } - } + ) {} static async create (workspace: WorkspaceId): Promise { const token = generateToken(systemAccountEmail, workspace) @@ -114,10 +120,4 @@ export class WorkspaceClient { async close (): Promise { await this.client.close() } - - private async txHandler (...tx: Tx[]): Promise { - for (const h of this.txHandlers) { - await h(...tx) - } - } } diff --git a/server/collaborator/src/server.ts b/server/collaborator/src/server.ts index 2b88d19e0ca..51b6fc65a3e 100644 --- a/server/collaborator/src/server.ts +++ b/server/collaborator/src/server.ts @@ -18,7 +18,7 @@ import { MeasureContext, generateId, metricsAggregate } from '@hcengineering/cor import { MinioService } from '@hcengineering/minio' import { Token, decodeToken } from '@hcengineering/server-token' import { ServerKit } from '@hcengineering/text' -import { Hocuspocus, onDestroyPayload } from '@hocuspocus/server' +import { Hocuspocus } from '@hocuspocus/server' import bp from 'body-parser' import compression from 'compression' import cors from 'cors' @@ -31,7 +31,7 @@ import { Config } from './config' import { Context } from './context' import { AuthenticationExtension } from './extensions/authentication' import { StorageExtension } from './extensions/storage' -import { Controller, getClientFactory } from './platform' +import { simpleClientFactory } from './platform' import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc' import { PlatformStorageAdapter } from './storage/platform' import { MarkupTransformer } from './transformers/markup' @@ -83,8 +83,6 @@ export async function start ( const extensionsCtx = ctx.newChild('extensions', {}) - const controller = new Controller() - const transformer = new MarkupTransformer(extensions) const hocuspocus = new Hocuspocus({ @@ -124,18 +122,13 @@ export async function start ( extensions: [ new AuthenticationExtension({ - ctx: extensionsCtx.newChild('authenticate', {}), - controller + ctx: extensionsCtx.newChild('authenticate', {}) }), new StorageExtension({ ctx: extensionsCtx.newChild('storage', {}), adapter: new PlatformStorageAdapter({ minio }, mongo, transformer) }) - ], - - async onDestroy (data: onDestroyPayload): Promise { - await controller.close() - } + ] }) const rpcCtx = ctx.newChild('rpc', {}) @@ -144,7 +137,7 @@ export async function start ( return { connectionId: generateId(), workspaceId: token.workspace, - clientFactory: getClientFactory(token, controller) + clientFactory: simpleClientFactory(token) } } @@ -192,9 +185,11 @@ export async function start ( } res.status(400).send(response) } else { - await rpcCtx.withLog('/rpc', { method: request.method }, async (ctx) => { + await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => { try { - const response: RpcResponse = await method(ctx, context, request.payload, { hocuspocus, minio, transformer }) + const response: RpcResponse = await rpcCtx.with(request.method, {}, async (ctx) => { + return await method(ctx, context, request.payload, { hocuspocus, minio, transformer }) + }) res.status(200).send(response) } catch (err: any) { res.status(500).send({ error: err.message }) diff --git a/server/collaborator/src/storage/platform.ts b/server/collaborator/src/storage/platform.ts index 41d0f0ff66c..54f0474900c 100644 --- a/server/collaborator/src/storage/platform.ts +++ b/server/collaborator/src/storage/platform.ts @@ -29,6 +29,7 @@ import core, { CollaborativeDoc, Doc, MeasureContext, + TxOperations, collaborativeDocWithLastVersion, toWorkspaceString } from '@hcengineering/core' @@ -87,7 +88,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { const { platformDocumentId } = context if (platformDocumentId !== undefined) { ctx.info('load document platform content', { documentId, platformDocumentId }) - const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => { + const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => { try { return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context) } catch (err) { @@ -112,27 +113,37 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { } async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise { - let snapshot: YDocVersion | undefined - try { - ctx.info('take document snapshot', { documentId }) - snapshot = await this.takeSnapshot(ctx, documentId, document, context) - } catch (err) { - ctx.error('failed to take document snapshot', { documentId, error: err }) - } + const { clientFactory } = context + + const client = await ctx.with('connect', {}, async () => { + return await clientFactory() + }) try { - ctx.info('save document content', { documentId }) - await this.saveDocumentToStorage(ctx, documentId, document, context) - } catch (err) { - ctx.error('failed to save document', { documentId, error: err }) - } + let snapshot: YDocVersion | undefined + try { + ctx.info('take document snapshot', { documentId }) + snapshot = await this.takeSnapshot(ctx, client, documentId, document, context) + } catch (err) { + ctx.error('failed to take document snapshot', { documentId, error: err }) + } - const { platformDocumentId } = context - if (platformDocumentId !== undefined) { - ctx.info('save document content to platform', { documentId, platformDocumentId }) - await ctx.with('save-document', { storage: 'platform' }, async (ctx) => { - await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context) - }) + try { + ctx.info('save document content', { documentId }) + await this.saveDocumentToStorage(ctx, documentId, document, context) + } catch (err) { + ctx.error('failed to save document', { documentId, error: err }) + } + + const { platformDocumentId } = context + if (platformDocumentId !== undefined) { + ctx.info('save document content to platform', { documentId, platformDocumentId }) + await ctx.with('save-to-platform', {}, async (ctx) => { + await this.saveDocumentToPlatform(ctx, client, documentId, platformDocumentId, document, snapshot, context) + }) + } + } finally { + await client.close() } } @@ -180,6 +191,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { async takeSnapshot ( ctx: MeasureContext, + client: Omit, documentId: DocumentId, document: YDoc, context: Context @@ -187,9 +199,8 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { const { storage, collaborativeDoc } = parseDocumentId(documentId) const adapter = this.getStorageAdapter(storage) - const { clientFactory, workspaceId } = context + const { workspaceId } = context - const client = await clientFactory({ derived: false }) const timestamp = Date.now() const yDocVersion: YDocVersion = { @@ -233,6 +244,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { async saveDocumentToPlatform ( ctx: MeasureContext, + client: Omit, documentName: string, platformDocumentId: PlatformDocumentId, document: YDoc, @@ -241,12 +253,6 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { ): Promise { const { objectClass, objectId, objectAttr } = parsePlatformDocumentId(platformDocumentId) - const { clientFactory } = context - - const client = await ctx.with('connect', {}, async () => { - return await clientFactory({ derived: false }) - }) - const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr) if (attribute === undefined) { ctx.info('attribute not found', { documentName, objectClass, objectAttr })