Chromium WebSocket 发送处理
最近定位一个 Chrome 浏览器通过 WebSocket 发送大文件给后端,但后端接收处理出错的问题,最终定位原因是后端没有正确处理 Continuation 类型的帧。

design

需要注意的是,如下的设计文档都已过时,仅供参考。

WebSocketStream: integrating streams with the WebSocket API
https://developer.chrome.com/docs/capabilities/web-apis/websocketstream

95068d99d7fcb97579057aaa520dc66b.png

Chromium WebSocket design doc
https://docs.google.com/document/d/1_R6YjCIrm4kikJ3YeapcOU2Keqr3lVUPd-OeaIJ93qQ/edit?pli=1#heading=h.fc87a8abb588

b400c7f5bddd64833930bacb8e98a260.png

WebSocket messages that the script has sent are delivered to net::WebSocketChannel via function calls between modules and IPC made by content::WebSocketChannelDispatcher. Given a message, WebSokcetChannel will build a WebSocket frame, and pass it to WebSocketStream. WebSocketStream knows how to send that frame to the network, which is usually a normal TCP connection but may be a SPDY stream.

WebSocket Protocol Stack in chrome/net
https://docs.google.com/document/d/11n3hpwb9lD9YVqnjX3OwzE_jHgTmKIqd6GvXE9bDGUg/edit#heading=h.wu5anpnlvlfy

arch

// third_party/blink/renderer/modules/websockets/

This directory contains:
- the implementation of
  [the WebSocket API](https://html.spec.whatwg.org/multipage/web-sockets.html).
- the Pepper implementation of the WebSocket API

They use WebSocketChannelImpl to connect to the WebSocket service i.e. the
blink.mojom.WebSocket implementation in content/browser/websockets/.

content

// content/browser/websockts/

This directory contains the network.mojom.WebSocket implementation. It hosts the WebSocket API implementations in Blink over Mojo, and exports the WebSocket protocol implementation in net/websockets/ as a service.

services

// services/network/

this only contains features that go over the network. e.g. no file loading, data URLs etc...

only the lowest-level of networking should be here. e.g. http, sockets, web sockets. Anything that is built on top of this should be in higher layers.

The network service is designed as a [Mojo service](/docs/mojo_and_services.md)
that in general doesn't need to be aware of which thread/process it runs on.
The browser process launches the network service and decides whether to run it
inside the browser process (*in-process*) or in a dedicated utility process
(*out-of-process*).

The out-of-process configuration is preferred for isolation and stability, and
is the default on most platforms. The in-process configuration is the default on
Android because of some unresolved issues; see https://crbug.com/1049008.  It
can also be useful for debugging; for example, it's used in Chromium's
[`--single-process`](https://www.chromium.org/developers/design-documents/process-models)
mode.

Process Model and Site Isolation
https://chromium.googlesource.com/chromium/src/+/main/docs/process_model_and_site_isolation.md

As part of moving the Chrome codebase towards a service model, we aim to split low-level parts of the code into separate Mojo services. This includes services such as UI, file system, and networking.

Network Service in Chrome
https://docs.google.com/document/d/1wAHLw9h7gGuqJNCgG1mP1BmLtCGfZ2pys-PdZQ1vg7M/edit?pref=2&pli=1#

Move Chrome codebase towards a service-oriented model. This will produce reusable and decoupled components while also reducing duplication.

Chrome Service Model
https://docs.google.com/document/d/15I7sQyQo6zsqXVNAlVd520tdGaS8FCicZHrN0yRu-oU/edit#heading=h.p37l9e7o0io5

net

// net/websockets/

This directory contains the implementation of
[the WebSocket protocol](https://tools.ietf.org/html/rfc6455).

code

代码以 Chromium 120.0.6099 版本为例。

connect

// third_party/blink/renderer/modules/websockets/dom_websocket.cc

DOMWebSocket* DOMWebSocket::Create
    DOMWebSocket* websocket = MakeGarbageCollected<DOMWebSocket>(context);
    websocket->Connect

DOMWebSocket::Connect(const String& url)
    channel_ = CreateChannel(GetExecutionContext(), this);
        return WebSocketChannelImpl::Create(context, client)
    common_.Connect(GetExecutionContext(), url, protocols, channel_)

// third_party/blink/renderer/modules/websockets/websocket_channel_impl.cc

WebSocketChannelImpl* WebSocketChannelImpl::Create
    auto* channel = MakeGarbageCollected<WebSocketChannelImpl>(execution_context, client)
    return channel;

// third_party/blink/renderer/modules/websockets/websocket_common.cc

WebSocketCommon::Connect
    channel->Connect(url_, protocol_string)

// third_party/blink/renderer/modules/websockets/websocket_channel_impl.cc

WebSocketChannelImpl::Connect(const KURL& url, const String& protocol)
    mojo::Remote<mojom::blink::WebSocketConnector> connector
    // blink::mojom::blink::WebSocketConnectorProxy::Connect
    connector->Connect(url, protocols)

// gen/third_party/blink/public/mojom/websockets/websocket_connector.mojom-blink.cc

WebSocketConnectorProxy::Connect
    ::mojo::internal::SendMojoMessage(*receiver_, message);

// gen/third_party/blink/public/mojom/websockets/websocket_connector.mojom.cc

blink::mojom::WebSocketConnectorStubDispatch::Accept(WebSocketConnector* impl, mojo::Message* message)
    // content::WebSocketConnectorImpl::Connect
    impl->Connect

// content/browser/websockets/websocket_connector_impl.cc

WebSocketConnectorImpl::Connect
    // network::mojom::NetworkContextProxy::CreateWebSocket
    process->GetStoragePartition()->GetNetworkContext()->CreateWebSocket

// gen/services/network/public/mojom/network_context.mojom.cc

network::mojom::NetworkContextStubDispatch::Accept(NetworkContext* impl, mojo::Message* message)
    // network::NetworkContext::CreateWebSocket
    impl->CreateWebSocket

// services/network/network_context.cc

network::NetworkContext::CreateWebSocket
    websocket_factory_ = std::make_unique<WebSocketFactory>(this);
    websocket_factory_->CreateWebSocket

// serivces/network/websocket_factory.cc

WebSocketFactory::CreateWebSocket
    connections_.insert(std::make_unique<WebSocket>(this, url)

// serivces/network/websocket.cc

WebSocket::WebSocket
    network::WebSocket::AddChannel
        std::unique_ptr<net::WebSocketEventInterface> event_interface(new WebSocketEventHandler(this));
        channel_ = std::make_unique<net::WebSocketChannel>(std::move(event_interface)
        channel_->SendAddChannelRequest

// net/websockets/websocket_channel.cc

WebSocketChannel::SendAddChannelRequest
    SendAddChannelRequestWithSuppliedCallback(socket_url, base::BindOnce(&WebSocketStream::CreateAndConnectStream))
        auto connect_delegate = std::make_unique<ConnectDelegate>(this);
        // WebSocketStream::CreateAndConnectStream
        stream_request_ = std::move(callback).Run(std::move(connect_delegate));

// net/websockets/websocket_stream.cc

WebSocketStream::CreateAndConnectStream
    auto request = std::make_unique<WebSocketStreamRequestImpl>
    request->Start

WebSocketStreamRequestImpl::WebSocketStreamRequestImpl
    // net::WebSocketChannel::ConnectDelegate::OnCreateRequest
    connect_delegate_->OnCreateRequest
        // net::WebSocketChannel::OnCreateURLRequest
        creator_->OnCreateURLRequest(request);
            event_interface_->OnCreateURLRequest(request);

// net/websockets/websocket_stream.cc

Delegate::OnResponseStarted
    // raw_ptr<WebSocketStreamRequestImpl> owner_;
    owner_->PerformUpgrade();

net::WebSocketStreamRequestImpl::PerformUpgrade
    // WebSocketChannel::ConnectDelegate::OnSuccess
    connect_delegate_->OnSuccess
        // net::WebSocketChannel::OnConnectSuccess
        creator_->OnConnectSuccess
            event_interface_->OnAddChannelResponse

// services/network/websocket.cc

network::WebSocket::WebSocketEventHandler::OnAddChannelResponse
    data_pipe_options{kReceiveDataPipeCapacity}; // 131000
    // const raw_ptr<WebSocket> impl_;
    mojo::CreateDataPipe(&data_pipe_options, impl_->writable_, readable)
    mojo::CreateDataPipe(&data_pipe_options, writable, impl_->readable_)

// mojo/public/cpp/system/data_pipe.h
mojo::CreateDataPipe
// mojo/public/c/system/thunks.cc
MojoCreateDataPipe
// mojo/core/core_ipcz.cc
MojoCreateDataPipeIpcz
// mojo/core/ipcz_driver/data_pipe.cc
mojo::core::ipcz_driver::DataPipe::CreatePair

前端页面的 websocket 发送请求会拆分成以 131000 字节为单位的帧进行发送。

send

// third_party/blink/renderer/modules/websockets/dom_websocket.cc

DOMWebSocket::send(const String& message)

// gen/services/network/public/mojom/websocket.mojom.cc

network::mojom::WebSocketStubDispatch::Accept
    // network::WebSocket::SendMessage
    impl->SendMessage

// services/network/websocket.cc

network::WebSocket::SendMessage
    pending_send_data_frames_.emplace(type, data_length, do_not_fragment);
    ReadAndSendFromDataPipe(InterruptionReason::kNone);

network::WebSocket::ReadAndSendFromDataPipe
    while (!pending_send_data_frames_.empty()) {
        DataFrame& data_frame = pending_send_data_frames_.front();
        // 拆分成以 131000 字节为单位的帧进行发送
        ReadAndSendFrameFromDataPipe(&data_frame)
        pending_send_data_frames_.pop();
    }

network::WebSocket::ReadAndSendFrameFromDataPipe
    while (true) {
        // mojo::DataPipeConsumerHandle::BeginReadData
        readable_->BeginReadData(&buffer, &readable_size)

        // net::WebSocketChannel::SendFrame
        channel_->SendFrame

        // mojo::DataPipeConsumerHandle::BeginReadData
        readable_->EndReadData

        if (size_to_send == data_frame->data_length) {
            return true;
        }

        data_frame->type = mojom::WebSocketMessageType::CONTINUATION;
        data_frame->data_length -= size_to_send;
    }

// mojo/public/cpp/system/data_pipe.h
mojo::DataPipeConsumerHandle::BeginReadData
// mojo/public/c/system/thunks.cc
MojoBeginReadData
// mojo/core/core_ipcz.cc
MojoBeginReadDataIpcz
// mojo/core/ipcz_driver/data_pipe.cc
mojo::core::ipcz_driver::DataPipe::BeginReadData

log

❯ ./chrome --incognito --disable-extensions --disable-plugins --single-process --vmodule='*/websocket*/*=10,websocket*=10,web_socket*=10'

connect

ws = new WebSocket('ws://localhost:3000/api/xterm')
[174115:174198:1211/152229.513298:VERBOSE1:dom_websocket.cc(185)] DOMWebSocket 0x1377009d4188 created
[174115:174198:1211/152229.513344:VERBOSE1:dom_websocket.cc(251)] WebSocket 0x1377009d4188 connect() url="ws://localhost:3000/api/xterm"
[174115:174198:1211/152229.513427:VERBOSE1:websocket_channel_impl.cc(272)] WebSocketChannelImpl 0x1377009d4290 Connect()
[174115:174153:1211/152229.513824:VERBOSE3:websocket.cc(610)] WebSocket::AddChannel @0x240804a38280 socket_url="ws://localhost:3000/api/xterm" requested_protocols="" origin="chrome://newtab" site_for_cookies="SiteForCookies: {site=chrome://newtab; schemefully_same=true}" has_storage_access=0
[174115:174153:1211/152229.513845:VERBOSE1:websocket.cc(183)] WebSocketEventHandler created @0x240800c70d80
[174115:174153:1211/152229.514303:VERBOSE3:websocket_endpoint_lock_manager.cc(62)] Locking endpoint [::1]:3000
[174115:174153:1211/152229.514409:VERBOSE3:websocket_endpoint_lock_manager.cc(112)] Registered (LockReleaser*)0x2408047bf6e8 for [::1]:3000
[174115:174153:1211/152229.514725:VERBOSE3:websocket.cc(329)] WebSocketEventHandler::OnStartOpeningHandshake @0x240800c70d80 can_read_raw_cookies =0
[174115:174153:1211/152229.515519:VERBOSE3:websocket_stream.cc(396)] OnResponseStarted (response code 101)
[174115:174153:1211/152229.515551:VERBOSE3:websocket_endpoint_lock_manager.cc(118)] Delaying 10ms before unlocking endpoint [::1]:3000
[174115:174153:1211/152229.515579:VERBOSE3:websocket.cc(206)] WebSocketEventHandler::OnAddChannelResponse @0x240800c70d80 selected_protocol="" extensions=""
[174115:174153:1211/152229.526088:VERBOSE3:websocket_endpoint_lock_manager.cc(139)] Unlocking endpoint [::1]:3000
[174115:174198:1211/152229.528209:VERBOSE1:websocket_channel_impl.cc(555)] WebSocketChannelImpl 0x1377009d4290 OnOpeningHandshakeStarted("ws://localhost:3000/api/xterm")
[174115:174198:1211/152229.528428:VERBOSE1:websocket_channel_impl.cc(584)] WebSocketChannelImpl 0x1377009d4290 OnConnectionEstablished("", "")
[174115:174198:1211/152229.528523:VERBOSE1:dom_websocket.cc(520)] WebSocket 0x1377009d4188 DidConnect()

text data

ws.send('x'.repeat(262003))
[174115:174198:1211/152351.330242:VERBOSE1:dom_websocket.cc(323)] WebSocket 0x1377009d4188 send() Sending String "xxx...") (std::string argument)
[174115:174198:1211/152351.712075:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(131000)
[174115:174153:1211/152351.712086:VERBOSE3:websocket.cc(494)] WebSocket::SendMessage @0x240804a38280 type=TEXT data is 262003 bytes
[174115:174153:1211/152351.712107:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(TEXT, (data_length = 262003))
[174115:174153:1211/152351.712341:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 131003))
[174115:174198:1211/152351.730944:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 262003 => 131003
[174115:174198:1211/152351.731032:VERBOSE2:websocket_channel_impl.cc(1130)] WebSocketChannelImpl 0x1377009d4290 OnWritable mojo_result=0
[174115:174198:1211/152351.731076:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(131000)
[174115:174153:1211/152351.731087:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 131003))
[174115:174153:1211/152351.731263:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 3))
[174115:174198:1211/152351.744237:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 131003 => 3
[174115:174198:1211/152351.744319:VERBOSE2:websocket_channel_impl.cc(1130)] WebSocketChannelImpl 0x1377009d4290 OnWritable mojo_result=0
[174115:174198:1211/152351.744342:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(3)
[174115:174153:1211/152351.744376:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 3))
[174115:174198:1211/152351.749214:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 3 => 0

binary data

ws.send(new TextEncoder().encode('x'.repeat(262003)))
[174115:174198:1211/152444.831129:VERBOSE1:dom_websocket.cc(367)] WebSocket 0x1377009d4188 send() Sending ArrayBufferView 0x137700f2eaf8
[174115:174198:1211/152444.831188:VERBOSE1:websocket_channel_impl.cc(439)] WebSocketChannelImpl 0x1377009d4290 Send(0x10ec00088000, 0, 262003) (DOMArrayBuffer argument)
[174115:174198:1211/152444.831819:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(131000)
[174115:174153:1211/152444.831869:VERBOSE3:websocket.cc(494)] WebSocket::SendMessage @0x240804a38280 type={BINARY, LAST} data is 262003 bytes
[174115:174153:1211/152444.831888:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=({BINARY, LAST}, (data_length = 262003))
[174115:174153:1211/152444.832133:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 131003))
[174115:174153:1211/152444.832151:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 131003))
[174115:174198:1211/152444.852354:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 262003 => 131003
[174115:174198:1211/152444.852437:VERBOSE2:websocket_channel_impl.cc(1130)] WebSocketChannelImpl 0x1377009d4290 OnWritable mojo_result=0
[174115:174198:1211/152444.852471:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(131000)
[174115:174153:1211/152444.852495:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 131003))
[174115:174153:1211/152444.852718:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 3))
[174115:174153:1211/152444.852745:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 3))
[174115:174198:1211/152444.856171:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 131003 => 3
[174115:174198:1211/152444.856221:VERBOSE2:websocket_channel_impl.cc(1130)] WebSocketChannelImpl 0x1377009d4290 OnWritable mojo_result=0
[174115:174198:1211/152444.856238:VERBOSE1:dom_websocket.cc(588)] WebSocket 0x1377009d4188 DidConsumeBufferedAmount(3)
[174115:174153:1211/152444.856255:VERBOSE2:websocket.cc(755)]  ConsumePendingDataFrame frame=(CONTINUATION, (data_length = 3))
[174115:174198:1211/152444.856294:VERBOSE1:dom_websocket.cc(307)] WebSocket 0x1377009d4188 reflectBufferedAmountConsumption() 3 => 0

pcap

text

131000

ac6e0f9e057ac8f157ebb7a8d4c39fbb.png

131003

0a492b24573cae3fd498fa5df0571d5e.png

binary

131000

8f89b1f7c806f8c075da75baadb4f3b6.png

131003

be0c67f2ceb97257f7e4fe111a1a3558.png

2abf931fb8c893f7489f38abe3a59c6e.png

appendix

V8 doesn’t actually have an implementation of WebSockets, this is actually in WebKit / Chromium.

WebSockets in V8
https://groups.google.com/g/nodejs/c/hdHED5hR7lI


最后修改于 2023-12-13