webrtc中 socket运行机制以及 stun 收发过程及 Candidates生成流程分析

2020-12-17 09:04:17

参考地址 webrtc 中有关 socket 运行机制以及 stun 收发过程 及 Candidates 生成流程分析

一分钟快速搭建 rtmpd 服务器: https://blog.csdn.net/freeabc/article/details/102880984

软件下载地址: http://www.qiyicc.com/download/rtmpd.rar

github 地址:https://github.com/superconvert/smart_rtmpd

-----------------------------------------------------------------------------------------------------------------------------------------

webrtc 中有关 socket 运行机制以及 stun 收发过程 及 Candidates 生成流程分析

我写文章一般是两个思路:
1. 下一步要调用什么对象的方法
2.  这一步的对象,怎么关联到下一步的对象的流程分析
这一步的流程主要阐述怎么关联下一步的对象的流程分析,当然这一步做了什么具体的工作,不能
详细展示,否则,太庞大了,需要各位朋友针对重点的部分,自己揣摩了。

//*******************************************************************************************
//
// webrtc 内部很多创建 socket 的地方,这个需要调用类厂 BasicPacketSocketFactory , 下面
// 这一小段就是分析 BasicPacketSocketFactory 的创建,以及内部管理的 socket 的部分流程
//
//*******************************************************************************************

AsyncPacketSocket* BasicPacketSocketFactory::CreateUdpSocket(const SocketAddress& address,    uint16_t min_port,    uint16_t max_port) {   // 参见下面的 SocketDispatcher   AsyncSocket* socket =   socket_factory()->CreateAsyncSocket(address.family(), SOCK_DGRAM);    if (!socket) {        return NULL;    }    // 这个 BindSocket 最终会调用系统的 bind    if (BindSocket(socket, address, min_port, max_port) < 0) {        RTC_LOG(LS_ERROR) << "UDP bind failed with error " << socket->GetError();        delete socket;        return NULL;    }    // 这个里面绑定了读和写事件到 AsyncUDPSocket::OnReadEvent , AsyncUDPSocket::OnWriteEvent    return new AsyncUDPSocket(socket);}

    1. 创建 BasicPacketSocketFactory
    ./pc/peer_connection_factory.cc
    BasicPacketSocketFactory 是 PeerConnectionFactory::Initialize() 中创建的

default_socket_factory_.reset(new rtc::BasicPacketSocketFactory(network_thread_));

    2.
    ./sdk/android/src/jni/pc/peer_connection_factory.cc
    而 network_thread_ 则是 接口 CreatePeerConnectionFactoryForJava 里的
    std::unique_ptr<rtc::Thread> network_thread = rtc::Thread::CreateWithSocketServer();    
    其实就是这个

std::unique_ptr<Thread> Thread::CreateWithSocketServer() {return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));}

    其实就是创建了 PhysicalSocketServer

std::unique_ptr<SocketServer> SocketServer::CreateDefault() {#if defined(__native_client__)        return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);    #else        return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);    #endif}

   Thread 继承于 class Thread : public MessageQueue, public webrtc::TaskQueueBase ,我们看出 Thread 拥有消息队列的对象
    构造函数 Thread(SocketServer* ss)把 ss 赋值给基类 MessageQueue,基类接口通过 socketserver 返回这个对象,后续只要
    调用接口 socketserver 的,就是返回这个 socket 对象。

SocketServer* MessageQueue::socketserver() {return ss_;}

    上面的 BasicPacketSocketFactory::CreateUdpSocket 里的,这句话 socket_factory()->CreateAsyncSocket 其实就是调用 
    ./rtc_base/physical_socket_server.cc

AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {      SocketDispatcher* dispatcher = new SocketDispatcher(this);    // 这个里面通过 PhysicalSocket::Create 创建一个套接字    if (dispatcher->Create(family, type)) {return dispatcher;    } else {        delete dispatcher;        return nullptr;    }}

//******************************************************************************
//
// 下面这段是讲述 socket 怎么接收数据的,和上述流程没任何关系
//
//******************************************************************************

    上述流程中,有一个这个函数调用,

std::unique_ptr<Thread> Thread::CreateWithSocketServer() {return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault()));}

    创建一个带线程的 socket    这个线程的 Run 如下:

void Thread::Run() {ProcessMessages(kForever);}

    // 这个里面不断的 Get 最新的 message 进行处理

bool Thread::ProcessMessages(int cmsLoop) {// Using ProcessMessages with a custom clock for testing and a time greater    // than 0 doesn't work, since it's not guaranteed to advance the custom    // clock's time, and may get stuck in an infinite loop.    RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||        cmsLoop == kForever);    int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);    int cmsNext = cmsLoop;    while (true) {  #if defined(WEBRTC_MAC)      ScopedAutoReleasePool pool;  #endif      Message msg;      if (!Get(&msg, cmsNext))          return !IsQuitting();      Dispatch(&msg);      if (cmsLoop != kForever) {          cmsNext = static_cast<int>(TimeUntil(msEnd));          if (cmsNext < 0)            return true;      }    }}

   // 其实就是基类的 MessageQueue 的接口

bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io)// 看到这个 ss_ 了吗,就是 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口    if (!ss_->Wait(static_cast<int>(cmsNext), process_io))

    这个地方监听所有的 socket 操作,三个版本的都有 win, linux,随便找一个分析
    ./rtc_base/physical_socket_server.cc

bool PhysicalSocketServer::Wait(int cmsWait, bool process_io)return WaitEpoll(cmsWait, signal_wakeup_);bool PhysicalSocketServer::WaitEpoll(int cmsWait)ProcessEvents(pdispatcher, readable, writable, check_error);static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable, bool check_error) // 这里就是 SocketDispatcher -    dispatcher->OnEvent(ff, errcode);void SocketDispatcher::OnEvent(uint32_t ff, int err)    // 如果是读,这里假设是 UDP     SignalReadEvent(this);

    ./rtc_base/async_udp_socket.cc

void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket)SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,        (timestamp > -1 ? timestamp : TimeMicros()));

    ./p2p/base/stun_port.cc 

void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,const char* data,    size_t size,    const rtc::SocketAddress& remote_addr,    const int64_t& packet_time_us) {        RTC_DCHECK(socket == socket_);    RTC_DCHECK(!remote_addr.IsUnresolvedIP());    // Look for a response from the STUN server.    // Even if the response doesn't match one of our outstanding requests, we    // will eat it because it might be a response to a retransmitted packet, and    // we already cleared the request when we got the first response.            if (server_addresses_.find(remote_addr) != server_addresses_.end()) {        // 这是 stun 阶段接收包        requests_.CheckResponse(data, size);        return;    }    // 这是建立链接后接收包,参考 webrtc 的视频数据接收过程    if (Connection* conn = GetConnection(remote_addr)) {        conn->OnReadPacket(data, size, packet_time_us);    } else {        Port::OnReadPacket(data, size, remote_addr, PROTO_UDP);    }}

//******************************************************************************
//
// 下面就分析了有关 webrtc stun 流程的部分
//
//******************************************************************************
    
1. 从这里开始分析,这个的调用参考 createPeerConnection 流程
JsepTransportController::MaybeStartGathering

2. 这个 ice_transport 就是 P2PTransportChannel 对象
dtls->ice_transport()->MaybeStartGathering();

3. 第一次创建流程
./p2p/base/p2p_transport_channel.cc

P2PTransportChannel::MaybeStartGathering     //------------------------------------------------------------    // 这个就是创建一个 PortAllocatorSession 并把信号挂接 P2PTransportChannel    //------------------------------------------------------------    AddAllocatorSession(allocator_->CreateSession(          transport_name(), component(), ice_parameters_.ufrag,          ice_parameters_.pwd));    // 进行 PortAllocatorSession 接口的调用    allocator_sessions_.back()->StartGettingPorts();

 3.1
    这个 allocator_ 来自下面的函数调用,我们看出 就是 JsepTransportController 的成员 port_allocator_
    ./pc/jsep_transport_controller.cc

rtc::scoped_refptr<webrtc::IceTransportInterface>JsepTransportController::CreateIceTransport(const std::string& transport_name, bool rtcp) {int component = rtcp ? cricket::ICE_CANDIDATE_COMPONENT_RTCP :cricket::ICE_CANDIDATE_COMPONENT_RTP;        IceTransportInit init;    init.set_port_allocator(port_allocator_);    init.set_async_resolver_factory(async_resolver_factory_);    init.set_event_log(config_.event_log);    return config_.ice_transport_factory->CreateIceTransport(transport_name, component, std::move(init));}

./api/ice_transport_factory.cc

rtc::scoped_refptr<IceTransportInterface> CreateIceTransport(IceTransportInit init) {return new rtc::RefCountedObject<IceTransportWithTransportChannel>(        std::make_unique<cricket::P2PTransportChannel>(        "", 0, init.port_allocator(), init.async_resolver_factory(), init.event_log()));}

    3.2
    我们跟踪一下 port_allocator_ 是在 JsepTransportController 初始化过程中传递过来的,我们分析 JsepTransportController 
    初始化,发现其实就是来自 PeerConnection 的 port_allocator_ 对象
    
    ./pc/peer_connection.cc

bool PeerConnection::Initialize(const PeerConnectionInterface::RTCConfiguration& configuration,        PeerConnectionDependencies dependencies)            // 传递过来的。。。。。。    port_allocator_ = std::move(dependencies.allocator);    ... ...    // 赋值给 JsepTransportController    transport_controller_.reset(new JsepTransportController(        signaling_thread(), network_thread(), port_allocator_.get(),        async_resolver_factory_.get(), config));

    3.3
    我们分析 PeerConnection 的初始化过程中, port_allocator_ 的产生过程
    ./pc/peer_connection_factory.cc

rtc::scoped_refptr<PeerConnectionInterface>PeerConnectionFactory::CreatePeerConnection(const PeerConnectionInterface::RTCConfiguration& configuration,    PeerConnectionDependencies dependencies)            if (!dependencies.allocator) {        rtc::PacketSocketFactory* packet_socket_factory;        if (dependencies.packet_socket_factory)            packet_socket_factory = dependencies.packet_socket_factory.get();        else            // 这个就是 BasicPacketSocketFactory 参见上面的分析            packet_socket_factory = default_socket_factory_.get();        network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &configuration,            &dependencies,            &packet_socket_factory]() {            //------------------------------------------------------            // 这个就是我们要追踪的 port_allocator_ !!!!!!!!!!!!            //------------------------------------------------------            dependencies.allocator = std::make_unique<cricket::BasicPortAllocator>(                default_network_manager_.get(), packet_socket_factory, configuration.turn_customizer);        });    }            rtc::scoped_refptr<PeerConnection> pc(new rtc::RefCountedObject<PeerConnection>(this, std::move(event_log),        std::move(call)));    ActionsBeforeInitializeForTesting(pc);    if (!pc->Initialize(configuration, std::move(dependencies))) {        return nullptr;    }

    上述函数被下面这个调用,我们发现这个里面 dependencies.allocator 为空,因此 port_allocator_ 是在上面的步骤中分配的
    ./sdk/android/src/jni/pc/peer_connection_factory.cc

static jlong JNI_PeerConnectionFactory_CreatePeerConnection(JNIEnv* jni,    jlong factory,    const JavaParamRef<jobject>& j_rtc_config,    const JavaParamRef<jobject>& j_constraints,    jlong observer_p,    const JavaParamRef<jobject>& j_sslCertificateVerifier)            PeerConnectionDependencies peer_connection_dependencies(observer.get());    if (!j_sslCertificateVerifier.is_null()) {        peer_connection_dependencies.tls_cert_verifier = std::make_unique<SSLCertificateVerifierWrapper>(        jni, j_sslCertificateVerifier);    }    rtc::scoped_refptr<PeerConnectionInterface> pc =        PeerConnectionFactoryFromJava(factory)->CreatePeerConnection(rtc_config,         std::move(peer_connection_dependencies));

 3.4 我们继续分析 BasicPortAllocator 的接口 CreateSession
    ./p2p/base/port_allocator.cc

std::unique_ptr<PortAllocatorSession> PortAllocator::CreateSession(const std::string& content_name,    int component,    const std::string& ice_ufrag,    const std::string& ice_pwd) {    CheckRunOnValidThreadAndInitialized();    auto session = std::unique_ptr<PortAllocatorSession>(        CreateSessionInternal(content_name, component, ice_ufrag, ice_pwd));    session->SetCandidateFilter(candidate_filter());    return session;}

   ./p2p/client/basic_port_allocator.cc    

PortAllocatorSession* BasicPortAllocator::CreateSessionInternal(const std::string& content_name,int component, const std::string& ice_ufrag, const std::string& ice_pwd) {    CheckRunOnValidThreadAndInitialized();    PortAllocatorSession* session = new BasicPortAllocatorSession(this, content_name, component, ice_ufrag, ice_pwd);    session->SignalIceRegathering.connect(this, &BasicPortAllocator::OnIceRegathering);    return session;}

4.
./p2p/client/basic_port_allocator.cc

void BasicPortAllocatorSession::StartGettingPorts() {  RTC_DCHECK_RUN_ON(network_thread_);    state_ = SessionState::GATHERING;  if (!socket_factory_) {      owned_socket_factory_.reset(          new rtc::BasicPacketSocketFactory(network_thread_));      socket_factory_ = owned_socket_factory_.get();    }  network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START);    RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "        << turn_port_prune_policy_;}

5.

void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {  switch (message->message_id) {    case MSG_CONFIG_START:      GetPortConfigurations();      break;    case MSG_CONFIG_READY:      OnConfigReady(static_cast<PortConfiguration*>(message->pdata));      break;    case MSG_ALLOCATE:      OnAllocate();      break;    case MSG_SEQUENCEOBJECTS_CREATED:      OnAllocationSequenceObjectsCreated();      break;    case MSG_CONFIG_STOP:      OnConfigStop();      break;    default:      RTC_NOTREACHED();  }}
void BasicPortAllocatorSession::GetPortConfigurations() {  RTC_DCHECK_RUN_ON(network_thread_);  PortConfiguration* config =      new PortConfiguration(allocator_->stun_servers(), username(), password());  for (const RelayServerConfig& turn_server : allocator_->turn_servers()) {    config->AddRelay(turn_server);  }  ConfigReady(config);}
void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {  RTC_DCHECK_RUN_ON(network_thread_);  network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_READY, config);}

6. 

void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) {  RTC_DCHECK_RUN_ON(network_thread_);  if (config) {    configs_.push_back(config);  }  AllocatePorts();}
void BasicPortAllocatorSession::AllocatePorts() {  RTC_DCHECK_RUN_ON(network_thread_);  network_thread_->Post(RTC_FROM_HERE, this, MSG_ALLOCATE);}

7.

void BasicPortAllocatorSession::OnAllocate() {  RTC_DCHECK_RUN_ON(network_thread_);  if (network_manager_started_ && !IsStopped()) {    bool disable_equivalent_phases = true;    DoAllocate(disable_equivalent_phases);  }  allocation_started_ = true;}
void BasicPortAllocatorSession::DoAllocate(bool disable_equivalent)    AllocationSequence* sequence = new AllocationSequence(this, networks[i], config, sequence_flags);    sequence->SignalPortAllocationComplete.connect(this, &BasicPortAllocatorSession::OnPortAllocationComplete);    sequence->Init();    sequence->Start();    sequences_.push_back(sequence);    done_signal_needed = true;        network_thread_->Post(RTC_FROM_HERE, this, MSG_SEQUENCEOBJECTS_CREATED);

    7.1 sequence->Init() 这个里面创建了一个 UDP 套接字,并绑定读取接口
    ./p2p/client/basic_port_allocator.cc

    void AllocationSequence::Init() {        if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {            udp_socket_.reset(session_->socket_factory()->CreateUdpSocket(            rtc::SocketAddress(network_->GetBestIP(), 0),            session_->allocator()->min_port(), session_->allocator()->max_port()));            if (udp_socket_) {                udp_socket_->SignalReadPacket.connect(this,  &AllocationSequence::OnReadPacket);            }            // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP            // are next available options to setup a communication channel.        }    }

   7.2 sequence->Start()
        session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
    
    7.3

    void AllocationSequence::OnMessage(rtc::Message* msg) {        const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};        // Perform all of the phases in the current step.        RTC_LOG(LS_INFO) << network_->ToString() << ": Allocation Phase=" << PHASE_NAMES[phase_];        switch (phase_) {            case PHASE_UDP:            CreateUDPPorts();            CreateStunPorts();            break;        case PHASE_RELAY:            CreateRelayPorts();            break;        case PHASE_TCP:            CreateTCPPorts();            state_ = kCompleted;            break;        default:            RTC_NOTREACHED();        }        if (state() == kRunning) {            ++phase_;            session_->network_thread()->PostDelayed(RTC_FROM_HERE,                session_->allocator()->step_delay(),                this, MSG_ALLOCATION_PHASE);        } else {            // If all phases in AllocationSequence are completed, no allocation            // steps needed further. Canceling  pending signal.            session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);            SignalPortAllocationComplete(this);        }    }

  7.4

    void AllocationSequence::CreateUDPPorts()        // 把上述创建的 udp_socket_         port = UDPPort::Create(        session_->network_thread(), session_->socket_factory(), network_,        session_->allocator()->min_port(), session_->allocator()->max_port(),        session_->username(), session_->password(),        session_->allocator()->origin(), emit_local_candidate_for_anyaddress,        session_->allocator()->stun_candidate_keepalive_interval());            //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++        // 参见下面的 AddAllocatedPort 主要是 OnCandidateReady, OnPortComplete        //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++        session_->AddAllocatedPort(port.release(), this, true);

        7.4.1
        ./p2p/base/stun_port.h

        static std::unique_ptr<UDPPort> Create(            rtc::Thread* thread, rtc::PacketSocketFactory* factory,            rtc::Network* network, rtc::AsyncPacketSocket* socket,            const std::string& username, const std::string& password,            const std::string& origin, bool emit_local_for_anyaddress,            absl::optional<int> stun_keepalive_interval) {            // Using `new` to access a non-public constructor.            auto port = absl::WrapUnique(new UDPPort(thread, factory, network, socket,                username, password, origin, emit_local_for_anyaddress));            port->set_stun_keepalive_delay(stun_keepalive_interval);            if (!port->Init()) {                return nullptr;            }            return port;        }


    
        7.4.2

        bool UDPPort::Init()            stun_keepalive_lifetime_ = GetStunKeepaliveLifetime();            if (!SharedSocket()) {                RTC_DCHECK(socket_ == nullptr);                //---------------------------------------------------------------------------------                // 这里的 socket_factory 其实就是上面的 BasicPacketSocketFactory 的接口,创建 socket 并绑定                //---------------------------------------------------------------------------------                socket_ = socket_factory()->CreateUdpSocket(                    rtc::SocketAddress(Network()->GetBestIP(), 0), min_port(), max_port());                if (!socket_) {                    RTC_LOG(LS_WARNING) << ToString() << ": UDP socket creation failed";                    return false;                }                socket_->SignalReadPacket.connect(this, &UDPPort::OnReadPacket);            }            socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket);            socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend);            socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady);            requests_.SignalSendPacket.connect(this, &UDPPort::OnSendPacket);

    到这一步,我们看到 socket 接收数据 pipeline (  参见下面流程 7.7.1 -->  7.7.7 ) 已经建立
        
    7.5

    void BasicPortAllocatorSession::AddAllocatedPort(Port* port,          AllocationSequence* seq, bool prepare_address) {        if (!port)            return;        port->set_content_name(content_name());        port->set_component(component());        port->set_generation(generation());        if (allocator_->proxy().type != rtc::PROXY_NONE)            port->set_proxy(allocator_->user_agent(), allocator_->proxy());        port->set_send_retransmit_count_attribute(            (flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);        PortData data(port, seq);        ports_.push_back(data);        port->SignalCandidateReady.connect(            this, &BasicPortAllocatorSession::OnCandidateReady);        port->SignalCandidateError.connect(            this, &BasicPortAllocatorSession::OnCandidateError);        port->SignalPortComplete.connect(this,            &BasicPortAllocatorSession::OnPortComplete);        port->SignalDestroyed.connect(this,            &BasicPortAllocatorSession::OnPortDestroyed);        port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);            RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";        if (prepare_address)            port->PrepareAddress();    }

    7.6

    void UDPPort::PrepareAddress() {        if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) {            OnLocalAddressReady(socket_, socket_->GetLocalAddress());        }    }        void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,                                  const rtc::SocketAddress& address) {        // When adapter enumeration is disabled and binding to the any address, the        // default local address will be issued as a candidate instead if        // |emit_local_for_anyaddress| is true. This is to allow connectivity for        // applications which absolutely requires a HOST candidate.        rtc::SocketAddress addr = address;        // If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at        // least the port is listening.        MaybeSetDefaultLocalAddress(&addr);        AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",                 LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);        MaybePrepareStunCandidate();    }        void UDPPort::MaybePrepareStunCandidate() {        // Sending binding request to the STUN server if address is available to        // prepare STUN candidate.        if (!server_addresses_.empty()) {            SendStunBindingRequests();        } else {            // Port is done allocating candidates.            MaybeSetPortCompleteOrError();        }    }

   7.7 这个地方发送 stun 的绑定命令到 stun 服务器

    void UDPPort::SendStunBindingRequests() {        // We will keep pinging the stun server to make sure our NAT pin-hole stays        // open until the deadline (specified in SendStunBindingRequest).        RTC_DCHECK(requests_.empty());        for (ServerAddresses::const_iterator it = server_addresses_.begin();            it != server_addresses_.end(); ++it) {            SendStunBindingRequest(*it);        }    }    void UDPPort::SendStunBindingRequest(const rtc::SocketAddress& stun_addr) {         if (stun_addr.IsUnresolvedIP()) {            ResolveStunAddress(stun_addr);         } else if (socket_->GetState() == rtc::AsyncPacketSocket::STATE_BOUND) {            // Check if |server_addr_| is compatible with the port's ip.            if (IsCompatibleAddress(stun_addr)) {                  // 发送 stun 绑定命令                  requests_.Send(new StunBindingRequest(this, stun_addr, rtc::TimeMillis()));            } else {                  // Since we can't send stun messages to the server, we should mark this                  // port ready.                  const char* reason = "STUN server address is incompatible.";                  RTC_LOG(LS_WARNING) << reason;                  OnStunBindingOrResolveRequestFailed(stun_addr, SERVER_NOT_REACHABLE_ERROR, reason);             }          }    }


    //------------------------------------------------------------------------------------------------------------------------------------------
    // stun 服务器响应流程分析 ,顺便也分析了 socket 接收数据的一整个流程
    // MessageQueue::Get 这个流程上面已经分析过了, 就是初始化启动的一个线程  network_thread_
    //------------------------------------------------------------------------------------------------------------------------------------------
    7.7.1 --- 这个 ss_ 就是上面的 SocketServer::CreateDefault() 也就是 PhysicalSocketServer::Wait 接口

    bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io)                if (!ss_->Wait(static_cast<int>(cmsNext), process_io))

    7.7.2  --- 就以 linux 下的 epoll 网络模型进行分析吧,大体差不多

    bool PhysicalSocketServer::Wait(int cmsWait, bool process_io)        return WaitEpoll(cmsWait);

    7.7.3

    bool PhysicalSocketServer::WaitEpoll(int cmsWait)        ProcessEvents(pdispatcher, readable, writable, check_error);

   7.7.4

    static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable, bool check_error)        dispatcher->OnEvent(ff, errcode);

    7.7.5  ---  这个绑定关系参见开篇的 new AsyncUDPSocket(socket)
    ./rtc_base/physical_socket_server.cc

    void SocketDispatcher::OnEvent(uint32_t ff, int err)        SignalReadEvent(this);

   7.7.6  ---  这个的绑定关系参见上面的 7.4.2 UDPPort::Init()
    ./rtc_base/async_udp_socket.cc

    void AsyncUDPSocket::OnReadEvent(AsyncSocket* socket)        SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr, (timestamp > -1 ? timestamp : TimeMicros()));


    7.7.7 ---  requests_ 其实就是对象 StunRequestManager
    ./p2p/base/stun_port.cc

    void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket, const char* data, size_t size,        const rtc::SocketAddress& remote_addr, const int64_t& packet_time_us)        requests_.CheckResponse(data, size);

    7.7.8  ---   对应上面的 requests_.Send stun 的绑定命令          
    ./p2p/base/stun_request.cc

    bool StunRequestManager::CheckResponse(StunMessage* msg)        RequestMap::iterator iter = requests_.find(msg->transaction_id());        StunRequest* request = iter->second;        request->OnResponse(msg);

    7.7.9
    ./p2p/base/stun_port.cc

    void StunBindingRequest::OnResponse(StunMessage* response)         port_->OnStunBindingRequestSucceeded(this->Elapsed(), server_addr_, addr);

    7.8  --- stun 成功,则进入
    ./p2p/base/stun_port.cc

    void UDPPort::OnStunBindingRequestSucceeded(int rtt_ms,        const rtc::SocketAddress& stun_server_addr,        const rtc::SocketAddress& stun_reflected_addr)                AddAddress(stun_reflected_addr, socket_->GetLocalAddress(), related_address,               UDP_PROTOCOL_NAME, "", "", STUN_PORT_TYPE,               ICE_TYPE_PREFERENCE_SRFLX, 0, url.str(), false);

    7.9        

    void Port::AddAddress(const rtc::SocketAddress& address,                      const rtc::SocketAddress& base_address,                      const rtc::SocketAddress& related_address,                      const std::string& protocol,                      const std::string& relay_protocol,                      const std::string& tcptype,                      const std::string& type,                      uint32_t type_preference,                      uint32_t relay_preference,                      const std::string& url, bool is_final)                                                FinishAddingAddress(c, is_final);

    7.10  ---  SignalCandidateReady 的绑定关系见 7.5 

    void Port::FinishAddingAddress(const Candidate& c, bool is_final) {        candidates_.push_back(c);        SignalCandidateReady(this, c);        PostAddAddress(is_final);    }

    7.11 
    ./p2p/client/basic_port_allocator.cc

    void BasicPortAllocatorSession::OnCandidateReady(Port* port, const Candidate& c)             SignalPortReady(this, port);  --->             ./p2p/base/p2p_transport_channel.cc:168:  session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);            void P2PTransportChannel::OnPortReady(PortAllocatorSession* session, PortInterface* port)                CreateConnection(port, *iter, iter->origin_port());            bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate, PortInterface* origin_port)                if (CreateConnection(origin_port, remote_candidate, origin_port))            bool P2PTransportChannel::CreateConnection(PortInterface* port, const Candidate& remote_candidate, PortInterface* origin_port)                // 针对这个 port 创建一个 Connection,后续的数据接收都是通过这个进行了,表明 stun 已经成功                Connection* connection = port->CreateConnection(remote_candidate, origin);                AddConnection(connection);            void P2PTransportChannel::AddConnection(Connection* connection)                connection->SignalReadPacket.connect(this, &P2PTransportChannel::OnReadPacket);                                           SignalCandidatesReady(this, candidates);

    7.12
    ./p2p/base/p2p_transport_channel.cc

    void P2PTransportChannel::OnCandidatesReady(        PortAllocatorSession* session,        const std::vector<Candidate>& candidates) {        RTC_DCHECK_RUN_ON(network_thread_);        for (size_t i = 0; i < candidates.size(); ++i) {            SignalCandidateGathered(this, candidates[i]);        }    }

    7.13
    ./pc/jsep_transport_controller.cc

    void JsepTransportController::OnTransportCandidateGathered_n(        cricket::IceTransportInternal* transport,        const cricket::Candidate& candidate) {        RTC_DCHECK(network_thread_->IsCurrent());        // We should never signal peer-reflexive candidates.        if (candidate.type() == cricket::PRFLX_PORT_TYPE) {            RTC_NOTREACHED();            return;        }        std::string transport_name = transport->transport_name();        invoker_.AsyncInvoke<void>(            RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {            SignalIceCandidatesGathered(transport_name, {candidate});        });    }

    7.14
    ./pc/peer_connection.cc

    void PeerConnection::OnTransportControllerCandidatesGathered(        const std::string& transport_name,        const cricket::Candidates& candidates) {        int sdp_mline_index;        if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) {            RTC_LOG(LS_ERROR)                << "OnTransportControllerCandidatesGathered: content name "                << transport_name << " not found";            return;        }        for (cricket::Candidates::const_iterator citer = candidates.begin();            citer != candidates.end(); ++citer) {            // Use transport_name as the candidate media id.            std::unique_ptr<JsepIceCandidate> candidate(                new JsepIceCandidate(transport_name, sdp_mline_index, *citer));            if (local_description()) {                mutable_local_description()->AddCandidate(candidate.get());            }            OnIceCandidate(std::move(candidate));        }    }        void PeerConnection::OnIceCandidate(        std::unique_ptr<IceCandidateInterface> candidate) {      if (IsClosed()) {        return;      }      ReportIceCandidateCollected(candidate->candidate());      // 这个地方回调到 Java 层的接口,并把自己的 candidate 发送给对方      Observer()->OnIceCandidate(candidate.get());    }

8.

void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() {  RTC_DCHECK_RUN_ON(network_thread_);  allocation_sequences_created_ = true;  // Send candidate allocation complete signal if we have no sequences.  MaybeSignalCandidatesAllocationDone();}

./p2p/base/p2p_transport_channel.cc

void P2PTransportChannel::OnCandidatesAllocationDone(    PortAllocatorSession* session) {  RTC_DCHECK_RUN_ON(network_thread_);  if (config_.gather_continually()) {    RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name()                     << ", component " << component()                     << " gathering complete, but using continual "                        "gathering so not changing gathering state.";    return;  }  gathering_state_ = kIceGatheringComplete;  RTC_LOG(LS_INFO) << "P2PTransportChannel: " << transport_name()                   << ", component " << component() << " gathering complete";  SignalGatheringState(this);}

./pc/jsep_transport_controller.cc

void JsepTransportController::OnTransportGatheringState_n(    cricket::IceTransportInternal* transport) {  RTC_DCHECK(network_thread_->IsCurrent());  UpdateAggregateStates_n();}

 


  • 2020-11-22 22:58:52

    Android LiveData Transformations

    有时候有这样的需求,需要在LiveData将变化的数据通知给观察者前,改变数据的类型;或者是返回一个不一样的LiveData。

  • 2020-11-22 23:00:16

    androidx中的lifecycle组件

    Lifecycle-aware components生命周期感知组件执行操作,以响应另一个组件生命周期状态的更改,例如Activity和Fragment。这些组件可以帮助您生成更有组织、更容易维护的轻量级代码。

  • 2020-11-22 23:02:50

    Android数据存储之DataBase的Room

    Room是Google在AndroidX中提供的一个ORM(Object Relational Mapping,对象关系映射)库。它是在SQLite上提供的一个抽象层,可以使用SQLite的全部功能,同时可以更好更便捷流畅地访问数据库。(关于AndroidX可以参考

  • 2020-11-22 23:04:39

    Android组件 LiveData与MutableLiveData教程

    LiveData与ViewMode是经常搭配在一起使用的,但是为了不太混乱,我还是拆分开来说明,此篇博客只讲解 LiveData 与 MutableLiveData的概念与使用方式(但是会涉及到ViewMode的部分代码).

  • 2020-11-22 23:14:52

    Dagger 2 在 Android 上的用法

    在前面的文章我们介绍了Dagger2 中的大部分注解的使用,接下来我们从源码角度分析下第一篇文章中例子的原理。

  • 2020-11-22 23:18:59

    Android开发从Dagger2迁移至Kodein的感受

    最近个人在尝试构建 Kotlin版本 的Android MVVM开发框架,在依赖注入框架的选型上,我最终选择了 Kodein 。这是一个非常轻量级的DI框架,相比于配置繁琐的Dagger(繁琐的配置也是导致Dagger学习成本一直居高不下的原因!),它的配置过程更清晰且简单,并且,这个库的源码也是 Kotlin 的。

  • 2020-11-22 23:25:56

    Dagger2源码解析inject过程

    添加inject后,通过编译生成的DaggerMainComponent类来导入,说明编译以后生成了一些类,那到底生成了什么类呢。 Module和Component又是什么,该怎么里理解 在这篇文章后里将一一讨论。

  • 2020-11-22 23:27:28

    dagger学习教程

    dagger android 学习(一):dagger基础使用 dagger android 学习(二):AndroidInjector的使用 dagger android 学习(三):ContributesAndroidInjector的进一步优化 dagger android 学习(四):基于dagger2的mvp架构