一分钟快速搭建 rtmpd 服务器: https://blog.csdn.net/freeabc/article/details/102880984
软件下载地址: http://www.qiyicc.com/download/rtmpd.rar
github 地址:https://github.com/superconvert/smart_rtmpd
-----------------------------------------------------------------------------------------------------------------------------------------
webrtc 中有关 socket 运行机制以及 stun 收发过程 及 Candidates 生成流程分析
我写文章一般是两个思路:
1. 下一步要调用什么对象的方法
2. 这一步的对象,怎么关联到下一步的对象的流程分析
这一步的流程主要阐述怎么关联下一步的对象的流程分析,当然这一步做了什么具体的工作,不能
详细展示,否则,太庞大了,需要各位朋友针对重点的部分,自己揣摩了。
//*******************************************************************************************
//
// webrtc 内部很多创建 socket 的地方,这个需要调用类厂 BasicPacketSocketFactory , 下面
// 这一小段就是分析 BasicPacketSocketFactory 的创建,以及内部管理的 socket 的部分流程
//
//*******************************************************************************************
AsyncPacketSocket* BasicPacketSocketFactory::CreateUdpSocket(const SocketAddress& address, uint16_t min_port, uint16_t max_port) { // 参见下面的 SocketDispatcher AsyncSocket* socket = socket_factory()->CreateAsyncSocket(address.family(), SOCK_DGRAM); if (!socket) { return NULL; } // 这个 BindSocket 最终会调用系统的 bind if (BindSocket(socket, address, min_port, max_port) < 0) { RTC_LOG(LS_ERROR) << "UDP bind failed with error " << socket->GetError(); delete socket; return NULL; } // 这个里面绑定了读和写事件到 AsyncUDPSocket::OnReadEvent , AsyncUDPSocket::OnWriteEvent return new AsyncUDPSocket(socket);}
1. 创建 BasicPacketSocketFactory
./pc/peer_connection_factory.cc
BasicPacketSocketFactory 是 PeerConnectionFactory::Initialize() 中创建的
default_socket_factory_.reset(new rtc::BasicPacketSocketFactory(network_thread_));
2.
./sdk/android/src/jni/pc/peer_connection_factory.cc
而 network_thread_ 则是 接口 CreatePeerConnectionFactoryForJava 里的
std::unique_ptr<rtc::Thread> network_thread = rtc::Thread::CreateWithSocketServer();
其实就是这个
std::unique_ptr<Thread> Thread::CreateWithSocketServer() {return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));}
其实就是创建了 PhysicalSocketServer
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {#if defined(__native_client__) return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); #else return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); #endif}
Thread 继承于 class Thread : public MessageQueue, public webrtc::TaskQueueBase ,我们看出 Thread 拥有消息队列的对象
构造函数 Thread(SocketServer* ss)把 ss 赋值给基类 MessageQueue,基类接口通过 socketserver 返回这个对象,后续只要
调用接口 socketserver 的,就是返回这个 socket 对象。
SocketServer* MessageQueue::socketserver() {return ss_;}
上面的 BasicPacketSocketFactory::CreateUdpSocket 里的,这句话 socket_factory()->CreateAsyncSocket 其实就是调用
./rtc_base/physical_socket_server.cc
AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { SocketDispatcher* dispatcher = new SocketDispatcher(this); // 这个里面通过 PhysicalSocket::Create 创建一个套接字 if (dispatcher->Create(family, type)) {return dispatcher; } else { delete dispatcher; return nullptr; }}
//******************************************************************************
//
// 下面这段是讲述 socket 怎么接收数据的,和上述流程没任何关系
//
//******************************************************************************
上述流程中,有一个这个函数调用,
std::unique_ptr<Thread> Thread::CreateWithSocketServer() {return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));}
创建一个带线程的 socket 这个线程的 Run 如下:
void Thread::Run() {ProcessMessages(kForever);}
// 这个里面不断的 Get 最新的 message 进行处理
bool Thread::ProcessMessages(int cmsLoop) {// Using ProcessMessages with a custom clock for testing and a time greater // than 0 doesn't work, since it's not guaranteed to advance the custom // clock's time, and may get stuck in an infinite loop. RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || cmsLoop == kForever); int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); int cmsNext = cmsLoop; while (true) { #if defined(WEBRTC_MAC) ScopedAutoReleasePool pool; #endif Message msg; if (!Get(&msg, cmsNext)) return !IsQuitting(); Dispatch(&msg); if (cmsLoop != kForever) { cmsNext = static_cast<int>(TimeUntil(msEnd)); if (cmsNext < 0) return true; } }}
// 其实就是基类的 MessageQueue 的接口
bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io)// 看到这个 ss_ 了吗,就是 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
这个地方监听所有的 socket 操作,三个版本的都有 win, linux,随便找一个分析
./rtc_base/physical_socket_server.cc
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io)return WaitEpoll(cmsWait, signal_wakeup_);bool PhysicalSocketServer::WaitEpoll(int cmsWait)ProcessEvents(pdispatcher, readable, writable, check_error);static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable, bool check_error) // 这里就是 SocketDispatcher - dispatcher->OnEvent(ff, errcode);void SocketDispatcher::OnEvent(uint32_t ff, int err) // 如果是读,这里假设是 UDP SignalReadEvent(this);
./rtc_base/async_udp_socket.cc
void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket)SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr, (timestamp > -1 ? timestamp : TimeMicros()));
./p2p/base/stun_port.cc
void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,const char* data, size_t size, const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us) { RTC_DCHECK(socket == socket_); RTC_DCHECK(!remote_addr.IsUnresolvedIP()); // Look for a response from the STUN server. // Even if the response doesn't match one of our outstanding requests, we // will eat it because it might be a response to a retransmitted packet, and // we already cleared the request when we got the first response. if (server_addresses_.find(remote_addr) != server_addresses_.end()) { // 这是 stun 阶段接收包 requests_.CheckResponse(data, size); return; } // 这是建立链接后接收包,参考 webrtc 的视频数据接收过程 if (Connection* conn = GetConnection(remote_addr)) { conn->OnReadPacket(data, size, packet_time_us); } else { Port::OnReadPacket(data, size, remote_addr, PROTO_UDP); }}
//******************************************************************************
//
// 下面就分析了有关 webrtc stun 流程的部分
//
//******************************************************************************
1. 从这里开始分析,这个的调用参考 createPeerConnection 流程
JsepTransportController::MaybeStartGathering
2. 这个 ice_transport 就是 P2PTransportChannel 对象
dtls->ice_transport()->MaybeStartGathering();
3. 第一次创建流程
./p2p/base/p2p_transport_channel.cc
P2PTransportChannel::MaybeStartGathering //------------------------------------------------------------ // 这个就是创建一个 PortAllocatorSession 并把信号挂接 P2PTransportChannel //------------------------------------------------------------ AddAllocatorSession(allocator_->CreateSession( transport_name(), component(), ice_parameters_.ufrag, ice_parameters_.pwd)); // 进行 PortAllocatorSession 接口的调用 allocator_sessions_.back()->StartGettingPorts();
3.1
这个 allocator_ 来自下面的函数调用,我们看出 就是 JsepTransportController 的成员 port_allocator_
./pc/jsep_transport_controller.cc
rtc::scoped_refptr<webrtc::IceTransportInterface>JsepTransportController::CreateIceTransport(const std::string& transport_name, bool rtcp) {int component = rtcp ? cricket::ICE_CANDIDATE_COMPONENT_RTCP :cricket::ICE_CANDIDATE_COMPONENT_RTP; IceTransportInit init; init.set_port_allocator(port_allocator_); init.set_async_resolver_factory(async_resolver_factory_); init.set_event_log(config_.event_log); return config_.ice_transport_factory->CreateIceTransport(transport_name, component, std::move(init));}
./api/ice_transport_factory.cc
rtc::scoped_refptr<IceTransportInterface> CreateIceTransport(IceTransportInit init) {return new rtc::RefCountedObject<IceTransportWithTransportChannel>( std::make_unique<cricket::P2PTransportChannel>( "", 0, init.port_allocator(), init.async_resolver_factory(), init.event_log()));}
3.2
我们跟踪一下 port_allocator_ 是在 JsepTransportController 初始化过程中传递过来的,我们分析 JsepTransportController
初始化,发现其实就是来自 PeerConnection 的 port_allocator_ 对象
./pc/peer_connection.cc
bool PeerConnection::Initialize(const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies) // 传递过来的。。。。。。 port_allocator_ = std::move(dependencies.allocator); ... ... // 赋值给 JsepTransportController transport_controller_.reset(new JsepTransportController( signaling_thread(), network_thread(), port_allocator_.get(), async_resolver_factory_.get(), config));
3.3
我们分析 PeerConnection 的初始化过程中, port_allocator_ 的产生过程
./pc/peer_connection_factory.cc
rtc::scoped_refptr<PeerConnectionInterface>PeerConnectionFactory::CreatePeerConnection(const PeerConnectionInterface::RTCConfiguration& configuration, PeerConnectionDependencies dependencies) if (!dependencies.allocator) { rtc::PacketSocketFactory* packet_socket_factory; if (dependencies.packet_socket_factory) packet_socket_factory = dependencies.packet_socket_factory.get(); else // 这个就是 BasicPacketSocketFactory 参见上面的分析 packet_socket_factory = default_socket_factory_.get(); network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &configuration, &dependencies, &packet_socket_factory]() { //------------------------------------------------------ // 这个就是我们要追踪的 port_allocator_ !!!!!!!!!!!! //------------------------------------------------------ dependencies.allocator = std::make_unique<cricket::BasicPortAllocator>( default_network_manager_.get(), packet_socket_factory, configuration.turn_customizer); }); } rtc::scoped_refptr<PeerConnection> pc(new rtc::RefCountedObject<PeerConnection>(this, std::move(event_log), std::move(call))); ActionsBeforeInitializeForTesting(pc); if (!pc->Initialize(configuration, std::move(dependencies))) { return nullptr; }
上述函数被下面这个调用,我们发现这个里面 dependencies.allocator 为空,因此 port_allocator_ 是在上面的步骤中分配的
./sdk/android/src/jni/pc/peer_connection_factory.cc
static jlong JNI_PeerConnectionFactory_CreatePeerConnection(JNIEnv* jni, jlong factory, const JavaParamRef<jobject>& j_rtc_config, const JavaParamRef<jobject>& j_constraints, jlong observer_p, const JavaParamRef<jobject>& j_sslCertificateVerifier) PeerConnectionDependencies peer_connection_dependencies(observer.get()); if (!j_sslCertificateVerifier.is_null()) { peer_connection_dependencies.tls_cert_verifier = std::make_unique<SSLCertificateVerifierWrapper>( jni, j_sslCertificateVerifier); } rtc::scoped_refptr<PeerConnectionInterface> pc = PeerConnectionFactoryFromJava(factory)->CreatePeerConnection(rtc_config, std::move(peer_connection_dependencies));
3.4 我们继续分析 BasicPortAllocator 的接口 CreateSession
./p2p/base/port_allocator.cc
std::unique_ptr<PortAllocatorSession> PortAllocator::CreateSession(const std::string& content_name, int component, const std::string& ice_ufrag, const std::string& ice_pwd) { CheckRunOnValidThreadAndInitialized(); auto session = std::unique_ptr<PortAllocatorSession>( CreateSessionInternal(content_name, component, ice_ufrag, ice_pwd)); session->SetCandidateFilter(candidate_filter()); return session;}
./p2p/client/basic_port_allocator.cc
PortAllocatorSession* BasicPortAllocator::CreateSessionInternal(const std::string& content_name,int component, const std::string& ice_ufrag, const std::string& ice_pwd) { CheckRunOnValidThreadAndInitialized(); PortAllocatorSession* session = new BasicPortAllocatorSession(this, content_name, component, ice_ufrag, ice_pwd); session->SignalIceRegathering.connect(this, &BasicPortAllocator::OnIceRegathering); return session;}
4.
./p2p/client/basic_port_allocator.cc
void BasicPortAllocatorSession::StartGettingPorts() { RTC_DCHECK_RUN_ON(network_thread_); state_ = SessionState::GATHERING; if (!socket_factory_) { owned_socket_factory_.reset( new rtc::BasicPacketSocketFactory(network_thread_)); socket_factory_ = owned_socket_factory_.get(); } network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START); RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy " << turn_port_prune_policy_;}
5.
void BasicPortAllocatorSession::OnMessage(rtc::Message* message) { switch (message->message_id) { case MSG_CONFIG_START: GetPortConfigurations(); break; case MSG_CONFIG_READY: OnConfigReady(static_cast<PortConfiguration*>(message->pdata)); break; case MSG_ALLOCATE: OnAllocate(); break; case MSG_SEQUENCEOBJECTS_CREATED: OnAllocationSequenceObjectsCreated(); break; case MSG_CONFIG_STOP: OnConfigStop(); break; default: RTC_NOTREACHED(); }}
void BasicPortAllocatorSession::GetPortConfigurations() { RTC_DCHECK_RUN_ON(network_thread_); PortConfiguration* config = new PortConfiguration(allocator_->stun_servers(), username(), password()); for (const RelayServerConfig& turn_server : allocator_->turn_servers()) { config->AddRelay(turn_server); } ConfigReady(config);}
void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) { RTC_DCHECK_RUN_ON(network_thread_); network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config);}
6.
void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) { RTC_DCHECK_RUN_ON(network_thread_); if (config) { configs_.push_back(config); } AllocatePorts();}
void BasicPortAllocatorSession::AllocatePorts() { RTC_DCHECK_RUN_ON(network_thread_); network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE);}
7.
void BasicPortAllocatorSession::OnAllocate() { RTC_DCHECK_RUN_ON(network_thread_); if (network_manager_started_ && !IsStopped()) { bool disable_equivalent_phases = true; DoAllocate(disable_equivalent_phases); } allocation_started_ = true;}
void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent) AllocationSequence* sequence = new AllocationSequence(this, networks[i], config, sequence_flags); sequence->SignalPortAllocationComplete.connect(this, &BasicPortAllocatorSession::OnPortAllocationComplete); sequence->Init(); sequence->Start(); sequences_.push_back(sequence); done_signal_needed = true; network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED);
7.1 sequence->Init() 这个里面创建了一个 UDP 套接字,并绑定读取接口
./p2p/client/basic_port_allocator.cc
void AllocationSequence::Init() { if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) { udp_socket_.reset(session_->socket_factory()->CreateUdpSocket( rtc::SocketAddress(network_->GetBestIP(), 0), session_->allocator()->min_port(), session_->allocator()->max_port())); if (udp_socket_) { udp_socket_->SignalReadPacket.connect(this, &AllocationSequence::OnReadPacket); } // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP // are next available options to setup a communication channel. } }
7.2 sequence->Start()
session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
7.3
void AllocationSequence::OnMessage(rtc::Message* msg) { const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"}; // Perform all of the phases in the current step. RTC_LOG(LS_INFO) << network_->ToString() << ": Allocation Phase=" << PHASE_NAMES[phase_]; switch (phase_) { case PHASE_UDP: CreateUDPPorts(); CreateStunPorts(); break; case PHASE_RELAY: CreateRelayPorts(); break; case PHASE_TCP: CreateTCPPorts(); state_ = kCompleted; break; default: RTC_NOTREACHED(); } if (state() == kRunning) { ++phase_; session_->network_thread()->PostDelayed(RTC_FROM_HERE, session_->allocator()->step_delay(), this, MSG_ALLOCATION_PHASE); } else { // If all phases in AllocationSequence are completed, no allocation // steps needed further. Canceling pending signal. session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE); SignalPortAllocationComplete(this); } }
7.4
void AllocationSequence::CreateUDPPorts() // 把上述创建的 udp_socket_ port = UDPPort::Create( session_->network_thread(), session_->socket_factory(), network_, session_->allocator()->min_port(), session_->allocator()->max_port(), session_->username(), session_->password(), session_->allocator()->origin(), emit_local_candidate_for_anyaddress, session_->allocator()->stun_candidate_keepalive_interval()); //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ // 参见下面的 AddAllocatedPort 主要是 OnCandidateReady, OnPortComplete //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ session_->AddAllocatedPort(port.release(), this, true);
7.4.1
./p2p/base/stun_port.h
static std::unique_ptr<UDPPort> Create( rtc::Thread* thread, rtc::PacketSocketFactory* factory, rtc::Network* network, rtc::AsyncPacketSocket* socket, const std::string& username, const std::string& password, const std::string& origin, bool emit_local_for_anyaddress, absl::optional<int> stun_keepalive_interval) { // Using `new` to access a non-public constructor. auto port = absl::WrapUnique(new UDPPort(thread, factory, network, socket, username, password, origin, emit_local_for_anyaddress)); port->set_stun_keepalive_delay(stun_keepalive_interval); if (!port->Init()) { return nullptr; } return port; }
7.4.2
bool UDPPort::Init() stun_keepalive_lifetime_ = GetStunKeepaliveLifetime(); if (!SharedSocket()) { RTC_DCHECK(socket_ == nullptr); //--------------------------------------------------------------------------------- // 这里的 socket_factory 其实就是上面的 BasicPacketSocketFactory 的接口,创建 socket 并绑定 //--------------------------------------------------------------------------------- socket_ = socket_factory()->CreateUdpSocket( rtc::SocketAddress(Network()->GetBestIP(), 0), min_port(), max_port()); if (!socket_) { RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed"; return false; } socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacket); } socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket); socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend); socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady); requests_.SignalSendPacket.connect(this, &UDPPort::OnSendPacket);
到这一步,我们看到 socket 接收数据 pipeline ( 参见下面流程 7.7.1 --> 7.7.7 ) 已经建立
7.5
void BasicPortAllocatorSession::AddAllocatedPort(Port* port, AllocationSequence* seq, bool prepare_address) { if (!port) return; port->set_content_name(content_name()); port->set_component(component()); port->set_generation(generation()); if (allocator_->proxy().type != rtc::PROXY_NONE) port->set_proxy(allocator_->user_agent(), allocator_->proxy()); port->set_send_retransmit_count_attribute( (flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0); PortData data(port, seq); ports_.push_back(data); port->SignalCandidateReady.connect( this, &BasicPortAllocatorSession::OnCandidateReady); port->SignalCandidateError.connect( this, &BasicPortAllocatorSession::OnCandidateError); port->SignalPortComplete.connect(this, &BasicPortAllocatorSession::OnPortComplete); port->SignalDestroyed.connect(this, &BasicPortAllocatorSession::OnPortDestroyed); port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError); RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator"; if (prepare_address) port->PrepareAddress(); }
7.6
void UDPPort::PrepareAddress() { if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) { OnLocalAddressReady(socket_, socket_->GetLocalAddress()); } } void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket, const rtc::SocketAddress& address) { // When adapter enumeration is disabled and binding to the any address, the // default local address will be issued as a candidate instead if // |emit_local_for_anyaddress| is true. This is to allow connectivity for // applications which absolutely requires a HOST candidate. rtc::SocketAddress addr = address; // If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at // least the port is listening. MaybeSetDefaultLocalAddress(&addr); AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "", LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false); MaybePrepareStunCandidate(); } void UDPPort::MaybePrepareStunCandidate() { // Sending binding request to the STUN server if address is available to // prepare STUN candidate. if (!server_addresses_.empty()) { SendStunBindingRequests(); } else { // Port is done allocating candidates. MaybeSetPortCompleteOrError(); } }
7.7 这个地方发送 stun 的绑定命令到 stun 服务器
void UDPPort::SendStunBindingRequests() { // We will keep pinging the stun server to make sure our NAT pin-hole stays // open until the deadline (specified in SendStunBindingRequest). RTC_DCHECK(requests_.empty()); for (ServerAddresses::const_iterator it = server_addresses_.begin(); it != server_addresses_.end(); ++it) { SendStunBindingRequest(*it); } } void UDPPort::SendStunBindingRequest(const rtc::SocketAddress& stun_addr) { if (stun_addr.IsUnresolvedIP()) { ResolveStunAddress(stun_addr); } else if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) { // Check if |server_addr_| is compatible with the port's ip. if (IsCompatibleAddress(stun_addr)) { // 发送 stun 绑定命令 requests_.Send(new StunBindingRequest(this, stun_addr, rtc::TimeMillis())); } else { // Since we can't send stun messages to the server, we should mark this // port ready. const char* reason = "STUN server address is incompatible."; RTC_LOG(LS_WARNING) << reason; OnStunBindingOrResolveRequestFailed(stun_addr, SERVER_NOT_REACHABLE_ERROR, reason); } } }
//------------------------------------------------------------------------------------------------------------------------------------------
// stun 服务器响应流程分析 ,顺便也分析了 socket 接收数据的一整个流程
// MessageQueue::Get 这个流程上面已经分析过了, 就是初始化启动的一个线程 network_thread_
//------------------------------------------------------------------------------------------------------------------------------------------
7.7.1 --- 这个 ss_ 就是上面的 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口
bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
7.7.2 --- 就以 linux 下的 epoll 网络模型进行分析吧,大体差不多
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) return WaitEpoll(cmsWait);
7.7.3
bool PhysicalSocketServer::WaitEpoll(int cmsWait) ProcessEvents(pdispatcher, readable, writable, check_error);
7.7.4
static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable, bool check_error) dispatcher->OnEvent(ff, errcode);
7.7.5 --- 这个绑定关系参见开篇的 new AsyncUDPSocket(socket)
./rtc_base/physical_socket_server.cc
void SocketDispatcher::OnEvent(uint32_t ff, int err) SignalReadEvent(this);
7.7.6 --- 这个的绑定关系参见上面的 7.4.2 UDPPort::Init()
./rtc_base/async_udp_socket.cc
void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket) SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr, (timestamp > -1 ? timestamp : TimeMicros()));
7.7.7 --- requests_ 其实就是对象 StunRequestManager
./p2p/base/stun_port.cc
void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket, const char* data, size_t size, const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us) requests_.CheckResponse(data, size);
7.7.8 --- 对应上面的 requests_.Send stun 的绑定命令
./p2p/base/stun_request.cc
bool StunRequestManager::CheckResponse(StunMessage* msg) RequestMap::iterator iter = requests_.find(msg->transaction_id()); StunRequest* request = iter->second; request->OnResponse(msg);
7.7.9
./p2p/base/stun_port.cc
void StunBindingRequest::OnResponse(StunMessage* response) port_->OnStunBindingRequestSucceeded(this->Elapsed(), server_addr_, addr);
7.8 --- stun 成功,则进入
./p2p/base/stun_port.cc
void UDPPort::OnStunBindingRequestSucceeded(int rtt_ms, const rtc::SocketAddress& stun_server_addr, const rtc::SocketAddress& stun_reflected_addr) AddAddress(stun_reflected_addr, socket_->GetLocalAddress(), related_address, UDP_PROTOCOL_NAME, "", "", STUN_PORT_TYPE, ICE_TYPE_PREFERENCE_SRFLX, 0, url.str(), false);
7.9
void Port::AddAddress(const rtc::SocketAddress& address, const rtc::SocketAddress& base_address, const rtc::SocketAddress& related_address, const std::string& protocol, const std::string& relay_protocol, const std::string& tcptype, const std::string& type, uint32_t type_preference, uint32_t relay_preference, const std::string& url, bool is_final) FinishAddingAddress(c, is_final);
7.10 --- SignalCandidateReady 的绑定关系见 7.5
void Port::FinishAddingAddress(const Candidate& c, bool is_final) { candidates_.push_back(c); SignalCandidateReady(this, c); PostAddAddress(is_final); }
7.11
./p2p/client/basic_port_allocator.cc
void BasicPortAllocatorSession::OnCandidateReady(Port* port, const Candidate& c) SignalPortReady(this, port); ---> ./p2p/base/p2p_transport_channel.cc:168: session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady); void P2PTransportChannel::OnPortReady(PortAllocatorSession* session, PortInterface* port) CreateConnection(port, *iter, iter->origin_port()); bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, PortInterface* origin_port) if (CreateConnection(origin_port, remote_candidate, origin_port)) bool P2PTransportChannel::CreateConnection(PortInterface* port, const Candidate& remote_candidate, PortInterface* origin_port) // 针对这个 port 创建一个 Connection,后续的数据接收都是通过这个进行了,表明 stun 已经成功 Connection* connection = port->CreateConnection(remote_candidate, origin); AddConnection(connection); void P2PTransportChannel::AddConnection(Connection* connection) connection->SignalReadPacket.connect(this, &P2PTransportChannel::OnReadPacket); SignalCandidatesReady(this, candidates);
7.12
./p2p/base/p2p_transport_channel.cc
void P2PTransportChannel::OnCandidatesReady( PortAllocatorSession* session, const std::vector<Candidate>& candidates) { RTC_DCHECK_RUN_ON(network_thread_); for (size_t i = 0; i < candidates.size(); ++i) { SignalCandidateGathered(this, candidates[i]); } }
7.13
./pc/jsep_transport_controller.cc
void JsepTransportController::OnTransportCandidateGathered_n( cricket::IceTransportInternal* transport, const cricket::Candidate& candidate) { RTC_DCHECK(network_thread_->IsCurrent()); // We should never signal peer-reflexive candidates. if (candidate.type() == cricket::PRFLX_PORT_TYPE) { RTC_NOTREACHED(); return; } std::string transport_name = transport->transport_name(); invoker_.AsyncInvoke<void>( RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] { SignalIceCandidatesGathered(transport_name, {candidate}); }); }
7.14
./pc/peer_connection.cc
void PeerConnection::OnTransportControllerCandidatesGathered( const std::string& transport_name, const cricket::Candidates& candidates) { int sdp_mline_index; if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) { RTC_LOG(LS_ERROR) << "OnTransportControllerCandidatesGathered: content name " << transport_name << " not found"; return; } for (cricket::Candidates::const_iterator citer = candidates.begin(); citer != candidates.end(); ++citer) { // Use transport_name as the candidate media id. std::unique_ptr<JsepIceCandidate> candidate( new JsepIceCandidate(transport_name, sdp_mline_index, *citer)); if (local_description()) { mutable_local_description()->AddCandidate(candidate.get()); } OnIceCandidate(std::move(candidate)); } } void PeerConnection::OnIceCandidate( std::unique_ptr<IceCandidateInterface> candidate) { if (IsClosed()) { return; } ReportIceCandidateCollected(candidate->candidate()); // 这个地方回调到 Java 层的接口,并把自己的 candidate 发送给对方 Observer()->OnIceCandidate(candidate.get()); }
8.
void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() { RTC_DCHECK_RUN_ON(network_thread_); allocation_sequences_created_ = true; // Send candidate allocation complete signal if we have no sequences. MaybeSignalCandidatesAllocationDone();}
./p2p/base/p2p_transport_channel.cc
void P2PTransportChannel::OnCandidatesAllocationDone( PortAllocatorSession* session) { RTC_DCHECK_RUN_ON(network_thread_); if (config_.gather_continually()) { RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name() << ", component " << component() << " gathering complete, but using continual " "gathering so not changing gathering state."; return; } gathering_state_ = kIceGatheringComplete; RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name() << ", component " << component() << " gathering complete"; SignalGatheringState(this);}
./pc/jsep_transport_controller.cc
void JsepTransportController::OnTransportGatheringState_n( cricket::IceTransportInternal* transport) { RTC_DCHECK(network_thread_->IsCurrent()); UpdateAggregateStates_n();}