summaryrefslogtreecommitdiff
path: root/gst/rtpmanager/gstrtprtxsend.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/gstrtprtxsend.c')
-rwxr-xr-xgst/rtpmanager/gstrtprtxsend.c951
1 files changed, 951 insertions, 0 deletions
diff --git a/gst/rtpmanager/gstrtprtxsend.c b/gst/rtpmanager/gstrtprtxsend.c
new file mode 100755
index 0000000..d9cc69f
--- /dev/null
+++ b/gst/rtpmanager/gstrtprtxsend.c
@@ -0,0 +1,951 @@
+/* RTP Retransmission sender element for GStreamer
+ *
+ * gstrtprtxsend.c:
+ *
+ * Copyright (C) 2013 Collabora Ltd.
+ * @author Julien Isorce <julien.isorce@collabora.co.uk>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * SECTION:element-rtprtxsend
+ *
+ * See #GstRtpRtxReceive for examples
+ *
+ * The purpose of the sender RTX object is to keep a history of RTP packets up
+ * to a configurable limit (max-size-time or max-size-packets). It will listen
+ * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
+ * comes from downstream (#GstRtpSession). When receiving a request it will
+ * look up the requested seqnum in its list of stored packets. If the packet
+ * is available, it will create a RTX packet according to RFC 4588 and send
+ * this as an auxiliary stream. RTX is SSRC-multiplexed
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst.h>
+#include <gst/rtp/gstrtpbuffer.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "gstrtprtxsend.h"
+
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
+#define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
+
+#define DEFAULT_RTX_PAYLOAD_TYPE 0
+#define DEFAULT_MAX_SIZE_TIME 0
+#define DEFAULT_MAX_SIZE_PACKETS 100
+
+enum
+{
+ PROP_0,
+ PROP_SSRC_MAP,
+ PROP_PAYLOAD_TYPE_MAP,
+ PROP_MAX_SIZE_TIME,
+ PROP_MAX_SIZE_PACKETS,
+ PROP_NUM_RTX_REQUESTS,
+ PROP_NUM_RTX_PACKETS
+};
+
+static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp")
+ );
+
+static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
+ );
+
+static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata);
+
+static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer);
+static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
+
+static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
+static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
+ GstObject * parent, GstPadMode mode, gboolean active);
+
+static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
+ element, GstStateChange transition);
+
+static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_rtp_rtx_send_finalize (GObject * object);
+
+G_DEFINE_TYPE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT);
+
+typedef struct
+{
+ guint16 seqnum;
+ guint32 timestamp;
+ GstBuffer *buffer;
+} BufferQueueItem;
+
+static void
+buffer_queue_item_free (BufferQueueItem * item)
+{
+ gst_buffer_unref (item->buffer);
+ g_slice_free (BufferQueueItem, item);
+}
+
+typedef struct
+{
+ guint32 rtx_ssrc;
+ guint16 seqnum_base, next_seqnum;
+ gint clock_rate;
+
+ /* history of rtp packets */
+ GSequence *queue;
+} SSRCRtxData;
+
+static SSRCRtxData *
+ssrc_rtx_data_new (guint32 rtx_ssrc)
+{
+ SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
+
+ data->rtx_ssrc = rtx_ssrc;
+ data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
+ data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
+
+ return data;
+}
+
+static void
+ssrc_rtx_data_free (SSRCRtxData * data)
+{
+ g_sequence_free (data->queue);
+ g_slice_free (SSRCRtxData, data);
+}
+
+static void
+gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstElementClass *gstelement_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstelement_class = (GstElementClass *) klass;
+
+ gobject_class->get_property = gst_rtp_rtx_send_get_property;
+ gobject_class->set_property = gst_rtp_rtx_send_set_property;
+ gobject_class->finalize = gst_rtp_rtx_send_finalize;
+
+ g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
+ g_param_spec_boxed ("ssrc-map", "SSRC Map",
+ "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
+ " (default = random)", GST_TYPE_STRUCTURE,
+ G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
+ g_param_spec_boxed ("payload-type-map", "Payload Type Map",
+ "Map of original payload types to their retransmission payload types",
+ GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
+ g_param_spec_uint ("max-size-time", "Max Size Time",
+ "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
+ DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
+ g_param_spec_uint ("max-size-packets", "Max Size Packets",
+ "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
+ DEFAULT_MAX_SIZE_PACKETS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
+ g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
+ "Number of retransmission events received", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
+ g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
+ " Number of retransmission packets sent", 0, G_MAXUINT,
+ 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&src_factory));
+ gst_element_class_add_pad_template (gstelement_class,
+ gst_static_pad_template_get (&sink_factory));
+
+ gst_element_class_set_static_metadata (gstelement_class,
+ "RTP Retransmission Sender", "Codec",
+ "Retransmit RTP packets when needed, according to RFC4588",
+ "Julien Isorce <julien.isorce@collabora.co.uk>");
+
+ gstelement_class->change_state =
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
+}
+
+static void
+gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_flush (rtx->queue);
+ g_hash_table_remove_all (rtx->ssrc_data);
+ g_hash_table_remove_all (rtx->rtx_ssrcs);
+ rtx->num_rtx_requests = 0;
+ rtx->num_rtx_packets = 0;
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static void
+gst_rtp_rtx_send_finalize (GObject * object)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
+
+ g_hash_table_unref (rtx->ssrc_data);
+ g_hash_table_unref (rtx->rtx_ssrcs);
+ if (rtx->external_ssrc_map)
+ gst_structure_free (rtx->external_ssrc_map);
+ g_hash_table_unref (rtx->rtx_pt_map);
+ if (rtx->rtx_pt_map_structure)
+ gst_structure_free (rtx->rtx_pt_map_structure);
+ g_object_unref (rtx->queue);
+
+ G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
+}
+
+static void
+gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
+{
+ GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
+
+ rtx->srcpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "src"), "src");
+ GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
+ gst_pad_set_event_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
+ gst_pad_set_activatemode_function (rtx->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
+
+ rtx->sinkpad =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "sink"), "sink");
+ GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
+ GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
+ gst_pad_set_event_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
+ gst_pad_set_chain_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
+ gst_pad_set_chain_list_function (rtx->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
+ gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
+
+ rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
+ NULL, rtx);
+ rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL, (GDestroyNotify) ssrc_rtx_data_free);
+ rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
+ rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+
+ rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
+ rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
+}
+
+static void
+gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
+{
+ GST_OBJECT_LOCK (rtx);
+ gst_data_queue_set_flushing (rtx->queue, flush);
+ gst_data_queue_flush (rtx->queue);
+ GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
+ guint visible, guint bytes, guint64 time, gpointer checkdata)
+{
+ return FALSE;
+}
+
+static void
+gst_rtp_rtx_data_queue_item_free (gpointer item)
+{
+ GstDataQueueItem *data = item;
+ if (data->object)
+ gst_mini_object_unref (data->object);
+ g_slice_free (GstDataQueueItem, data);
+}
+
+static gboolean
+gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
+{
+ GstDataQueueItem *data;
+ gboolean success;
+
+ data = g_slice_new0 (GstDataQueueItem);
+ data->object = GST_MINI_OBJECT (object);
+ data->size = 1;
+ data->duration = 1;
+ data->visible = TRUE;
+ data->destroy = gst_rtp_rtx_data_queue_item_free;
+
+ success = gst_data_queue_push (rtx->queue, data);
+
+ if (!success)
+ data->destroy (data);
+
+ return success;
+}
+
+static guint32
+gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
+ gboolean consider_choice)
+{
+ guint32 ssrc = consider_choice ? choice : g_random_int ();
+
+ /* make sure to be different than any other */
+ while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
+ g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
+ ssrc = g_random_int ();
+ }
+
+ return ssrc;
+}
+
+static SSRCRtxData *
+gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
+{
+ SSRCRtxData *data;
+ guint32 rtx_ssrc = 0;
+ gboolean consider = FALSE;
+
+ if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
+ GUINT_TO_POINTER (ssrc)))) {
+ if (rtx->external_ssrc_map) {
+ gchar *ssrc_str;
+ ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
+ consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
+ &rtx_ssrc);
+ g_free (ssrc_str);
+ }
+ rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
+ data = ssrc_rtx_data_new (rtx_ssrc);
+ g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
+ g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
+ GUINT_TO_POINTER (ssrc));
+ } else {
+ data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
+ }
+ return data;
+}
+
+/* Copy fixed header and extension. Add OSN before to copy payload
+ * Copy memory to avoid to manually copy each rtp buffer field.
+ */
+static GstBuffer *
+gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
+{
+ GstMemory *mem = NULL;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
+ GstBuffer *new_buffer = gst_buffer_new ();
+ GstMapInfo map;
+ guint payload_len = 0;
+ SSRCRtxData *data;
+ guint32 ssrc;
+ guint16 seqnum;
+ guint8 fmtp;
+
+ gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
+
+ /* get needed data from GstRtpRtxSend */
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ ssrc = data->rtx_ssrc;
+ seqnum = data->next_seqnum++;
+ fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
+ GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
+
+ GST_DEBUG_OBJECT (rtx,
+ "retransmit seqnum: %" G_GUINT16_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+ seqnum, ssrc);
+
+ /* gst_rtp_buffer_map does not map the payload so do it now */
+ gst_rtp_buffer_get_payload (&rtp);
+
+ /* copy fixed header */
+ mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
+ gst_buffer_append_memory (new_buffer, mem);
+
+ /* copy extension if any */
+ if (rtp.size[1]) {
+ mem = gst_memory_copy (rtp.map[1].memory, 0, rtp.size[1]);
+ gst_buffer_append_memory (new_buffer, mem);
+ }
+
+ /* copy payload and add OSN just before */
+ payload_len = 2 + rtp.size[2];
+ mem = gst_allocator_alloc (NULL, payload_len, NULL);
+
+ gst_memory_map (mem, &map, GST_MAP_WRITE);
+ GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
+ if (rtp.size[2])
+ memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
+ gst_memory_unmap (mem, &map);
+ gst_buffer_append_memory (new_buffer, mem);
+
+ /* everything needed is copied */
+ gst_rtp_buffer_unmap (&rtp);
+
+ /* set ssrc, seqnum and fmtp */
+ gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
+ gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
+ gst_rtp_buffer_set_seq (&new_rtp, seqnum);
+ gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
+ /* RFC 4588: let other elements do the padding, as normal */
+ gst_rtp_buffer_set_padding (&new_rtp, FALSE);
+ gst_rtp_buffer_unmap (&new_rtp);
+
+ /* Copy over timestamps */
+ gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
+
+ return new_buffer;
+}
+
+static gint
+buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
+ gpointer user_data)
+{
+ /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
+ * it returns negative when seqnum1 > seqnum2 and we want negative
+ * when b > a, i.e. a is smaller, so it comes first in the sequence */
+ return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
+}
+
+static gboolean
+gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ gboolean res;
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CUSTOM_UPSTREAM:
+ {
+ const GstStructure *s = gst_event_get_structure (event);
+
+ /* This event usually comes from the downstream gstrtpsession */
+ if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
+ guint seqnum = 0;
+ guint ssrc = 0;
+ GstBuffer *rtx_buf = NULL;
+
+ /* retrieve seqnum of the packet that need to be retransmitted */
+ if (!gst_structure_get_uint (s, "seqnum", &seqnum))
+ seqnum = -1;
+
+ /* retrieve ssrc of the packet that need to be retransmitted */
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+
+ GST_DEBUG_OBJECT (rtx,
+ "request seqnum: %" G_GUINT32_FORMAT ", ssrc: %" G_GUINT32_FORMAT,
+ seqnum, ssrc);
+
+ GST_OBJECT_LOCK (rtx);
+ /* check if request is for us */
+ if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
+ SSRCRtxData *data;
+ GSequenceIter *iter;
+ BufferQueueItem search_item;
+
+ /* update statistics */
+ ++rtx->num_rtx_requests;
+
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+
+ search_item.seqnum = seqnum;
+ iter = g_sequence_lookup (data->queue, &search_item,
+ (GCompareDataFunc) buffer_queue_items_cmp, NULL);
+ if (iter) {
+ BufferQueueItem *item = g_sequence_get (iter);
+ GST_DEBUG_OBJECT (rtx, "found %" G_GUINT16_FORMAT, item->seqnum);
+ rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
+ }
+ }
+ GST_OBJECT_UNLOCK (rtx);
+
+ if (rtx_buf)
+ gst_rtp_rtx_send_push_out (rtx, rtx_buf);
+
+ gst_event_unref (event);
+ res = TRUE;
+
+ /* This event usually comes from the downstream gstrtpsession */
+ } else if (gst_structure_has_name (s, "GstRTPCollision")) {
+ guint ssrc = 0;
+
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+
+ GST_DEBUG_OBJECT (rtx, "collision ssrc: %" G_GUINT32_FORMAT, ssrc);
+
+ GST_OBJECT_LOCK (rtx);
+
+ /* choose another ssrc for our retransmited stream */
+ if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
+ guint master_ssrc;
+ SSRCRtxData *data;
+
+ master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
+ GUINT_TO_POINTER (ssrc)));
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
+
+ /* change rtx_ssrc and update the reverse map */
+ data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
+ g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
+ g_hash_table_insert (rtx->rtx_ssrcs,
+ GUINT_TO_POINTER (data->rtx_ssrc),
+ GUINT_TO_POINTER (master_ssrc));
+
+ GST_OBJECT_UNLOCK (rtx);
+
+ /* no need to forward to payloader because we make sure to have
+ * a different ssrc
+ */
+ gst_event_unref (event);
+ res = TRUE;
+ } else {
+ /* if master ssrc has collided, remove it from our data, as it
+ * is not going to be used any longer */
+ if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
+ SSRCRtxData *data;
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ g_hash_table_remove (rtx->rtx_ssrcs,
+ GUINT_TO_POINTER (data->rtx_ssrc));
+ g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
+ }
+
+ GST_OBJECT_UNLOCK (rtx);
+
+ /* forward event to payloader in case collided ssrc is
+ * master stream */
+ res = gst_pad_event_default (pad, parent, event);
+ }
+ } else {
+ res = gst_pad_event_default (pad, parent, event);
+ }
+ break;
+ }
+ default:
+ res = gst_pad_event_default (pad, parent, event);
+ break;
+ }
+ return res;
+}
+
+static gboolean
+gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ gst_pad_pause_task (rtx->srcpad);
+ return TRUE;
+ case GST_EVENT_FLUSH_STOP:
+ gst_pad_push_event (rtx->srcpad, event);
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ return TRUE;
+ case GST_EVENT_EOS:
+ GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
+ gst_rtp_rtx_send_push_out (rtx, event);
+ return TRUE;
+ case GST_EVENT_CAPS:
+ {
+ GstCaps *caps;
+ GstStructure *s;
+ guint ssrc;
+ gint payload;
+ gpointer rtx_payload;
+ SSRCRtxData *data;
+
+ gst_event_parse_caps (event, &caps);
+
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_uint (s, "ssrc", &ssrc))
+ ssrc = -1;
+ if (!gst_structure_get_int (s, "payload", &payload))
+ payload = -1;
+
+ if (payload == -1)
+ GST_WARNING_OBJECT (rtx, "No payload in caps");
+
+ GST_OBJECT_LOCK (rtx);
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+ if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
+ GUINT_TO_POINTER (payload), NULL, &rtx_payload))
+ rtx_payload = GINT_TO_POINTER (-1);
+
+ if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
+ GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
+
+ GST_DEBUG_OBJECT (rtx,
+ "got caps for payload: %d->%d, ssrc: %u->%d: %" GST_PTR_FORMAT,
+ payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
+
+ gst_structure_get_int (s, "clock-rate", &data->clock_rate);
+
+ /* The session might need to know the RTX ssrc */
+ caps = gst_caps_copy (caps);
+ gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
+ "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
+
+ if (GPOINTER_TO_INT (rtx_payload) != -1)
+ gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
+ GPOINTER_TO_INT (rtx_payload), NULL);
+
+ GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
+ data->clock_rate, ssrc);
+ GST_OBJECT_UNLOCK (rtx);
+
+ gst_event_unref (event);
+ event = gst_event_new_caps (caps);
+ gst_caps_unref (caps);
+ break;
+ }
+ default:
+ break;
+ }
+ return gst_pad_event_default (pad, parent, event);
+}
+
+/* like rtp_jitter_buffer_get_ts_diff() */
+static guint32
+gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
+{
+ guint64 high_ts, low_ts;
+ BufferQueueItem *high_buf, *low_buf;
+ guint32 result;
+
+ high_buf =
+ g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
+ (data->queue)));
+ low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
+
+ if (!high_buf || !low_buf || high_buf == low_buf)
+ return 0;
+
+ high_ts = high_buf->timestamp;
+ low_ts = low_buf->timestamp;
+
+ /* it needs to work if ts wraps */
+ if (high_ts >= low_ts) {
+ result = (guint32) (high_ts - low_ts);
+ } else {
+ result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
+ }
+
+ /* return value in ms instead of clock ticks */
+ return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
+}
+
+/* Must be called with lock */
+static void
+process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
+{
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ BufferQueueItem *item;
+ SSRCRtxData *data;
+ guint16 seqnum;
+ guint8 payload_type;
+ guint32 ssrc, rtptime;
+
+ /* read the information we want from the buffer */
+ gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
+ seqnum = gst_rtp_buffer_get_seq (&rtp);
+ payload_type = gst_rtp_buffer_get_payload_type (&rtp);
+ ssrc = gst_rtp_buffer_get_ssrc (&rtp);
+ rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ gst_rtp_buffer_unmap (&rtp);
+
+ GST_LOG_OBJECT (rtx,
+ "Processing buffer seqnum: %" G_GUINT16_FORMAT ", ssrc: %"
+ G_GUINT32_FORMAT, seqnum, ssrc);
+
+ /* do not store the buffer if it's payload type is unknown */
+ if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
+ data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
+
+ /* add current rtp buffer to queue history */
+ item = g_slice_new0 (BufferQueueItem);
+ item->seqnum = seqnum;
+ item->timestamp = rtptime;
+ item->buffer = gst_buffer_ref (buffer);
+ g_sequence_append (data->queue, item);
+
+ /* remove oldest packets from history if they are too many */
+ if (rtx->max_size_packets) {
+ while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
+ g_sequence_remove (g_sequence_get_begin_iter (data->queue));
+ }
+ if (rtx->max_size_time) {
+ while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
+ g_sequence_remove (g_sequence_get_begin_iter (data->queue));
+ }
+ }
+}
+
+static GstFlowReturn
+gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ GstFlowReturn ret;
+
+ GST_OBJECT_LOCK (rtx);
+ process_buffer (rtx, buffer);
+ GST_OBJECT_UNLOCK (rtx);
+ ret = gst_pad_push (rtx->srcpad, buffer);
+
+ return ret;
+}
+
+static gboolean
+process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+ process_buffer (user_data, *buffer);
+ return TRUE;
+}
+
+static GstFlowReturn
+gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ GstFlowReturn ret;
+
+ GST_OBJECT_LOCK (rtx);
+ gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
+ GST_OBJECT_UNLOCK (rtx);
+
+ ret = gst_pad_push_list (rtx->srcpad, list);
+
+ return ret;
+}
+
+static void
+gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
+{
+ GstDataQueueItem *data;
+
+ if (gst_data_queue_pop (rtx->queue, &data)) {
+ GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
+
+ if (G_LIKELY (GST_IS_BUFFER (data->object))) {
+ gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
+
+ GST_OBJECT_LOCK (rtx);
+ rtx->num_rtx_packets++;
+ GST_OBJECT_UNLOCK (rtx);
+ } else if (GST_IS_EVENT (data->object)) {
+ gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
+
+ /* after EOS, we should not send any more buffers,
+ * even if there are more requests coming in */
+ if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ }
+ } else {
+ g_assert_not_reached ();
+ }
+
+ data->object = NULL; /* we no longer own that object */
+ data->destroy (data);
+ } else {
+ GST_LOG_OBJECT (rtx, "flushing");
+ gst_pad_pause_task (rtx->srcpad);
+ }
+}
+
+static gboolean
+gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
+ GstPadMode mode, gboolean active)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (parent);
+ gboolean ret = FALSE;
+
+ switch (mode) {
+ case GST_PAD_MODE_PUSH:
+ if (active) {
+ gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+ ret = gst_pad_start_task (rtx->srcpad,
+ (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+ } else {
+ gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+ ret = gst_pad_stop_task (rtx->srcpad);
+ }
+ GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
+ break;
+ default:
+ break;
+ }
+ return ret;
+}
+
+static void
+gst_rtp_rtx_send_get_property (GObject * object,
+ guint prop_id, GValue * value, GParamSpec * pspec)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
+
+ switch (prop_id) {
+ case PROP_PAYLOAD_TYPE_MAP:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_boxed (value, rtx->rtx_pt_map_structure);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->max_size_time);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->max_size_packets);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_NUM_RTX_REQUESTS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_requests);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_NUM_RTX_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ g_value_set_uint (value, rtx->num_rtx_packets);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gboolean
+structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
+{
+ const gchar *field_str;
+ guint field_uint;
+ guint value_uint;
+
+ field_str = g_quark_to_string (field_id);
+ field_uint = atoi (field_str);
+ value_uint = g_value_get_uint (value);
+ g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
+ GUINT_TO_POINTER (value_uint));
+
+ return TRUE;
+}
+
+static void
+gst_rtp_rtx_send_set_property (GObject * object,
+ guint prop_id, const GValue * value, GParamSpec * pspec)
+{
+ GstRtpRtxSend *rtx = GST_RTP_RTX_SEND (object);
+
+ switch (prop_id) {
+ case PROP_SSRC_MAP:
+ GST_OBJECT_LOCK (rtx);
+ if (rtx->external_ssrc_map)
+ gst_structure_free (rtx->external_ssrc_map);
+ rtx->external_ssrc_map = g_value_dup_boxed (value);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_PAYLOAD_TYPE_MAP:
+ GST_OBJECT_LOCK (rtx);
+ if (rtx->rtx_pt_map_structure)
+ gst_structure_free (rtx->rtx_pt_map_structure);
+ rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
+ g_hash_table_remove_all (rtx->rtx_pt_map);
+ gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
+ rtx->rtx_pt_map);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_TIME:
+ GST_OBJECT_LOCK (rtx);
+ rtx->max_size_time = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ case PROP_MAX_SIZE_PACKETS:
+ GST_OBJECT_LOCK (rtx);
+ rtx->max_size_packets = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (rtx);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static GstStateChangeReturn
+gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
+{
+ GstStateChangeReturn ret;
+ GstRtpRtxSend *rtx;
+
+ rtx = GST_RTP_RTX_SEND (element);
+
+ switch (transition) {
+ default:
+ break;
+ }
+
+ ret =
+ GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
+ transition);
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_rtp_rtx_send_reset (rtx);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+gboolean
+gst_rtp_rtx_send_plugin_init (GstPlugin * plugin)
+{
+ GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
+ "rtp retransmission sender");
+
+ return gst_element_register (plugin, "rtprtxsend", GST_RANK_NONE,
+ GST_TYPE_RTP_RTX_SEND);
+}