vscode 源码解析 - 进程间调用

Wenzhao,vscodesource codeChineseipc


vscode 的架构中主要有四类进程:主进程、渲染进程、shared 进程和 host 进程,这四个进程之间会发生进程间调用(Inter Process Calling, IPC)。vscode 中有专门的 IPC 模块来实现 IPC 机制,这篇文章将会深入介绍 vscode IPC 模块的设计和原理。

IPC 原理

在我们开始学习 vscode 的 IPC 机制之前,不妨根据我们已经掌握的关于计算机网络的基本知识,来推演一下 IPC 有何要点:

  1. 客户端服务端,客户端向服务端发起请求,请求即是要求调用服务端的某个方法,服务端返回响应,响应即是该方法的返回值
  2. 客户端和服务端需要建立连接
  3. 服务端需要以某种机制分派请求,以找到被调用的方法所在的模块
  4. 请求需要通过某种协议来让双方知道如何解析和生成请求或响应

可以看到 IPC 理念上是比较简单的,而 vscode IPC 模块的优点在于,它清楚地定义了 IPC 模块的各个层次,将客户端的调用过程封装得就像是在调用本地的一个异步方法一样,还让不同的跨进程环境——例如本地进程、基于网络的跨进程、web worker——都能够很容易地实现。

vscode IPC 机制概述

vscode IPC 分为基于 Channel 的基于 RpcProtocol 的两种。

基于 Channel 的机制

我们通过一个例子开始对 Channel 机制的介绍。

在渲染进程初始化的时候,会创建一个 ElectronIPCMainProcessService,然后以此创建一个 LoggerChannelClient,并以 ILoggerService 为 key 添加到依赖注入系统当中:

// Main Process
const mainProcessService = this._register(new ElectronIPCMainProcessService(this.configuration.windowId));
serviceCollection.set(IMainProcessService, mainProcessService);
 
// Logger
const loggerService = new LoggerChannelClient(mainProcessService.getChannel('logger'));
serviceCollection.set(ILoggerService, loggerService);

我们进一步看 LoggerChannelClient 的实现的话,就会发现它会调用 channelcall 方法,这里就就发起了一个 IPC:

export class LoggerChannelClient implements ILoggerService {
	constructor(private readonly channel: IChannel) { }
 
	createConsoleMainLogger(): ILogger {
		return new AdapterLogger({
			log: (level: LogLevel, args: any[]) => {
				this.channel.call('consoleLog', [level, args]);
			}
		});
	}
}

就是 channel IPC

channel IPC 主要支持两种类型的调用,通过下面这个枚举类型可以看出:

export const enum RequestType {
	Promise = 100,
	PromiseCancel = 101,
	EventListen = 102,
	EventDispose = 103
}

我们这篇文章将会以基于 Promise 的调用为例,基于事件的调用大家可以自行了解。

channel IPC 主要有以下这些参与者,它们之间的关系如下图所示:

1r4qaFQIf7Vw7xVCw6kCxOl3A26cIaY-So42x1xb7Yw

服务端

客户端

下面我们会具体讲解每个模块的机制。

IChannel 和 IServerChannel

阅读过本系列之前两篇关于依赖注入和服务化的读者,应该已经知道 vscode 中各种功能都是封装在服务当中的,所以 IPC 的执行过程中必须要找到某个能响应特定调用的服务,IServerChannel 则是负责和服务一一对应,帮助它们接入 IPC 系统的,我们将称为实体

而在客户端一侧,业务代码不知道 IPC 机制的接口,因此不能直接发起请求,而是将 IChannel 作为一个能够帮助它发起请求的代理

IserverChannelIChannel 分别就是实体和代理的接口:

export interface IChannel {
	call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
	listen<T>(event: string, arg?: any): Event<T>;
}
 
/**
 * An `IServerChannel` is the counter part to `IChannel`,
 * on the server-side. You should implement this interface
 * if you'd like to handle remote promises or events.
 */
export interface IServerChannel<TContext = string> {
	call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
	listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;
}

一个 IChannel 像就这样(即 return 返回的对象):

getChannel<T extends IChannel>(channelName: string): T {
		const that = this;
 
		return {
			call(command: string, arg?: any, cancellationToken?: CancellationToken) {
				if (that.isDisposed) {
					return Promise.reject(errors.canceled());
				}
				return that.requestPromise(channelName, command, arg, cancellationToken);
			},
			listen(event: string, arg: any) {
				if (that.isDisposed) {
					return Promise.reject(errors.canceled());
				}
				return that.requestEvent(channelName, event, arg);
			}
		} as T;
	}

而一个 IServerChannel 会像是这样:

export class TestChannel implements IServerChannel {
 
	constructor(private testService: ITestService) { }
 
	listen(_: unknown, event: string): Event<any> {
		switch (event) {
			case 'marco': return this.testService.onMarco;
		}
 
		throw new Error('Event not found');
	}
 
	call(_: unknown, command: string, ...args: any[]): Promise<any> {
		switch (command) {
			case 'pong': return this.testService.pong(args[0]);
			case 'cancelMe': return this.testService.cancelMe();
			case 'marco': return this.testService.marco();
			default: return Promise.reject(new Error(`command not found: ${command}`));
		}
	}
}

在这个例子中 TestChannel 就是对 ITestService 的一层封装。

创建 IServerChannel 的方式有很多种,除了上面这样的直接实现,还可以借助 ProxyChannel namespace 提供的方法。

ProxyChannel

如果不需要为 service 做一些特殊处理,可以直接使用 ProxyChannel namespace 下的 fromService 方法将一个 service 包装成一个 IServerChannel

   export function fromService(service: unknown, options?: ICreateServiceChannelOptions): IServerChannel {
   	const handler = service as { [key: string]: unknown };
   	const disableMarshalling = options && options.disableMarshalling;
 
   	// Buffer any event that should be supported by
   	// iterating over all property keys and finding them
   	const mapEventNameToEvent = new Map<string, Event<unknown>>();
   	for (const key in handler) {
   		if (propertyIsEvent(key)) {
   			mapEventNameToEvent.set(key, Event.buffer(handler[key] as Event<unknown>, true));
   		}
   	}
 
   	return new class implements IServerChannel {
 
   		listen<T>(_: unknown, event: string): Event<T> {
   			const eventImpl = mapEventNameToEvent.get(event);
   			if (eventImpl) {
   				return eventImpl as Event<T>;
   			}
 
   			throw new Error(`Event not found: ${event}`);
   		}
 
   		call(_: unknown, command: string, args?: any[]): Promise<any> {
   			const target = handler[command];
   			if (typeof target === 'function') {
 
   				// Revive unless marshalling disabled
   				if (!disableMarshalling && Array.isArray(args)) {
   					for (let i = 0; i < args.length; i++) {
   						args[i] = revive(args[i]);
   					}
   				}
 
   				return target.apply(handler, args);
   			}
 
   			throw new Error(`Method not found: ${command}`);
   		}
   	};
   }

同样的,也可以通过 toServiceIChannel 封装成服务供业务代码调用,这样业务代码就不用自己去调用 IChannelcall 或者 listen 方法。

	export function toService<T>(channel: IChannel, options?: ICreateProxyServiceOptions): T {
		const disableMarshalling = options && options.disableMarshalling;
 
		return new Proxy({}, {
			get(_target: T, propKey: PropertyKey) {
				if (typeof propKey === 'string') {
 
					// Check for predefined values
					if (options?.properties?.has(propKey)) {
						return options.properties.get(propKey);
					}
 
					// Event
					if (propertyIsEvent(propKey)) {
						return channel.listen(propKey);
					}
 
					// Function
					return async function (...args: any[]) {
 
						// Add context if any
						let methodArgs: any[];
						if (options && !isUndefinedOrNull(options.context)) {
							methodArgs = [options.context, ...args];
						} else {
							methodArgs = args;
						}
 
						const result = await channel.call(propKey, methodArgs);
 
						// Revive unless marshalling disabled
						if (!disableMarshalling) {
							return revive(result);
						}
 
						return result;
					};
				}
 
				throw new Error(`Property not found: ${String(propKey)}`);
			}
		}) as T;
	}

本质上是创建了一个 Proxy,将对 Proxy 属性的访问转换成对 channel 的 call listen 方法的调用。

IChannelServer

IChannelServer 的主要职责包括:

  1. protocol 接收消息
  2. 根据消息的类型进行处理
  3. 调用合适的 IServerChannel 来处理请求
  4. 将响应发送给客户端
  5. 注册 IServerChannel

IChannelServer 直接监听 protocol 的消息,然后调用自己的 onRawMessage 方法处理请求。onRawMessge 会根据请求的类型来调用其他方法。以基于 Promise 的调用为例,可以看到它的核心逻辑就是调用 IServerChannelcall 方法。

  private onRawMessage(message: VSBuffer): void {
		const reader = new BufferReader(message);
		const header = deserialize(reader);
		const body = deserialize(reader);
		const type = header[0] as RequestType;
 
		switch (type) {
			case RequestType.Promise:
				if (this.logger) {
					this.logger.logIncoming(message.byteLength, header[1], RequestInitiator.OtherSide, `${requestTypeToStr(type)}: ${header[2]}.${header[3]}`, body);
				}
				return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
        
        // ...
		}
	}
 
	private onPromise(request: IRawPromiseRequest): void {
		const channel = this.channels.get(request.channelName);
 
		let promise: Promise<any>;
 
		try {
			promise = channel.call(this.ctx, request.name, request.arg, cancellationTokenSource.token);
		} catch (err) {
			// ...
		}
 
		const id = request.id;
 
		promise.then(data => {
			this.sendResponse(<IRawResponse>{ id, data, type: ResponseType.PromiseSuccess });
			this.activeRequests.delete(request.id);
		}, err => {
			// ...
		});
	}

可以看到,这里通过 requestchannelName 获取到一个 IServerChannel,然后调用了它的 call 方法,并将结果通过 this.sendResponse 发送给客户端。显然,这里 this.channels 需要注册 IServerChannel,而 IChannelServer 提供了这样的方法:

	registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
		this.channels.set(channelName, channel);
 
		setTimeout(() => this.flushPendingRequests(channelName), 0);
	}

IChannelClient

IChannelClient 的逻辑比较简单,它只提供了一个接口,即 getChannel ,它返回了一个 IChannel,实际上就是通过闭包保存了 channelName,然后在业务方调用的时候调用 requestPromise 等发起请求。

	getChannel<T extends IChannel>(channelName: string): T {
		const that = this;
 
		return {
			call(command: string, arg?: any, cancellationToken?: CancellationToken) {
				if (that.isDisposed) {
					return Promise.reject(errors.canceled());
				}
				return that.requestPromise(channelName, command, arg, cancellationToken);
			},
			// ...
		} as T;
	}
 
	private requestPromise(channelName: string, name: string, arg?: any, cancellationToken = CancellationToken.None): Promise<any> {
		const id = this.lastRequestId++;
		const type = RequestType.Promise;
		const request: IRawRequest = { id, type, channelName, name, arg };
 
		if (cancellationToken.isCancellationRequested) {
			return Promise.reject(errors.canceled());
		}
 
		let disposable: IDisposable;
 
		const result = new Promise((c, e) => {
			if (cancellationToken.isCancellationRequested) {
				return e(errors.canceled());
			}
 
			const doRequest = () => {
				const handler: IHandler = response => {
					switch (response.type) {
						case ResponseType.PromiseSuccess:
							this.handlers.delete(id);
							c(response.data);
							break;
 
						case ResponseType.PromiseError:
							this.handlers.delete(id);
							const error = new Error(response.data.message);
							(<any>error).stack = response.data.stack;
							error.name = response.data.name;
							e(error);
							break;
 
						case ResponseType.PromiseErrorObj:
							this.handlers.delete(id);
							e(response.data);
							break;
					}
				};
 
				this.handlers.set(id, handler);
				this.sendRequest(request);
			};
 
			let uninitializedPromise: CancelablePromise<void> | null = null;
			if (this.state === State.Idle) {
				doRequest();
			} else {
				// ...
			}
 
			const cancellationTokenListener = cancellationToken.onCancellationRequested(cancel);
			disposable = combinedDisposable(toDisposable(cancel), cancellationTokenListener);
			this.activeRequests.add(disposable);
		});
 
		return result.finally(() => { this.activeRequests.delete(disposable); });
	}

消息传输

我们已经看到了 IChannelServerIChannelClient 之间会互发数据,这里简单讲解一下消息传输的机制。

首先消息传输需要约定好请求和响应的结构。

IPC 请求的字段如下:

type IRawPromiseRequest = { type: RequestType.Promise; id: number; channelName: string; name: string; arg: any; };
type IRawPromiseCancelRequest = { type: RequestType.PromiseCancel, id: number };
type IRawEventListenRequest = { type: RequestType.EventListen; id: number; channelName: string; name: string; arg: any; };
type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number };
 
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;

IPC 响应的字段如下:

type IRawInitializeResponse = { type: ResponseType.Initialize };
type IRawPromiseSuccessResponse = { type: ResponseType.PromiseSuccess; id: number; data: any };
type IRawPromiseErrorResponse = { type: ResponseType.PromiseError; id: number; data: { message: string, name: string, stack: string[] | undefined } };
type IRawPromiseErrorObjResponse = { type: ResponseType.PromiseErrorObj; id: number; data: any };
type IRawEventFireResponse = { type: ResponseType.EventFire; id: number; data: any };
 
type IRawResponse = IRawInitializeResponse | IRawPromiseSuccessResponse | IRawPromiseErrorResponse | IRawPromiseErrorObjResponse | IRawEventFireResponse;

请求和响应在被发送之前,都会通过 VSBuffer 进行序列化,在接收之后则会进行反序列化。

需要一定的机制来将请求和响应对应起来。这在服务端比较容易,因为服务端的处理在顺序上处于 IPC 的中间环节,可以很自然的通过作用域来对应请求和响应。而在客户端,则需要一些机制来匹配请求和响应。

IChannelClientsendRequest 之前,会通过 id 来在自身的 handlers Map 上绑定一个 handler

this.handlers.set(id, handler);

而在收到消息的时候,就会通过这里 id 调用相应的 handler,从而 resolve 客户端 IChannel 的调用。

	private onResponse(response: IRawResponse): void {
		if (response.type === ResponseType.Initialize) {
			this.state = State.Idle;
			this._onDidInitialize.fire();
			return;
		}
 
		const handler = this.handlers.get(response.id);
 
		if (handler) {
			handler(response);
		}
	}

IMessagePassingProtocol

IChannelServerIChannelClient 之间会通过 protocol 传输数据。对于上层,它提供二进制数据流传输服务(用 VSBuffer 进行了封装),并能够在有新消息到达的时候通知上层。

其接口非常简单:

export interface IMessagePassingProtocol {
	send(buffer: VSBuffer): void;
	onMessage: Event<VSBuffer>;
	/**
	 * Wait for the write buffer (if applicable) to become empty.
	 */
	drain?(): Promise<void>;
}

不同的通讯端有不同的信道,因此 IMessagePassingProtocol 也有多种实现,大致有以下几种:

IPCClient

它用于在客户端管理 IChannel ,它同时实现了 IChannelClientIChannelServer,所以它实际上可以发起也可以响应 IPC:

export interface IChannelClient {
	getChannel<T extends IChannel>(channelName: string): T;
}
 
export interface IChannelServer<TContext = string> {
	registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
}
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
 
	private channelClient: ChannelClient;
	private channelServer: ChannelServer<TContext>;
 
	constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) {
		const writer = new BufferWriter();
		serialize(writer, ctx);
		protocol.send(writer.buffer);
 
		this.channelClient = new ChannelClient(protocol, ipcLogger);
		this.channelServer = new ChannelServer(protocol, ctx, ipcLogger);
	}
 
	getChannel<T extends IChannel>(channelName: string): T {
		return this.channelClient.getChannel(channelName) as T;
	}
 
	registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
		this.channelServer.registerChannel(channelName, channel);
	}
 
	dispose(): void {
		this.channelClient.dispose();
		this.channelServer.dispose();
	}
}

可以看到它仅有一个 IMessagePassingProtocol,换句话说,就是只能跟一方进行通讯,这也是它跟 IPCServer 最大的区别。

IPCServer

它一共实现了三个接口:

export interface IChannelServer<TContext = string> {
	registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
}
 
export interface IRoutingChannelClient<TContext = string> {
	getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T;
}
 
export interface IConnectionHub<TContext> {
	readonly connections: Connection<TContext>[];
	readonly onDidAddConnection: Event<Connection<TContext>>;
	readonly onDidRemoveConnection: Event<Connection<TContext>>;
}

我们来看 IPCServer 的构造方法:

export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
	constructor(onDidClientConnect: Event<ClientConnectionEvent>) {
		onDidClientConnect(({ protocol, onDidClientDisconnect }) => {
			const onFirstMessage = Event.once(protocol.onMessage);
 
			onFirstMessage(msg => {
				const reader = new BufferReader(msg);
				const ctx = deserialize(reader) as TContext;
 
				const channelServer = new ChannelServer(protocol, ctx);
				const channelClient = new ChannelClient(protocol);
 
				this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
 
				const connection: Connection<TContext> = { channelServer, channelClient, ctx };
				this._connections.add(connection);
				this._onDidAddConnection.fire(connection);
 
				onDidClientDisconnect(() => {
					channelServer.dispose();
					channelClient.dispose();
					this._connections.delete(connection);
					this._onDidRemoveConnection.fire(connection);
				});
			});
		});
	}
}

可以看到,在对方发来第一条消息时,IPCServer 会创建:

interface Connection<TContext> extends Client<TContext> {
	readonly channelServer: ChannelServer<TContext>;
	readonly channelClient: ChannelClient;
}
 
export interface Client<TContext> {
	readonly ctx: TContext;
}

注意这里的 ctx 属性,它是客户端的标识符,将用在请求路由的过程中。它的 getChannelChannelClientgetChannel 有很大不同:

	getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
	getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T;
	getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T {
		const that = this;
 
		return {
			call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
				let connectionPromise: Promise<Client<TContext>>;
 
				if (isFunction(routerOrClientFilter)) {
					// when no router is provided, we go random client picking
					let connection = getRandomElement(that.connections.filter(routerOrClientFilter));
 
					connectionPromise = connection
						// if we found a client, let's call on it
						? Promise.resolve(connection)
						// else, let's wait for a client to come along
						: Event.toPromise(Event.filter(that.onDidAddConnection, routerOrClientFilter));
				} else {
					connectionPromise = routerOrClientFilter.routeCall(that, command, arg);
				}
 
				const channelPromise = connectionPromise
					.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
 
				return getDelayedChannel(channelPromise)
					.call(command, arg, cancellationToken);
			},
			listen(event: string, arg: any): Event<T> {
				// ...
			}
		} as T;
	}

可以看到,在调用 getChannel 的时候如果传入了 routerOrClientFilter,则会在 connections 中选择一个。

Routing

选择  Connection 的方法,可以是一个简单的 filter 函数,也可以是通过 IClientRouter 提供的 routeCall 或者 routeEvent 方法。我们以 StaticRouter 为例:

export class StaticRouter<TContext = string> implements IClientRouter<TContext> {
 
	constructor(private fn: (ctx: TContext) => boolean | Promise<boolean>) { }
 
	routeCall(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
		return this.route(hub);
	}
 
	routeEvent(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
		return this.route(hub);
	}
 
	private async route(hub: IConnectionHub<TContext>): Promise<Client<TContext>> {
		for (const connection of hub.connections) {
			if (await Promise.resolve(this.fn(connection.ctx))) {
				return Promise.resolve(connection);
			}
		}
 
		await Event.toPromise(hub.onDidAddConnection);
		return await this.route(hub);
	}
}

实际上在 getChannel 调用他的时候,会通过 fn 来选择一个 IConnectionHub 中的 Connection

到这里,整个基于 channel 的 IPC 机制我们就介绍完毕了。

基于 RpcProtocol 的机制

vscode IPC 的第二种机制基于 RpcProtocol,用于渲染进程和 extension host 进程通讯(如果 vscode 的运行环境是浏览器,那么就是主线程和 extension host web worker 之间进行通讯)。

举个例子,在 host 进程初始化时如果发生了错误,它会告知渲染进程,代码如下:

		const mainThreadExtensions = rpcProtocol.getProxy(MainContext.MainThreadExtensionService);
		const mainThreadErrors = rpcProtocol.getProxy(MainContext.MainThreadErrors);
		errors.setUnexpectedErrorHandler(err => {
			const data = errors.transformErrorForSerialization(err);
			const extension = extensionErrors.get(err);
			if (extension) {
				mainThreadExtensions.$onExtensionRuntimeError(extension.identifier, data);
			} else {
				mainThreadErrors.$onUnexpectedError(data);
			}
		});

在调用 mainThreadExtensionsmainThreadError 的方法的时候,即发生了 IPC。

该机制如下图所示:

下面介绍其原理。

shape

客户端怎么知道 mainThreadExtensions 上有一个 $onExtensionRuntimeError 方法可以调用呢?

显然,这里需要定义一个接口,这个接口就是 MainThreadExtensionServiceShape ,定义在 extHost.protocol.ts 文件中。vscode 对于每一个可以调用的实体,都定义了一个以 Shape 为后缀的接口,服务端的实体必须要实现该接口,这样客户端在编写代码的时候就知道有哪些方法可以调用了。

identifier

客户端如何获取到服务端的实体在本地的代理,也就是 mainThreadExtensions 呢?换个问法,mainThreadExtensions 是如何跟 mainThreadErrors 相区别的呢?

代码中我们可以看到 mainThreadExtensions 是通过 rpcProtocol.getProxy(MainContext.MainThreadExtensionService) 获得的,MainContext.MainThreadExtensionService 在这里就起到了一个标识符的作用,它将每一个实体-代理的对子区别开。

MainContext.MainThreadExtensionService 定义在 extHost.protocol.ts 当中:

export const MainContext = {
  MainThreadExtensionService: createMainId<MainThreadExtensionServiceShape>('MainThreadExtensionService')
}

createMainId 就是用于创建标识符的方法,本质上是创建了一个 ProxyIdentifier 对象并存在到一个数组当中:

export class ProxyIdentifier<T> {
	public static count = 0;
	_proxyIdentifierBrand: void;
 
	public readonly isMain: boolean;
	public readonly sid: string;
	public readonly nid: number;
 
	constructor(isMain: boolean, sid: string) {
		this.isMain = isMain;
		this.sid = sid;
		this.nid = (++ProxyIdentifier.count);
	}
}
 
const identifiers: ProxyIdentifier<any>[] = [];
 
export function createMainContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> {
	const result = new ProxyIdentifier<T>(true, identifier);
	identifiers[result.nid] = result;
	return result;
}
 
export function createExtHostContextProxyIdentifier<T>(identifier: string): ProxyIdentifier<T> {
	const result = new ProxyIdentifier<T>(false, identifier);
	identifiers[result.nid] = result;
	return result;
}

每个标识符有三个字段:

context

我们如何知道另外一个进程中,有哪些实体可以被调用?

extHost.protocol.ts 文件中定义了 MainContext 和 ExtHostContext 两个文件。前者定义了渲染进程中可被调用的实体,后者定义了 host 进程中可被调用的实体。这里也可以看出,在 RpcProtocol 机制下,渲染进程和 host 进程是可以互相调用的。

customer

可被调用的实体是如何注册的?

host 进程调用 mainThreadExtensions 方法的时候,渲染进程必须要有类提供这个方法,而且它还需要注册到这个 RpcProtocol 的机制上。通过查找实现了 MainThreadExtensionServiceShape 的类,不难发现 mainThreadExtensionService.ts 中存在这样一段代码:

@extHostNamedCustomer(MainContext.MainThreadExtensionService)
export class MainThreadExtensionService implements MainThreadExtensionServiceShape {
   // ...
}

注意这里装饰器的调用,我们探究其实现:

export function extHostNamedCustomer<T extends IDisposable>(id: ProxyIdentifier<T>) {
	return function <Services extends BrandedService[]>(ctor: { new(context: IExtHostContext, ...services: Services): T }): void {
		ExtHostCustomersRegistryImpl.INSTANCE.registerNamedCustomer(id, ctor as IExtHostCustomerCtor<T>);
	};
}

可以发现它是将 id,也就是 MainContext.MainThreadExtensionServiceMainThreadExtensionService 绑定起来,而在 extension host 初始化的时候实例化它:

		const namedCustomers = ExtHostCustomersRegistry.getNamedCustomers();
		for (let i = 0, len = namedCustomers.length; i < len; i++) {
			const [id, ctor] = namedCustomers[i];
			const instance = this._instantiationService.createInstance(ctor, extHostContext);
			this._customers.push(instance);
			this._rpcProtocol.set(id, instance);
		}

注册的最后一步就是调用 RpcProtocol.set 方法注册可被调用的实体。

RpcProtocol 的通讯原理

到这里我们基本了解了 RpcProtocol 的接口了,下面来了解一下它的内部逻辑。

首先来看 getProxy,我们知道客户端要通过这个方法获取可调用的代理:

	public getProxy<T>(identifier: ProxyIdentifier<T>): T {
		const { nid: rpcId, sid } = identifier;
		if (!this._proxies[rpcId]) {
			this._proxies[rpcId] = this._createProxy(rpcId, sid);
		}
		return this._proxies[rpcId];
	}
 
	private _createProxy<T>(rpcId: number, debugName: string): T {
		let handler = {
			get: (target: any, name: PropertyKey) => {
				if (typeof name === 'string' && !target[name] && name.charCodeAt(0) === CharCode.DollarSign) {
					target[name] = (...myArgs: any[]) => {
						return this._remoteCall(rpcId, name, myArgs);
					};
				}
				if (name === _RPCProxySymbol) {
					return debugName;
				}
				return target[name];
			}
		};
		return new Proxy(Object.create(null), handler);
	}

可以看到它的核心逻辑就是创建一个 Proxy 对象,当对象上的属性被访问时,所有以 $ 开头的属性都会被包装为一个对 this._remoteCall 进行调用的方法。

_remoteCall 的核心逻辑则主要是下面几行(这里主要省略了取消请求相关的逻辑):

	private _remoteCall(rpcId: number, methodName: string, args: any[]): Promise<any> {
		const serializedRequestArguments = MessageIO.serializeRequestArguments(args, this._uriReplacer);
 
		const req = ++this._lastMessageId;
		const callId = String(req);
		const result = new LazyPromise();
 
		this._pendingRPCReplies[callId] = result;
		this._onWillSendRequest(req);
		const msg = MessageIO.serializeRequest(req, rpcId, methodName, serializedRequestArguments, !!cancellationToken);
 
		this._protocol.send(msg);
		return result;
	}
  
  // MessageIO
  public static serializeRequest(req: number, rpcId: number, method: string, serializedArgs: SerializedRequestArguments, usesCancellationToken: boolean): VSBuffer {
		if (serializedArgs.type === 'mixed') {
			return this._requestMixedArgs(req, rpcId, method, serializedArgs.args, serializedArgs.argsType, usesCancellationToken);
		}
		return this._requestJSONArgs(req, rpcId, method, serializedArgs.args, usesCancellationToken);
	}
 
	private static _requestJSONArgs(req: number, rpcId: number, method: string, args: string, usesCancellationToken: boolean): VSBuffer {
		const methodBuff = VSBuffer.fromString(method);
		const argsBuff = VSBuffer.fromString(args);
 
		let len = 0;
		len += MessageBuffer.sizeUInt8();
		len += MessageBuffer.sizeShortString(methodBuff);
		len += MessageBuffer.sizeLongString(argsBuff);
 
		let result = MessageBuffer.alloc(usesCancellationToken ? MessageType.RequestJSONArgsWithCancellation : MessageType.RequestJSONArgs, req, len);
		result.writeUInt8(rpcId);
		result.writeShortString(methodBuff);
		result.writeLongString(argsBuff);
		return result.buffer;
	}
 
  // MessageBuffer
	public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer {
		let result = new MessageBuffer(VSBuffer.alloc(messageSize + 1 /* type */ + 4 /* req */), 0);
		result.writeUInt8(type);
		result.writeUInt32(req);
		return result;
	}

可以看到一个请求主要有以下这些信息:

  1. type,请求的类型,由一个枚举 MessageType 所定义
  2. req,请求的序号,是一个自增的数字
  3. rpcId,identifier 的字符串 id,表明是哪个实体-代理之间的请求
  4. method,指定要调用实体的哪个方法
  5. argsBuff,序列化的参数

最终这些参数都会被封装为一个 VSBuffer 并通过 protocol 发送,而这些而这里的 protocol,这是我们的老朋友 IMessagePassingProtocol 。 所以我们可以看到 RpcProtocol 机制也是分层的设计,可以在不同的环境中使用。

当服务端接收到一个请求时,会回调 _receiveOneMessage 方法进行处理:

	private _receiveOneMessage(rawmsg: VSBuffer): void {
		if (this._isDisposed) {
			return;
		}
 
		const msgLength = rawmsg.byteLength;
		const buff = MessageBuffer.read(rawmsg, 0);
		const messageType = <MessageType>buff.readUInt8();
		const req = buff.readUInt32();
 
		switch (messageType) {
			case MessageType.RequestJSONArgs:
			case MessageType.RequestJSONArgsWithCancellation: {
				let { rpcId, method, args } = MessageIO.deserializeRequestJSONArgs(buff);
				if (this._uriTransformer) {
					args = transformIncomingURIs(args, this._uriTransformer);
				}
				this._receiveRequest(msgLength, req, rpcId, method, args, (messageType === MessageType.RequestJSONArgsWithCancellation));
				break;
			}
			
			// ...
	}

即根据 type 来调用不同的方法对请求进行处理,这里来看 _receiveRequest 方法:

	private _receiveRequest(msgLength: number, req: number, rpcId: number, method: string, args: any[], usesCancellationToken: boolean): void {
		const callId = String(req);
 
		let promise: Promise<any>;
		let cancel: () => void;
		if (usesCancellationToken) {
      // ...
		} else {
			// cannot be cancelled
			promise = this._invokeHandler(rpcId, method, args);
			cancel = noop;
		}
 
		// Acknowledge the request
		const msg = MessageIO.serializeAcknowledged(req);
		this._protocol.send(msg);
 
		promise.then((r) => {
			delete this._cancelInvokedHandlers[callId];
			const msg = MessageIO.serializeReplyOK(req, r, this._uriReplacer);
			this._protocol.send(msg);
		}, (err) => {
      // ...
		});
	}
  
  private _invokeHandler(rpcId: number, methodName: string, args: any[]): Promise<any> {
		try {
			return Promise.resolve(this._doInvokeHandler(rpcId, methodName, args));
		} catch (err) {
			return Promise.reject(err);
		}
	}
 
	private _doInvokeHandler(rpcId: number, methodName: string, args: any[]): any {
		const actor = this._locals[rpcId];
		if (!actor) {
			throw new Error('Unknown actor ' + getStringIdentifierForProxy(rpcId));
		}
		let method = actor[methodName];
		if (typeof method !== 'function') {
			throw new Error('Unknown method ' + methodName + ' on actor ' + getStringIdentifierForProxy(rpcId));
		}
		return method.apply(actor, args);
	}

核心就是调用 _invokeHandler 然后将结果发送回去。

注意到这一行 const actor = this._locals[rpcId]; 获取了可被调用的实体,记得之前注册实体时调用的 set 方法吗:

	public set<T, R extends T>(identifier: ProxyIdentifier<T>, value: R): R {
		this._locals[identifier.nid] = value;
		return value;
	}

到这里,我们就了解了 RpcProtocol 的原理了。

, CC BY-NC 4.0 © Wenzhao.RSS