vscode 源码解析 - 进程间调用
vscode 的架构中主要有四类进程:主进程、渲染进程、shared 进程和 host 进程,这四个进程之间会发生进程间调用(Inter Process Calling, IPC)。vscode 中有专门的 IPC 模块来实现 IPC 机制,这篇文章将会深入介绍 vscode IPC 模块的设计和原理。
IPC 原理
在我们开始学习 vscode 的 IPC 机制之前,不妨根据我们已经掌握的关于计算机网络的基本知识,来推演一下 IPC 有何要点:
- 有客户端和服务端,客户端向服务端发起请求,请求即是要求调用服务端的某个方法,服务端返回响应,响应即是该方法的返回值
- 客户端和服务端需要建立连接
- 服务端需要以某种机制分派请求,以找到被调用的方法所在的模块
- 请求需要通过某种协议来让双方知道如何解析和生成请求或响应
可以看到 IPC 理念上是比较简单的,而 vscode IPC 模块的优点在于,它清楚地定义了 IPC 模块的各个层次,将客户端的调用过程封装得就像是在调用本地的一个异步方法一样,还让不同的跨进程环境——例如本地进程、基于网络的跨进程、web worker——都能够很容易地实现。
vscode IPC 机制概述
vscode IPC 分为基于 Channel 的和基于 RpcProtocol 的两种。
基于 Channel 的机制
我们通过一个例子开始对 Channel 机制的介绍。
在渲染进程初始化的时候,会创建一个 ElectronIPCMainProcessService
,然后以此创建一个 LoggerChannelClient
,并以 ILoggerService
为 key 添加到依赖注入系统当中:
// Main Processconst mainProcessService = this._register(new ElectronIPCMainProcessService(this.configuration.windowId));serviceCollection.set(IMainProcessService, mainProcessService);
// Loggerconst loggerService = new LoggerChannelClient(mainProcessService.getChannel('logger'));serviceCollection.set(ILoggerService, loggerService);
我们进一步看 LoggerChannelClient
的实现的话,就会发现它会调用 channel
的 call
方法,这里就就发起了一个 IPC:
export class LoggerChannelClient implements ILoggerService { constructor(private readonly channel: IChannel) { }
createConsoleMainLogger(): ILogger { return new AdapterLogger({ log: (level: LogLevel, args: any[]) => { this.channel.call('consoleLog', [level, args]); } }); }}
就是 channel IPC。
channel IPC 主要支持两种类型的调用,通过下面这个枚举类型可以看出:
export const enum RequestType { Promise = 100, PromiseCancel = 101, EventListen = 102, EventDispose = 103}
- 第一种是基于 Promise 的调用
- 第二种是基于事件的调用
- 而切两种调用方式都有对应的取消的办法
我们这篇文章将会以基于 Promise 的调用为例,基于事件的调用大家可以自行了解。
channel IPC 主要有以下这些参与者,它们之间的关系如下图所示:
服务端
- 服务,各种业务逻辑实际发生的地方
IServerChannel
,一个IServerChannel
和一种服务对应,它们提供了call
和listen
两个方法给ChannelServer
调用,然后调用它们对应的服务来具体执行各种业务逻辑,实际上是对服务的一种包裹IChannelServer
,它负责监听IMessagePassingProtocol
传过来的请求,然后根据请求中指定的 channelName 来找到IServerChannel
并进行调用,还能将执行结果返回给客户端IPCServer
,它提供了一组注册和获取IServerChannel
的方法,并能够通过路由机制选择要通讯的客户端IMessagePassingProtocol
,它负责传输Uint8Array
类型的二进制信息,并且在收到消息的时候通过事件通知上层
客户端
- 业务代码,业务代码会调用
IChannel
提供的方法来发起一个 IPC IChannel
,它们提供了call
和listen
两个方法给业务代码调用,用以发起 IPCIChannelClient
,它们是实际发起请求的地方,会将请求封装成一定的数据格式,在接收到响应的时候返回给业务代码IPCClient
,它提供一组注册和获取IChannel
的方法IMessagePassingProtocol
,和它在服务端的对等方的功能一致
下面我们会具体讲解每个模块的机制。
IChannel 和 IServerChannel
阅读过本系列之前两篇关于依赖注入和服务化的读者,应该已经知道 vscode 中各种功能都是封装在服务当中的,所以 IPC 的执行过程中必须要找到某个能响应特定调用的服务,IServerChannel
则是负责和服务一一对应,帮助它们接入 IPC 系统的,我们将称为实体。
而在客户端一侧,业务代码不知道 IPC 机制的接口,因此不能直接发起请求,而是将 IChannel
作为一个能够帮助它发起请求的代理。
IserverChannel
和 IChannel
分别就是实体和代理的接口:
export interface IChannel { call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>; listen<T>(event: string, arg?: any): Event<T>;}
/** * An `IServerChannel` is the counter part to `IChannel`, * on the server-side. You should implement this interface * if you'd like to handle remote promises or events. */export interface IServerChannel<TContext = string> { call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>; listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;}
一个 IChannel
像就这样(即 return 返回的对象):
getChannel<T extends IChannel>(channelName: string): T { const that = this;
return { call(command: string, arg?: any, cancellationToken?: CancellationToken) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestPromise(channelName, command, arg, cancellationToken); }, listen(event: string, arg: any) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestEvent(channelName, event, arg); } } as T; }
而一个 IServerChannel
会像是这样:
export class TestChannel implements IServerChannel {
constructor(private testService: ITestService) { }
listen(_: unknown, event: string): Event<any> { switch (event) { case 'marco': return this.testService.onMarco; }
throw new Error('Event not found'); }
call(_: unknown, command: string, ...args: any[]): Promise<any> { switch (command) { case 'pong': return this.testService.pong(args[0]); case 'cancelMe': return this.testService.cancelMe(); case 'marco': return this.testService.marco(); default: return Promise.reject(new Error(`command not found: ${command}`)); } }}
在这个例子中 TestChannel
就是对 ITestService
的一层封装。
创建 IServerChannel 的方式有很多种,除了上面这样的直接实现,还可以借助 ProxyChannel
namespace 提供的方法。
ProxyChannel
如果不需要为 service 做一些特殊处理,可以直接使用 ProxyChannel
namespace 下的 fromService
方法将一个 service 包装成一个 IServerChannel
:
export function fromService(service: unknown, options?: ICreateServiceChannelOptions): IServerChannel { const handler = service as { [key: string]: unknown }; const disableMarshalling = options && options.disableMarshalling;
// Buffer any event that should be supported by // iterating over all property keys and finding them const mapEventNameToEvent = new Map<string, Event<unknown>>(); for (const key in handler) { if (propertyIsEvent(key)) { mapEventNameToEvent.set(key, Event.buffer(handler[key] as Event<unknown>, true)); } }
return new class implements IServerChannel {
listen<T>(_: unknown, event: string): Event<T> { const eventImpl = mapEventNameToEvent.get(event); if (eventImpl) { return eventImpl as Event<T>; }
throw new Error(`Event not found: ${event}`); }
call(_: unknown, command: string, args?: any[]): Promise<any> { const target = handler[command]; if (typeof target === 'function') {
// Revive unless marshalling disabled if (!disableMarshalling && Array.isArray(args)) { for (let i = 0; i < args.length; i++) { args[i] = revive(args[i]); } }
return target.apply(handler, args); }
throw new Error(`Method not found: ${command}`); } }; }
同样的,也可以通过 toService
将 IChannel
封装成服务供业务代码调用,这样业务代码就不用自己去调用 IChannel
的 call
或者 listen
方法。
export function toService<T>(channel: IChannel, options?: ICreateProxyServiceOptions): T { const disableMarshalling = options && options.disableMarshalling;
return new Proxy({}, { get(_target: T, propKey: PropertyKey) { if (typeof propKey === 'string') {
// Check for predefined values if (options?.properties?.has(propKey)) { return options.properties.get(propKey); }
// Event if (propertyIsEvent(propKey)) { return channel.listen(propKey); }
// Function return async function (...args: any[]) {
// Add context if any let methodArgs: any[]; if (options && !isUndefinedOrNull(options.context)) { methodArgs = [options.context, ...args]; } else { methodArgs = args; }
const result = await channel.call(propKey, methodArgs);
// Revive unless marshalling disabled if (!disableMarshalling) { return revive(result); }
return result; }; }
throw new Error(`Property not found: ${String(propKey)}`); } }) as T; }
本质上是创建了一个 Proxy,将对 Proxy 属性的访问转换成对 channel 的 call
listen
方法的调用。
IChannelServer
IChannelServer
的主要职责包括:
- 从
protocol
接收消息 - 根据消息的类型进行处理
- 调用合适的
IServerChannel
来处理请求 - 将响应发送给客户端
- 注册
IServerChannel
IChannelServer
直接监听 protocol
的消息,然后调用自己的 onRawMessage
方法处理请求。onRawMessge
会根据请求的类型来调用其他方法。以基于 Promise 的调用为例,可以看到它的核心逻辑就是调用 IServerChannel
的 call
方法。
private onRawMessage(message: VSBuffer): void { const reader = new BufferReader(message); const header = deserialize(reader); const body = deserialize(reader); const type = header[0] as RequestType;
switch (type) { case RequestType.Promise: if (this.logger) { this.logger.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}: ${header[2]}.${header[3]}`, body); } return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body }); // ... } }
private onPromise(request: IRawPromiseRequest): void { const channel = this.channels.get(request.channelName);
let promise: Promise<any>;
try { promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token); } catch (err) { // ... }
const id = request.id;
promise.then(data => { this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.PromiseSuccess }); this.activeRequests.delete(request.id); }, err => { // ... }); }
可以看到,这里通过 request
的 channelName
获取到一个 IServerChannel
,然后调用了它的 call
方法,并将结果通过 this.sendResponse
发送给客户端。显然,这里 this.channels
需要注册 IServerChannel
,而 IChannelServer
提供了这样的方法:
registerChannel(channelName: string, channel: IServerChannel<TContext>): void { this.channels.set(channelName, channel);
setTimeout(() => this.flushPendingRequests(channelName), 0); }
IChannelClient
IChannelClient
的逻辑比较简单,它只提供了一个接口,即 getChannel
,它返回了一个 IChannel
,实际上就是通过闭包保存了 channelName
,然后在业务方调用的时候调用 requestPromise
等发起请求。
getChannel<T extends IChannel>(channelName: string): T { const that = this;
return { call(command: string, arg?: any, cancellationToken?: CancellationToken) { if (that.isDisposed) { return Promise.reject(errors.canceled()); } return that.requestPromise(channelName, command, arg, cancellationToken); }, // ... } as T; }
private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<any> { const id = this.lastRequestId++; const type = RequestType.Promise; const request: IRawRequest = { id, type, channelName, name, arg };
if (cancellationToken.isCancellationRequested) { return Promise.reject(errors.canceled()); }
let disposable: IDisposable;
const result = new Promise((c, e) => { if (cancellationToken.isCancellationRequested) { return e(errors.canceled()); }
const doRequest = () => { const handler: IHandler = response => { switch (response.type) { case ResponseType.PromiseSuccess: this.handlers.delete(id); c(response.data); break;
case ResponseType.PromiseError: this.handlers.delete(id); const error = new Error(response.data.message); (<any>error).stack = response.data.stack; error.name = response.data.name; e(error); break;
case ResponseType.PromiseErrorObj: this.handlers.delete(id); e(response.data); break; } };
this.handlers.set(id, handler); this.sendRequest(request); };
let uninitializedPromise: CancelablePromise<void> | null = null; if (this.state === State.Idle) { doRequest(); } else { // ... }
const cancellationTokenListener = cancellationToken.onCancellationRequested(cancel); disposable = combinedDisposable(toDisposable(cancel), cancellationTokenListener); this.activeRequests.add(disposable); });
return result.finally(() => { this.activeRequests.delete(disposable); }); }
消息传输
我们已经看到了 IChannelServer
和 IChannelClient
之间会互发数据,这里简单讲解一下消息传输的机制。
首先消息传输需要约定好请求和响应的结构。
IPC 请求的字段如下:
type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any; };type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel, id: number };type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any; };type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number };
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;
type
,表明这是一种什么类型的调用id
,请求的唯一标识符,与请求相对应的响应会有相同的 idchannelName
,调用的 channel 的名称name
,如果是基于 Promise 的调用,就是方法的名称,如果是基于事件的监听,就是事件的名称arg
,参数
IPC 响应的字段如下:
type IRawInitializeResponse = { type: ResponseType.Initialize };type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any };type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string, name: string, stack: string[] | undefined } };type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any };type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any };
type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;
type
,表明是什么类型的响应id
,响应的唯一标识符data
,返回的数据
请求和响应在被发送之前,都会通过 VSBuffer
进行序列化,在接收之后则会进行反序列化。
需要一定的机制来将请求和响应对应起来。这在服务端比较容易,因为服务端的处理在顺序上处于 IPC 的中间环节,可以很自然的通过作用域来对应请求和响应。而在客户端,则需要一些机制来匹配请求和响应。
IChannelClient
在 sendRequest
之前,会通过 id
来在自身的 handlers
Map 上绑定一个 handler
this.handlers.set(id, handler);
而在收到消息的时候,就会通过这里 id
调用相应的 handler
,从而 resolve 客户端 IChannel
的调用。
private onResponse(response: IRawResponse): void { if (response.type === ResponseType.Initialize) { this.state = State.Idle; this._onDidInitialize.fire(); return; }
const handler = this.handlers.get(response.id);
if (handler) { handler(response); } }
IMessagePassingProtocol
IChannelServer
和 IChannelClient
之间会通过 protocol 传输数据。对于上层,它提供二进制数据流传输服务(用 VSBuffer
进行了封装),并能够在有新消息到达的时候通知上层。
其接口非常简单:
export interface IMessagePassingProtocol { send(buffer: VSBuffer): void; onMessage: Event<VSBuffer>; /** * Wait for the write buffer (if applicable) to become empty. */ drain?(): Promise<void>;}
send
通过下层信道发送Uint8Array
格式的消息onMessage
则在下层信道收到消息时触发上层的回调函数
不同的通讯端有不同的信道,因此 IMessagePassingProtocol
也有多种实现,大致有以下几种:
- 基于 Electron IPC 模块的实现,通过 webContents 和 ipcRenderer 收发消息;主进程和渲染进程的通讯使用这种方法
- 基于 web worker 的实现,通过 postMessage 和 onMessage 进行通讯;vscode 浏览器端部分插件基于这种实现
- 基于 web socket 的实现或 Node.js net 模块实现 ,通过 WebSocket 或 net 创建的套接字或者 pipe 进行通讯;vscode 浏览器端部分插件,以及渲染进程和 Host 进程的通讯基于这种实现(这是最有趣的一个对 Protocol 的实现,vscode 团队在这里实现了一个翻版的 TCP 协议)
IPCClient
它用于在客户端管理 IChannel
,它同时实现了 IChannelClient
和 IChannelServer
,所以它实际上可以发起也可以响应 IPC:
export interface IChannelClient { getChannel<T extends IChannel>(channelName: string): T;}
export interface IChannelServer<TContext = string> { registerChannel(channelName: string, channel: IServerChannel<TContext>): void;}
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
private channelClient: ChannelClient; private channelServer: ChannelServer<TContext>;
constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) { const writer = new BufferWriter(); serialize(writer, ctx); protocol.send(writer.buffer);
this.channelClient = new ChannelClient(protocol, ipcLogger); this.channelServer = new ChannelServer(protocol, ctx, ipcLogger); }
getChannel<T extends IChannel>(channelName: string): T { return this.channelClient.getChannel(channelName) as T; }
registerChannel(channelName: string, channel: IServerChannel<TContext>): void { this.channelServer.registerChannel(channelName, channel); }
dispose(): void { this.channelClient.dispose(); this.channelServer.dispose(); }}
可以看到它仅有一个 IMessagePassingProtocol
,换句话说,就是只能跟一方进行通讯,这也是它跟 IPCServer
最大的区别。
IPCServer
它一共实现了三个接口:
export interface IChannelServer<TContext = string> { registerChannel(channelName: string, channel: IServerChannel<TContext>): void;}
export interface IRoutingChannelClient<TContext = string> { getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T;}
export interface IConnectionHub<TContext> { readonly connections: Connection<TContext>[]; readonly onDidAddConnection: Event<Connection<TContext>>; readonly onDidRemoveConnection: Event<Connection<TContext>>;}
IRoutingChannelClient
说明它可以根据一定的条件选择向哪个IChannelServer
发起调用,即对调用进行路由IConnectionHub
则说明它可以管理客户端连接
我们来看 IPCServer
的构造方法:
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable { constructor(onDidClientConnect: Event<ClientConnectionEvent>) { onDidClientConnect(({ protocol, onDidClientDisconnect }) => { const onFirstMessage = Event.once(protocol.onMessage);
onFirstMessage(msg => { const reader = new BufferReader(msg); const ctx = deserialize(reader) as TContext;
const channelServer = new ChannelServer(protocol, ctx); const channelClient = new ChannelClient(protocol);
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
const connection: Connection<TContext> = { channelServer, channelClient, ctx }; this._connections.add(connection); this._onDidAddConnection.fire(connection);
onDidClientDisconnect(() => { channelServer.dispose(); channelClient.dispose(); this._connections.delete(connection); this._onDidRemoveConnection.fire(connection); }); }); }); }}
可以看到,在对方发来第一条消息时,IPCServer
会创建:
ChannelServer
ChannelClient
Connection
,这个Connection
就是来描述连接的,它的接口如下
interface Connection<TContext> extends Client<TContext> { readonly channelServer: ChannelServer<TContext>; readonly channelClient: ChannelClient;}
export interface Client<TContext> { readonly ctx: TContext;}
注意这里的 ctx
属性,它是客户端的标识符,将用在请求路由的过程中。它的 getChannel
和 ChannelClient
的 getChannel
有很大不同:
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T; getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T; getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T { const that = this;
return { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> { let connectionPromise: Promise<Client<TContext>>;
if (isFunction(routerOrClientFilter)) { // when no router is provided, we go random client picking let connection = getRandomElement(that.connections.filter(routerOrClientFilter));
connectionPromise = connection // if we found a client, let's call on it ? Promise.resolve(connection) // else, let's wait for a client to come along : Event.toPromise(Event.filter(that.onDidAddConnection, routerOrClientFilter)); } else { connectionPromise = routerOrClientFilter.routeCall(that, command, arg); }
const channelPromise = connectionPromise .then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
return getDelayedChannel(channelPromise) .call(command, arg, cancellationToken); }, listen(event: string, arg: any): Event<T> { // ... } } as T; }
可以看到,在调用 getChannel
的时候如果传入了 routerOrClientFilter
,则会在 connections
中选择一个。
Routing
选择 Connection
的方法,可以是一个简单的 filter 函数,也可以是通过 IClientRouter
提供的 routeCall
或者 routeEvent
方法。我们以 StaticRouter
为例:
export class StaticRouter<TContext = string> implements IClientRouter<TContext> {
constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { }
routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { return this.route(hub); }
routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { return this.route(hub); }
private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> { for (const connection of hub.connections) { if (await Promise.resolve(this.fn(connection.ctx))) { return Promise.resolve(connection); } }
await Event.toPromise(hub.onDidAddConnection); return await this.route(hub); }}
实际上在 getChannel
调用他的时候,会通过 fn
来选择一个 IConnectionHub
中的 Connection
。
到这里,整个基于 channel 的 IPC 机制我们就介绍完毕了。
基于 RpcProtocol 的机制
vscode IPC 的第二种机制基于 RpcProtocol
,用于渲染进程和 extension host 进程通讯(如果 vscode 的运行环境是浏览器,那么就是主线程和 extension host web worker 之间进行通讯)。
举个例子,在 host 进程初始化时如果发生了错误,它会告知渲染进程,代码如下:
const mainThreadExtensions = rpcProtocol.getProxy(MainContext.MainThreadExtensionService); const mainThreadErrors = rpcProtocol.getProxy(MainContext.MainThreadErrors); errors.setUnexpectedErrorHandler(err => { const data = errors.transformErrorForSerialization(err); const extension = extensionErrors.get(err); if (extension) { mainThreadExtensions.$onExtensionRuntimeError(extension.identifier, data); } else { mainThreadErrors.$onUnexpectedError(data); } });
在调用 mainThreadExtensions
或 mainThreadError
的方法的时候,即发生了 IPC。
该机制如下图所示:
下面介绍其原理。
shape
客户端怎么知道 mainThreadExtensions
上有一个 $onExtensionRuntimeError
方法可以调用呢?
显然,这里需要定义一个接口,这个接口就是 MainThreadExtensionServiceShape
,定义在 extHost.protocol.ts 文件中。vscode 对于每一个可以调用的实体,都定义了一个以 Shape
为后缀的接口,服务端的实体必须要实现该接口,这样客户端在编写代码的时候就知道有哪些方法可以调用了。
identifier
客户端如何获取到服务端的实体在本地的代理,也就是 mainThreadExtensions
呢?换个问法,mainThreadExtensions
是如何跟 mainThreadErrors
相区别的呢?
代码中我们可以看到 mainThreadExtensions
是通过 rpcProtocol.getProxy(MainContext.MainThreadExtensionService)
获得的,MainContext.MainThreadExtensionService
在这里就起到了一个标识符的作用,它将每一个实体-代理的对子区别开。
MainContext.MainThreadExtensionService
定义在 extHost.protocol.ts 当中:
export const MainContext = { MainThreadExtensionService: createMainId<MainThreadExtensionServiceShape>('MainThreadExtensionService')}
而 createMainId
就是用于创建标识符的方法,本质上是创建了一个 ProxyIdentifier
对象并存在到一个数组当中:
export class ProxyIdentifier<T> { public static count = 0; _proxyIdentifierBrand: void;
public readonly isMain: boolean; public readonly sid: string; public readonly nid: number;
constructor(isMain: boolean, sid: string) { this.isMain = isMain; this.sid = sid; this.nid = (++ProxyIdentifier.count); }}
const identifiers: ProxyIdentifier<any>[] = [];
export function createMainContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> { const result = new ProxyIdentifier<T>(true, identifier); identifiers[result.nid] = result; return result;}
export function createExtHostContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> { const result = new ProxyIdentifier<T>(false, identifier); identifiers[result.nid] = result; return result;}
每个标识符有三个字段:
- isMain 标识实体是否是在渲染进程当中
- sid 标识字符串 id
- nid 标识数字 id
context
我们如何知道另外一个进程中,有哪些实体可以被调用?
extHost.protocol.ts 文件中定义了 MainContext 和 ExtHostContext 两个文件。前者定义了渲染进程中可被调用的实体,后者定义了 host 进程中可被调用的实体。这里也可以看出,在 RpcProtocol 机制下,渲染进程和 host 进程是可以互相调用的。
customer
可被调用的实体是如何注册的?
host 进程调用 mainThreadExtensions
方法的时候,渲染进程必须要有类提供这个方法,而且它还需要注册到这个 RpcProtocol 的机制上。通过查找实现了 MainThreadExtensionServiceShape
的类,不难发现 mainThreadExtensionService.ts 中存在这样一段代码:
@extHostNamedCustomer(MainContext.MainThreadExtensionService)export class MainThreadExtensionService implements MainThreadExtensionServiceShape { // ...}
注意这里装饰器的调用,我们探究其实现:
export function extHostNamedCustomer<T extends IDisposable>(id: ProxyIdentifier<T>) { return function <Services extends BrandedService[]>(ctor: { new(context: IExtHostContext, ...services: Services): T }): void { ExtHostCustomersRegistryImpl.INSTANCE.registerNamedCustomer(id, ctor as IExtHostCustomerCtor<T>); };}
可以发现它是将 id
,也就是 MainContext.MainThreadExtensionService
和 MainThreadExtensionService
绑定起来,而在 extension host 初始化的时候实例化它:
const namedCustomers = ExtHostCustomersRegistry.getNamedCustomers(); for (let i = 0, len = namedCustomers.length; i < len; i++) { const [id, ctor] = namedCustomers[i]; const instance = this._instantiationService.createInstance(ctor, extHostContext); this._customers.push(instance); this._rpcProtocol.set(id, instance); }
注册的最后一步就是调用 RpcProtocol.set
方法注册可被调用的实体。
RpcProtocol 的通讯原理
到这里我们基本了解了 RpcProtocol 的接口了,下面来了解一下它的内部逻辑。
首先来看 getProxy
,我们知道客户端要通过这个方法获取可调用的代理:
public getProxy<T>(identifier: ProxyIdentifier<T>): T { const { nid: rpcId, sid } = identifier; if (!this._proxies[rpcId]) { this._proxies[rpcId] = this._createProxy(rpcId, sid); } return this._proxies[rpcId]; }
private _createProxy<T>(rpcId: number, debugName: string): T { let handler = { get: (target: any, name: PropertyKey) => { if (typeof name === 'string' && !target[name] && name.charCodeAt(0) === CharCode.DollarSign) { target[name] = (...myArgs: any[]) => { return this._remoteCall(rpcId, name, myArgs); }; } if (name === _RPCProxySymbol) { return debugName; } return target[name]; } }; return new Proxy(Object.create(null), handler); }
可以看到它的核心逻辑就是创建一个 Proxy 对象,当对象上的属性被访问时,所有以 $ 开头的属性都会被包装为一个对 this._remoteCall
进行调用的方法。
_remoteCall
的核心逻辑则主要是下面几行(这里主要省略了取消请求相关的逻辑):
private _remoteCall(rpcId: number, methodName: string, args: any[]): Promise<any> { const serializedRequestArguments = MessageIO.serializeRequestArguments(args, this._uriReplacer);
const req = ++this._lastMessageId; const callId = String(req); const result = new LazyPromise();
this._pendingRPCReplies[callId] = result; this._onWillSendRequest(req); const msg = MessageIO.serializeRequest(req, rpcId, methodName, serializedRequestArguments, !!cancellationToken);
this._protocol.send(msg); return result; } // MessageIO public static serializeRequest(req: number, rpcId: number, method: string, serializedArgs: SerializedRequestArguments, usesCancellationToken: boolean): VSBuffer { if (serializedArgs.type === 'mixed') { return this._requestMixedArgs(req, rpcId, method, serializedArgs.args, serializedArgs.argsType, usesCancellationToken); } return this._requestJSONArgs(req, rpcId, method, serializedArgs.args, usesCancellationToken); }
private static _requestJSONArgs(req: number, rpcId: number, method: string, args: string, usesCancellationToken: boolean): VSBuffer { const methodBuff = VSBuffer.fromString(method); const argsBuff = VSBuffer.fromString(args);
let len = 0; len += MessageBuffer.sizeUInt8(); len += MessageBuffer.sizeShortString(methodBuff); len += MessageBuffer.sizeLongString(argsBuff);
let result = MessageBuffer.alloc(usesCancellationToken ? MessageType.RequestJSONArgsWithCancellation : MessageType.RequestJSONArgs, req, len); result.writeUInt8(rpcId); result.writeShortString(methodBuff); result.writeLongString(argsBuff); return result.buffer; }
// MessageBuffer public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer { let result = new MessageBuffer(VSBuffer.alloc(messageSize + 1 /* type */ + 4 /* req */), 0); result.writeUInt8(type); result.writeUInt32(req); return result; }
可以看到一个请求主要有以下这些信息:
type
,请求的类型,由一个枚举 MessageType 所定义req
,请求的序号,是一个自增的数字rpcId
,identifier 的字符串 id,表明是哪个实体-代理之间的请求method
,指定要调用实体的哪个方法argsBuff
,序列化的参数
最终这些参数都会被封装为一个 VSBuffer
并通过 protocol 发送,而这些而这里的 protocol,这是我们的老朋友 IMessagePassingProtocol
。 所以我们可以看到 RpcProtocol 机制也是分层的设计,可以在不同的环境中使用。
当服务端接收到一个请求时,会回调 _receiveOneMessage
方法进行处理:
private _receiveOneMessage(rawmsg: VSBuffer): void { if (this._isDisposed) { return; }
const msgLength = rawmsg.byteLength; const buff = MessageBuffer.read(rawmsg, 0); const messageType = <MessageType>buff.readUInt8(); const req = buff.readUInt32();
switch (messageType) { case MessageType.RequestJSONArgs: case MessageType.RequestJSONArgsWithCancellation: { let { rpcId, method, args } = MessageIO.deserializeRequestJSONArgs(buff); if (this._uriTransformer) { args = transformIncomingURIs(args, this._uriTransformer); } this._receiveRequest(msgLength, req, rpcId, method, args, (messageType === MessageType.RequestJSONArgsWithCancellation)); break; } // ... }
即根据 type
来调用不同的方法对请求进行处理,这里来看 _receiveRequest
方法:
private _receiveRequest(msgLength: number, req: number, rpcId: number, method: string, args: any[], usesCancellationToken: boolean): void { const callId = String(req);
let promise: Promise<any>; let cancel: () => void; if (usesCancellationToken) { // ... } else { // cannot be cancelled promise = this._invokeHandler(rpcId, method, args); cancel = noop; }
// Acknowledge the request const msg = MessageIO.serializeAcknowledged(req); this._protocol.send(msg);
promise.then((r) => { delete this._cancelInvokedHandlers[callId]; const msg = MessageIO.serializeReplyOK(req, r, this._uriReplacer); this._protocol.send(msg); }, (err) => { // ... }); } private _invokeHandler(rpcId: number, methodName: string, args: any[]): Promise<any> { try { return Promise.resolve(this._doInvokeHandler(rpcId, methodName, args)); } catch (err) { return Promise.reject(err); } }
private _doInvokeHandler(rpcId: number, methodName: string, args: any[]): any { const actor = this._locals[rpcId]; if (!actor) { throw new Error('Unknown actor ' + getStringIdentifierForProxy(rpcId)); } let method = actor[methodName]; if (typeof method !== 'function') { throw new Error('Unknown method ' + methodName + ' on actor ' + getStringIdentifierForProxy(rpcId)); } return method.apply(actor, args); }
核心就是调用 _invokeHandler
然后将结果发送回去。
注意到这一行 const actor = this._locals[rpcId];
获取了可被调用的实体,记得之前注册实体时调用的 set 方法吗:
public set<T, R extends T>(identifier: ProxyIdentifier<T>, value: R): R { this._locals[identifier.nid] = value; return value; }
到这里,我们就了解了 RpcProtocol 的原理了。
, CC BY-NC 4.0 © Wendell.RSS