当前位置:首页 » 《随便一记》 » 正文

流媒体学习之路(WebRTC)——Pacer与GCC(5)

1 人参与  2024年02月15日 13:26  分类 : 《随便一记》  评论

点击全文阅读


流媒体学习之路(WebRTC)——Pacer与GCC(5)

——我正在的github给大家开发一个用于做实验的项目 —— github.com/qw225967/Bifrost目标:可以让大家熟悉各类Qos能力、带宽估计能力,提供每个环节关键参数调节接口并实现一个json全配置,提供全面的可视化算法观察能力。欢迎大家使用——

文章目录

流媒体学习之路(WebRTC)——Pacer与GCC(5)一、PacingController1.1 背景介绍1.2 代码 二、IntervalBudget2.1 背景2.2 代码 三、PacedSender四、总结


  在讲具体内容之前插一句嘴,从GCC分析(3)开始,我们将针对GCC的实现细节去分析它设计的原理,让我们理解这些类存在的意义,不再带大家去串具体的流程了。

一、PacingController

1.1 背景介绍

  Pacer(Packet Pacing)的作用是在传输数据时能平滑的发送出去,减少对网络冲击和抖动的产生,提高通信质量。在一次数据传输中,如果所有包几乎同时发送,网络就可能会遭遇到冲击,这就可能导致网络拥塞,数据包丢失等问题。为了避免这样的问题,需要通过一个定时器均匀分散发送数据包。
  特别是在音视频传输中,PACER更是非常重要的一部分。因为音视频的传输对于网络的稳定性和实时性要求非常高,任何形式的网络抖动或者丢包都会造成音视频的卡顿,延迟等问题。所以在WebRTC中使用Pacer,就是为了使音视频传输更加平滑,减少由于网络抖动造成的影响,从而达到提高实时音视频通信质量的目的。

  提到WebRTC的Pacer就需要讲述它码率控制的逻辑:
在这里插入图片描述
  从GCC输出的码率会设置给编码器以及pacer。pacer并不是完全严格设置多少就发多少,而是留有2.5倍的空间去发送。真正控制发送码率的则是输出给编码器的部分,期望控制编码器的输出码率。同时,pacer还对所有数据设置了优先级,优先级如下:

int GetPriorityForType(RtpPacketToSend::Type type) {  // Lower number takes priority over higher.  switch (type) {    case RtpPacketToSend::Type::kAudio:      // Audio is always prioritized over other packet types.      return kFirstPriority + 1;    case RtpPacketToSend::Type::kRetransmission:      // Send retransmissions before new media.      return kFirstPriority + 2;    case RtpPacketToSend::Type::kVideo:    case RtpPacketToSend::Type::kForwardErrorCorrection:      // Video has "normal" priority, in the old speak.      // Send redundancy concurrently to video. If it is delayed it might have a      // lower chance of being useful.      return kFirstPriority + 3;    case RtpPacketToSend::Type::kPadding:      // Packets that are in themselves likely useless, only sent to keep the      // BWE high.      return kFirstPriority + 4;  }}

  Pacer之所设计成这样,是因为我们向编码器设置码率之后想要保证丝滑清晰的画面,不可能完全控制输出码率,有时候画面复杂码率就大一些,画面简单码率就小一些。所以Pacer为了保证延迟预留了2.5倍的发送空间,也就是说真正控制码率的位置其实是编码器的输出

1.2 代码

  接下来我看看看pacer的核心代码——PacingController。这个类包含了优先级设置以及发送的逻辑,前面提到了优先级的内容下面只介绍发送逻辑:

void PacingController::ProcessPackets() {  Timestamp now = CurrentTime(); // 当前时间  TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); // 与上次process的间隔  // 发送保活,每500ms发送一个padding包,一旦发送的数据大于拥塞窗口则不发送  if (ShouldSendKeepalive(now)) {    DataSize keepalive_data_sent = DataSize::Zero();    // 产生padding包    std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =        packet_sender_->GeneratePadding(DataSize::bytes(1));    for (auto& packet : keepalive_packets) {      keepalive_data_sent +=          DataSize::bytes(packet->payload_size() + packet->padding_size());      packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo());    }    OnPaddingSent(keepalive_data_sent);  }  // 处于暂停直接返回  if (paused_)    return;    // 进入发送间隔开始计算  if (elapsed_time > TimeDelta::Zero()) {    DataRate target_rate = pacing_bitrate_;    DataSize queue_size_data = packet_queue_.Size();    // 队列中有数据才能发送    if (queue_size_data > DataSize::Zero()) {      // Assuming equal size packets and input/output rate, the average packet      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if      // time constraint shall be met. Determine bitrate needed for that.      //       packet_queue_.UpdateQueueTime(CurrentTime());      if (drain_large_queues_) {        // 平均发送时间 = 最大队列时长(2s)- 平均排队时间        TimeDelta avg_time_left =            std::max(TimeDelta::ms(1),                     queue_time_limit - packet_queue_.AverageQueueTime());        DataRate min_rate_needed = queue_size_data / avg_time_left;        // 最发送码率大于目标码率,则目标码率等于最小需求码率        if (min_rate_needed > target_rate) {          target_rate = min_rate_needed;          RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="                              << target_rate.kbps();        }      }    }    // 设置媒体桶    media_budget_.set_target_rate_kbps(target_rate.kbps());    UpdateBudgetWithElapsedTime(elapsed_time);  }  bool first_packet_in_probe = false;  bool is_probing = prober_.IsProbing();  PacedPacketInfo pacing_info;  absl::optional<DataSize> recommended_probe_size;  // 正在探测则获取探测数据信息  if (is_probing) {    pacing_info = prober_.CurrentCluster();    first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;    recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());  }  DataSize data_sent = DataSize::Zero();  // The paused state is checked in the loop since it leaves the critical  // section allowing the paused state to be changed from other code.  //   while (!paused_) {    if (small_first_probe_packet_ && first_packet_in_probe) {      // If first packet in probe, insert a small padding packet so we have a      // more reliable start window for the rate estimation.      // 产生padding包      auto padding = packet_sender_->GeneratePadding(DataSize::bytes(1));      // If no RTP modules sending media are registered, we may not get a      // padding packet back.      if (!padding.empty()) {        // Insert with high priority so larger media packets don't preempt it.        EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);        // We should never get more than one padding packets with a requested        // size of 1 byte.        RTC_DCHECK_EQ(padding.size(), 1u);      }      first_packet_in_probe = false;    }    // 获取待发送包    auto* packet = GetPendingPacket(pacing_info);    // 一旦产生不了数据,证明队列为空,则放入padding数据    if (packet == nullptr) {      // No packet available to send, check if we should send padding.      DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);      if (padding_to_add > DataSize::Zero()) {        std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =            packet_sender_->GeneratePadding(padding_to_add);        if (padding_packets.empty()) {          // No padding packets were generated, quite send loop.          break;        }        for (auto& packet : padding_packets) {          EnqueuePacket(std::move(packet));        }        // Continue loop to send the padding that was just added.        continue;      }      // Can't fetch new packet and no padding to send, exit send loop.      break;    }    // 发送数据    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();    RTC_DCHECK(rtp_packet);    packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);    data_sent += packet->size();    // Send succeeded, remove it from the queue.    OnPacketSent(packet);    if (recommended_probe_size && data_sent > *recommended_probe_size)      break;  }    if (is_probing) {    probing_send_failure_ = data_sent == DataSize::Zero();    if (!probing_send_failure_) {      prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());    }  }}RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(    const PacedPacketInfo& pacing_info) {  if (packet_queue_.Empty()) {    return nullptr;  }  // Since we need to release the lock in order to send, we first pop the  // element from the priority queue but keep it in storage, so that we can  // reinsert it if send fails.    // 取出第一个包  RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;  bool apply_pacing = !audio_packet || pace_audio_;  // 如果处于拥塞状态或者剩余数据为0则取消弹出  if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&                                       pacing_info.probe_cluster_id ==                                           PacedPacketInfo::kNotAProbe))) {        packet_queue_.CancelPop();    return nullptr;  }  return packet;}

二、IntervalBudget

2.1 背景

  PacingController上述用到了IntervalBudget这个类,这个类用于做数据统计和预估。并且它作为一个抽象预估类,并不会真正的存数据,只是做了数据统计,每次排出数据后都按时间更新一次桶的容量,发送时则会把已发送的数据更新到桶数据中。
在这里插入图片描述

2.2 代码

  头文件:

class IntervalBudget { public:  explicit IntervalBudget(int initial_target_rate_kbps);  IntervalBudget(int initial_target_rate_kbps, bool can_build_up_underuse);  void set_target_rate_kbps(int target_rate_kbps);  // TODO(tschumim): Unify IncreaseBudget and UseBudget to one function.  void IncreaseBudget(int64_t delta_time_ms);  void UseBudget(size_t bytes);  size_t bytes_remaining() const;  double budget_ratio() const;  int target_rate_kbps() const; private:  int target_rate_kbps_;  int64_t max_bytes_in_budget_;  int64_t bytes_remaining_;  bool can_build_up_underuse_;};

  CPP文件:

constexpr int64_t kWindowMs = 500;}IntervalBudget::IntervalBudget(int initial_target_rate_kbps)    : IntervalBudget(initial_target_rate_kbps, false) {}IntervalBudget::IntervalBudget(int initial_target_rate_kbps,                               bool can_build_up_underuse)    : bytes_remaining_(0), can_build_up_underuse_(can_build_up_underuse) {  set_target_rate_kbps(initial_target_rate_kbps);}void IntervalBudget::set_target_rate_kbps(int target_rate_kbps) {  target_rate_kbps_ = target_rate_kbps;  // 默认按500ms计算最大桶码率  max_bytes_in_budget_ = (kWindowMs * target_rate_kbps_) / 8;  // 计算剩余码率  bytes_remaining_ = std::min(std::max(-max_bytes_in_budget_, bytes_remaining_),                              max_bytes_in_budget_);}void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {  // 按时换算桶的码率  int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;  if (bytes_remaining_ < 0 || can_build_up_underuse_) {    // We overused last interval, compensate this interval.    // 把当前的码率加上    bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);  } else {    // If we underused last interval we can't use it this interval.    // 一旦剩余码率为负则重新使用新计算的码率    bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);  }}void IntervalBudget::UseBudget(size_t bytes) {  // 把使用的数据进行统计  bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),                              -max_bytes_in_budget_);}size_t IntervalBudget::bytes_remaining() const {  return rtc::saturated_cast<size_t>(std::max<int64_t>(0, bytes_remaining_));}double IntervalBudget::budget_ratio() const {  if (max_bytes_in_budget_ == 0)    return 0.0;  return static_cast<double>(bytes_remaining_) / max_bytes_in_budget_;}int IntervalBudget::target_rate_kbps() const {  return target_rate_kbps_;}

三、PacedSender

  上述的PacingController把具体的发送数据进行具体的计算,WebRTC把发送的逻辑和控制逻辑抽离了出来,其实PacingSender在构造时创建了PacingController并传入了this指针。因此对于PacingController来说PacingSender作为控制器在内部进行了回调。

  其他的函数我们不做具体的描述,只介绍定时函数:

int64_t PacedSender::TimeUntilNextProcess() {  rtc::CritScope cs(&critsect_);  // When paused we wake up every 500 ms to send a padding packet to ensure  // we won't get stuck in the paused state due to no feedback being received.  // 从controller中获取间隔  TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();  if (pacing_controller_.IsPaused()) {    // 最大间隔为500ms    return std::max(PacingController::kPausedProcessInterval - elapsed_time,                    TimeDelta::Zero())        .ms();  }  auto next_probe = pacing_controller_.TimeUntilNextProbe();  if (next_probe) {    return next_probe->ms();  }  const TimeDelta min_packet_limit = TimeDelta::ms(5);  return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();}

四、总结

  本文介绍了Pacer相关的内容,但我们的目的是通过Pacer去理解GCC的逻辑,在经过多个版本的迭代,Pacer与GCC的配合已经非常娴熟,同时耦合也是非常严重的:

每次Pacer的溢出发送,都需要GCC兜底(GCC的灵敏可以有效地检测到网络的排队,任何一个溢出的数据都能快速的下调码率,在遇到瓶颈带宽的时候出现了明显的锯齿状发送曲线);
在这里插入图片描述

码率不足与拥塞探测的矛盾(编码器的输出往往会收到一定的限制不可能无线地上涨,在当今环境下很难探测到带宽瓶颈。Pacer的做法是提供Padding的数据作为补充探测,但大部分厂商为了避免流量过度消耗,就把探测的逻辑关闭了。在这方面来看,Pacer真是没有完全听GCC的话);

  也正是因为这样,WebRTC的Pacer是GCC的Pacer其他的拥塞算法来了,估计都水土不服,参考BBR被移除可知。


点击全文阅读


本文链接:http://zhangshiyu.com/post/68569.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1