/ in Russian, на русском

Флюссоник: раздача MPEGTS видео по UDP

В сети можно часто встретить статьи вида «Проблема → Решение», и часто трудно понять почему автор выбрал именно такое решение. Эта статья следует принципу «Проблема → N Попыток → Решение» и отражает реальный ход разработки.

Не так давно я устроился работать в компанию Erlyvideo. Компания занимается разработкой и поддержкой Flussonic сервера потокового видео, головного продукта компании, а также нескольких побочных продуктов.

Расскажу об одной интересной задаче, которую мне выпало решать.

Одна из фич, которую умеет Flussonic — раздача MPEGTS потока по UDP. Всё отлично, но бывает что Flussonic отдаёт его не совсем равномерно.

К примеру, мы получили трёхсекундный сегмент от HLS источника, порезали на пакеты и отправили по UDP. Потом затишье до следующей порции.

На самом деле всё здесь в порядке, такой поток будет нормально проигрываться на вашем компьютере (через VLC, например). Но в случае Flussonic клиентом может быть какая-нибудь видеоприставка с небольшим буфером.

Как оказалось, видеоприставка проигрывает видео более адекватно, если оно поступает небольшими порциями, ровно столько сколько нужно проиграть.

Существующее решение

Старое решение написанное до меня работало так: вычислить длительность между кадрами, с этой длительностью слать кадры клиенту.

Всё бы ничего, но похоже что эти вычисления были неточны. Длительность (между DTS) между кадрами довольно маленькая, возможно где-то накапливалась ошибка, и мы отправляли все пакеты до того, как получали новую порцию.

DTS — Decoder Timestamp. Относительное время декодирования кадра.

На графике показана дельта времени между отправленными нами UDP пакетами. На самом деле эти скачки ещё выше, их пришлось порезать, чтобы график влезал полностью.

Эти скачки—большой интервал тишины между пакетами, это и есть то от чего видеоприставку дёргает.

Синяя линия — среднее квадратическое значение дельт. Добавил для самоконтроля. Если синяя линия проходит в аккурат по сине-зелёной кривой, значит нам удаётся более-менее равномерно отправлять пакеты. Пока это не так.

На самом деле алгоритм был немного сложнее, было нескольких хаков доводящих решение до более-менее работоспособного, какой-то способ устранения отставания.

Была ещё одна попытка, считать длительности не между кадрами, а между пачками кадров, но в чём-то это решение было даже хуже.

Исследование проблемы

Первым делом мне захотелось посмотреть как вообще выглядят эти интервалы между кадрами, на которые мы так надеемся. По-идее они должны быть очень равномерными.

Скормил флюссонику тестовый кусок мультфильма (низкое качество, есть аудио). Вот так выглядят дельты DTS кадров:

Ого! Очень интересно.

Попробуем теперь камеру с хорошим битрейтом, но без звука:

Две совсем разные картинки.

На самом деле мы отсылаем разные кадры. Есть аудиокадры, есть видеокадры, которые объединяются в GOP-ы (один keyframe и связанные с ним кадры). Есть ещё config-кадры, в которых содержится информация для декодера, но не сами данные.

Попробуем разбить эти разные типы кадров, и отобразим как изменяется DTS. График камеры без аудио почти не изменится, не особенно интересно. А вот на график дельт DTS мультфильма с аудиодорожкой очень даже интересно посмотреть:

Теперь всё равномерно. Те красивые узоры получались лишь потому, что мы вычисляли дельты между кадрами разного типа (аудио и видео). Если рассматривать дельту между кадрами одного типа, но всё становится понятным.

Хорошо виден резкий спуск вниз после config-кадра. Оно и понятно, config-кадр не имеет длительности, поэтому и дельта его DTS будет равна нулю.

С дельтой DTS теперь стало более-менее понятно. Теперь покажем с какими рывками эти кадры мы получаем. На графике показаны дельты времени получения кадров от источника:

С такими рывками получаем данные. Заголовок врёт, это не DTS, а реальное время.

Нам нужно отдавать эти полученные данные, но равномерно.

Итак, как будем решать? На самом деле стоит попробовать старое решение с измерением длительностей между кадрами и отправки по кусочкам. Только теперь будем учитывать, что DTS изменяется неравномерно, и могут присутствовать периодические спуски и взлёты. Также теперь нам понятно, что если брать DTS только видеофреймов, игнорируя аудио и config-фреймы, то можно измерять длительность куска более точно.

Решение

Алгоритм такой:

Выделить участок из N кадров, найти два локальных максимума дельт видео-кадров справа и слева. Кадры между этими максимумами следует порезать на равные по длительности DTS порции и отправлять.

После того как мы всё отправили на следует выделить следующий кусок. Ищем следующий максимум дельт DTS справа. Только на этот раз участок берём по длительности предыдущего.

Константа начального участка, а также окрестность (эпсилон) в которой искать локальный максимум дельт нужно будет грамотно подобрать, поглядев на реальные данные.

Вроде бы всё неплохо, но при таком подходе у нас может накопиться ошибка, т.к. мы пока никак не синхронизируемся с реальным временем. Само вычисление тоже потребляет какое-то время, мы постепенно будет отправлять данные чуть позже.

Кроме того, значения DTS которые мы получаем могут быть попросту недостаточно точными.

Поэтому добавляем в алгоритм третий пункт:

  • Перед отправкой первой порции фиксируем реальное время. Перед отправкой первой порции следующего участка снова фиксируем время, считаем дельту реального времени, и вычли дельту DTS внутри участка — так мы получим насколько мы отстаём. Полученное значение следует разделить на количество порций, и прибавить к длительности отправки каждой порции.

Итак, формула длительности отправки одного участка:

t — это реальное время между участками. k — коэффициент.

С помощью изменения k мы будем менять насколько быстро мы будем исправлять ошибку отставания.

Так мы регулярно будем исправлять ошибки времени в обе стороны, и это будет выглядеть довольно гладко.

Итак, мы получили такой трёхслойный алгоритм обрабатывающий поток данных:

Участки выделенные серым — сегмент между локальными максимумами дельт, где мы замеряем сколько времени нужно отправлять этот кусок.

Участки между измерениями t, это и есть порции кадров которые мы равномерно отправляем. Как можно заметить, из-за смещения количество порций не всегда одинаковое: 4, 4, 3.

Реализация

Алгоритм придуман, осталось закодить его на Erlang.

Сперва реализуем то что более-менее очевидно. Функция нахождения локального максимума дельт DTS:

-type dts_dmax() :: {
  FramesBefore::[frame()],
  Frame::frame(),
  Delta::float(),
  RestFrames::[frame()]
}.

-spec local_dts_delta_max(Frames::[frame()]) -> {ok, Max::dts_dmax()}.
local_dts_delta_max(Frames) ->
  local_dts_delta_max(Frames, [], undefined, undefined).

local_dts_delta_max([#frame{content=video, flavor=Flavor} = Frame | Frames], BeforeRev, undefined, Max)
when Flavor =:= keyframe orelse Flavor =:= frame ->
  local_dts_delta_max(Frames, [Frame | BeforeRev], Frame, Max);

local_dts_delta_max([#frame{content=video, flavor=Flavor} = Frame | Frames], BeforeRev, PrevVFrame, Max)
when Flavor =:= keyframe orelse Flavor =:= frame ->
  case dts_delta_cmp(PrevVFrame, Frame, Max) of
    old_is_greater ->
      local_dts_delta_max(Frames, [Frame | BeforeRev], Frame, Max);
    {new_is_greater, DeltaDTS} ->
      NewMax = {BeforeRev, Frame, DeltaDTS, Frames}, % BeforeRev is in reverse order yet
      local_dts_delta_max(Frames, [Frame | BeforeRev], Frame, NewMax)
  end;

% not a video frame, just skip
local_dts_delta_max([Frame | Frames], BeforeRev, PrevVFrame, Max) ->
  local_dts_delta_max(Frames, [Frame | BeforeRev], PrevVFrame, Max);

local_dts_delta_max([], _BeforeRev, _PrevVideoFrame, undefined) -> {error, no_two_videoframes};
local_dts_delta_max([], _BeforeRev, _PrevVFrame, {BeforeRev, Frame, DeltaDTS, Rest}) ->
  Before = lists:reverse(BeforeRev),
  Max = {Before, Frame, DeltaDTS, Rest},
  {ok, Max}.



-spec dts_delta_cmp(PrevFrame::frame(), Frame::frame(), OldMax::dts_dmax() | undefined) ->
  old_is_greater | {new_is_greater, DeltaDTS::float()}.
dts_delta_cmp(#frame{dts=PrevDTS}, #frame{dts=DTS}, undefined) ->
  {new_is_greater, (DTS - PrevDTS)};
dts_delta_cmp(#frame{dts=PrevDTS}, #frame{dts=DTS}, {_, _, OldDeltaDTS, _}) ->
  case (DTS - PrevDTS) of
    DeltaDTS when DeltaDTS > OldDeltaDTS -> {new_is_greater, DeltaDTS};
    _ -> old_is_greater
  end.
13:47:01.738 ------- delta: 32.399999998509884
13:47:01.738 ------- delta: 32.5333333350718
13:47:01.738 ------- delta: 32.55555555596948
13:47:01.739 ------- delta: 32.599999997764826
13:47:01.739 ------- delta: 32.400000002235174
13:47:01.739 ------- delta: 65.3111111111939
13:47:01.739 ------- delta: 32.63333333283663
13:47:01.739 ------- delta: 32.522222220897675
13:47:01.739 ------- delta: 32.58888889104128
13:47:01.739 ------- delta: 32.399999998509884
13:47:01.739 ------- delta: 32.61111110821366
13:47:01.739 --- max delta: 65.42222222313285
13:47:01.739 ------- delta: 32.48888888955116
13:47:01.739 ------- delta: 32.62222222238779
13:47:01.739 ------- delta: 33.144444447010756
13:47:01.739 ------- delta: 32.04444444179535
13:47:01.740 ------- delta: 32.33333333581686
13:47:01.740 ------- delta: 32.4666666649282
13:47:01.740 ------- delta: 32.600000001490116
13:47:01.740 ------- delta: 32.55555555596948
13:47:01.740 ------- delta: 32.38888888806105
13:47:01.740 ------- delta: 65.32222222164273
13:47:01.740 ------- delta: 32.688888888806105
13:47:01.740 ------- delta: 32.600000001490116
13:47:01.740 --- max delta: 65.44444444403052
13:47:01.740 ------- delta: 32.57777777686715
13:47:01.740 ------- delta: 32.66666666790843
13:47:01.740 ------- delta: 32.5
13:47:01.740 ------- delta: 32.477777775377035
13:47:01.740 ------- delta: 65.24444444477558
13:47:01.740 ------- delta: 32.65555555745959
13:47:01.740 ------- delta: 32.48888888582587
13:47:01.740 ------- delta: 32.644444447010756
13:47:01.740 ------- delta: 32.34444444254041
13:47:01.740 ------- delta: 65.32222222164273
13:47:01.740 ------- delta: 32.57777778059244
13:47:01.740 ------- delta: 32.688888888806105
13:47:01.740 ------- delta: 32.477777775377035
13:47:01.740 --- max delta: 65.32222222536802

Похоже работает =)

Собрался писать функцию выбора двух локальных максимумов в участке, и понял что можно всё немного упростить. Можно отказаться отказаться от плавающей длины участков измерения (там где мы ищем два локальных максимума). Можно взять эти участки фиксированными, длиной кратной длине области локальных максимумов.

Так мы избавимся от проблем со смещением отправляемых пакетов, но зато больше ответственности ляжет на устранение ошибки отставания, так как если такое зафиксированное разбиение будет плохо ложиться на реальные скачки DTS, то мы будем регулярно получать ошибку во времени.

Сперва попробую такую упрощённую версию алгоритма, а там посмотрим. Итак теперь наш трёхслойный алгоритм превратился в двухслойный и выглядит так:

На рисунке хорошо видно как лажает этот алгоритм на всех участках — максимумы оказываются вне выделенных областей справа, где мы ищем максимальную дельту.

На деле у нас будут достаточно большие области поиска максимума (эпсилон) чтобы их поймать. Если же всё таки мы промахнулись с максимумом и неправильно вычислили скорость потребления видео — будем надеяться на наше вычисление и устранение ошибки (real time delay synchronization).

Итак, первые результаты:

Мда, пока выглядит плохо. Регулярные маленькие пики — это результат нашей корректировки отставания, нужно поднастроить. А вот большой пик меня беспокоит. Возможно мы тупо отправили все полученные данные, а источник ещё не успел ничего прислать. Лечится увеличением размера участка.

Вот так ведёт себя очередь кадров от источника:

Вполне адекватно, нет никакого падения до нуля. Значит версию о том что у нас кончились данные и нечего отправлять отбрасываем.

Похоже мы неправильно корректируем ошибку. Вот так изменяется наша ошибка:

Ничего себе! Скачок от -4000 миллисекунд аж до 6000 миллисекунд отставания. В идеале я ожидал изменений в пределах 100–200 миллисекунд.

Попробуем снизить амплитуду. Для этого нужно снизить коэффициент k. Кроме того, стоит увеличить размер участка.

Пока проделывал всё это, заметил что допустил ошибку в вычислении дистанции между пакетами. Починил:

Отлично! Синяя линия как раз в аккурат по кривой, то что нужно. Есть заметный скачок, но это всего лишь 60 миллисекунд, наверное сгодится.

Снова измерил как меняется очередь кадров от источника:

И честно говоря, такой результат вогнал меня в ступор. Потребление памяти просто взлетает. Очередь растёт, значит мы не успеваем отправить всё. Но при этом отдаём мы всё равномерно.

Учитывая как мы исправляем ошибку проблема может быть только в одном — мы неправильно считаем длительность участка. Почему-то она выходит короче чем на самом деле.

Конечно это может быть это плохой источник, который выставляет неправильные DTS своим кадрам, но это очень маловероятно.

Тут меня осенило, но первым делом нужно проверить свою догадку. До этого я тестировал на камере, с скачкообразным DTS. Посмотрим как ведёт себя очередь на ролике с более ровном:

Ага! Совсем другая картина, никакого роста. А разгадка проста:

Красные промежутки — отрезки отправку которых я замеряю. И ниже точки замера реального времени. Видно смещение, всё потому что я ищу локальный максимум, для того чтобы более точно вычислять скорость с какой нужно отправлять участок.

Я имел глупость надеяться, что эти красные промежутки будут равны по длительности. В случае когда они действительно равны никакого роста памяти не было. А в случае камеры с неровным DTS у нас накапливаются кадры в очереди.

Как лечить? Очень просто, нужно выкинуть все эти трюки с локальными максимумами, и мерить ровно те участки которые были отправлены, избавиться от смещения.

Да, ту простыню кода которую я приводил можно выкинуть, она нам не понадобится.
KISS (Keep It Simple, Stupid) принцип в действии — все проблемы были от того что выпендривался и усложнял =). Итак, теперь мы будем измерять так:

Кроме того, а нашёл ещё одну ошибку. Формула устранения ошибки неправильная:

Эта формула будет работать, если после попытки устранения ошибки она действительно устранилась. На самом деле мы лишь приблизительно корректируем скорость отдачи данных.

Вместо того чтобы вычитать из предыдущего DTS и предыдущего замера времени, мне следует вычитать из самого первого DTS и самого первого замера времени. Ещё одна глупая ошибка.

Смотрим как теперь ведёт себя очередь:

Ура, теперь всё в порядке. Вдруг мы сломали что-то? Смотрим как ведут себя дельты отправленных UDP пакетов:

Красота, самый большой скачок — 12 миллисекунд. Ну и для интереса, посмотрим как ведёт себя ошибка:

Интересно, что теперь мы только опаздываем и никогда не спешим.


Итак, задача решена, и причём довольно тупым и простым способом. Ура!

В заключение статьи прикладываю код который получился:

-define(REGION_SIZE, 50).
-define(TIME_CORRECTION_COEF, 1.0).
-define(DELTA_BETWEEN_PACKETS, 5).

handle_info(#frame{mpegts = Bin} = Frame, Pusher) when is_binary(Bin) ->
  case put_to_buffer(Pusher, Frame) of
    {ok, Pusher1} -> {noreply, Pusher1};
    {filled_in, Pusher1} ->
      {ok, Pusher2} = send_buffer(Pusher1),
      {noreply, Pusher2}
  end.

put_to_buffer(#pusher{buffer = undefined} = Pusher, Frame) ->
  put_to_buffer(Pusher#pusher{buffer = {undefined, <<>>, 0, undefined}}, Frame);
put_to_buffer(#pusher{buffer = Buffer} = Pusher, Frame) ->
  Buffer1 = case {Buffer, Frame} of
    {{undefined, Data, N, undefined}, #frame{content = video, flavor = Flavor, dts = DTS, mpegts = Bin}}
    when Flavor =:= frame orelse Flavor =:= keyframe ->
      {DTS, <<Data/binary, Bin/binary>>, N+1, DTS};

    {{FirstDTS, Data, N, _}, #frame{content = video, flavor = Flavor, dts = DTS, mpegts = Bin}}
    when Flavor =:= frame orelse Flavor =:= keyframe ->
      {FirstDTS, <<Data/binary, Bin/binary>>, N+1, DTS};

    {{FirstDTS, Data, N, LastDTS}, #frame{mpegts = Bin}} ->
      {FirstDTS, <<Data/binary, Bin/binary>>, N+1, LastDTS}
  end,
  case Buffer1 of
    {_, _, ?REGION_SIZE, _} -> {filled_in, Pusher#pusher{buffer = Buffer1}};
    {_, _, N1, _} when N1 < ?REGION_SIZE -> {ok, Pusher#pusher{buffer = Buffer1}};
    _ -> {error, already_filled}
  end.

send_buffer(Pusher) ->
  {ok, RegionDuration} = buffer_duration(Pusher),
  {ok, Delay, Pusher1} = delay_upto_now(Pusher),
  % if delay is positive, then we're late. Otherwise we're going too quick
  Duration = (RegionDuration - Delay*?TIME_CORRECTION_COEF),
  if Duration < 0 -> {error, region_duration_less_than_delay};
    true -> send_buffer_with_duration(Pusher1, trunc(Duration))
  end.

buffer_duration(#pusher{prev_last_dts = undefined, buffer = {FirstDTS, _, _, LastDTS}}) ->
  {ok, (LastDTS - FirstDTS)};
buffer_duration(#pusher{prev_last_dts = PrevLastDTS, buffer = {_, _, _, LastDTS}}) ->
  {ok, (LastDTS - PrevLastDTS)}.

delay_upto_now(#pusher{first_timestamp = undefined, first_dts = undefined, buffer = {FirstDTS, _, _, LastDTS}} = Pusher) ->
  {Mega, Secs, Micro} = os:timestamp(),
  Timestamp = Mega*1000000*1000000 + Secs*1000000 + Micro,
  {ok, 0, Pusher#pusher{first_timestamp = Timestamp, first_dts = FirstDTS, prev_last_dts = LastDTS}};
delay_upto_now(#pusher{first_timestamp = FirstTimestamp, first_dts = FirstDTS, prev_last_dts = PrevLastDTS, buffer = {_, _, _, LastDTS}} = Pusher) ->
  {Mega, Secs, Micro} = os:timestamp(),
  Timestamp = Mega*1000000*1000000 + Secs*1000000 + Micro,
  TotalRealDuration = (Timestamp - FirstTimestamp)/1000, % milliseconds
  TotalDuration = PrevLastDTS - FirstDTS,
  Delay = TotalRealDuration - TotalDuration,
  {ok, Delay, Pusher#pusher{prev_last_dts = LastDTS}}.

send_buffer_with_duration(#pusher{buffer = {_, Data, _, _}, socket = Socket} = Pusher, Duration) ->
  Chunks = mpegts_decoder:split_chunks(Data),
  % if chunks count is significally less than duration milliseconds,
  % then we should send each chunk with sleep after it
  %
  % if number of chunks is pretty big, then we should split them into groups
  % and send each group with sleep after it
  %
  % it should be done in such way because minimal sleep value is 1 millisecond
  Groups = split_chunks_in_groups(Chunks, Duration),
  GroupDuration = trunc(Duration / length(Groups)),
  lists:foreach(fun (Group) ->
    [gen_udp:send(Socket, Chunk) || Chunk <- Group],
    ok = timer:sleep(GroupDuration)
  end, Groups),
  {ok, Pusher#pusher{buffer = {undefined, <<>>, 0, undefined}}}.

split_chunks_in_groups(Chunks, Duration) ->
  {ok, GroupSize} = best_group_size(length(Chunks), Duration, 1),
  split_in_groups(Chunks, GroupSize, []).

split_in_groups([], _, Acc) -> lists:reverse(Acc);
split_in_groups(Chunks, Size, Acc) when length(Chunks) < Size -> lists:reverse([Chunks | Acc]);
split_in_groups(Chunks, Size, Acc) ->
  {Group, Rest} = lists:split(Size, Chunks),
  split_in_groups(Rest, Size, [Group | Acc]).

best_group_size(ChunksLength, Duration, GroupSize) ->
  GroupDuration = Duration/(ChunksLength/GroupSize),
  if GroupDuration > 50 -> {error, missed_good_mean};
    GroupDuration < ?DELTA_BETWEEN_PACKETS -> best_group_size(ChunksLength, Duration, GroupSize+1);
    true -> {ok, GroupSize}
  end.