728x90

Application과 gstreamer의 pipeline 간에 데이터를 주고받는 내용에 대해 살펴봅니다. 이 예제와 basic tutorial 7이 큰 차이가 없어 이 포스팅에서 한번에 다루겠습니다.

 

Basic tutorial 8: Short-cutting the pipeline

Basic tutorial 8: Short-cutting the pipeline Please port this tutorial to python! Please port this tutorial to javascript! Goal Pipelines constructed with GStreamer do not need to be completely closed. Data can be injected into the pipeline and extracted f

gstreamer.freedesktop.org

Application에서 pipeline으로 데이터를 넣을 수 있는 element를 appsrc, 그 반대를 appsink라고 합니다. 데이터를 넣는 appsrc는 pull mode / push mode가 있는데, pull mode의 경우 appsrc가 데이터가 필요할 때마다 요청(need-data)하여 받아가고, push mode의 경우 application이 원할 때 데이터를 밀어 넣는 방식으로 진행됩니다. 데이터는 GstBuffer를 통해 파이프라인을 통과합니다.

QT -= gui

CONFIG += c++17 console
CONFIG -= app_bundle

# You can make your code fail to compile if it uses deprecated APIs.
# In order to do so, uncomment the following line.
#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0

SOURCES += \
        main.cpp

DEPENDPATH += \
    $$PWD/include

INCLUDEPATH += \
    $$PWD/include \
    $$PWD/include/gstreamer-1.0 \
    $$PWD/include/glib-2.0/ \
    $$PWD/include/glib-2.0/include \
    $$PWD/include/orc-0.4 \
    $$PWD/lib/glib-2.0/include \

win32: LIBS += -L$$PWD/lib/ -lgstreamer-1.0 -lgobject-2.0 -lglib-2.0 -lintl -lgstaudio-1.0 -lgstbase-1.0

DESTDIR += \
    $$PWD/bin

# Default rules for deployment.
qnx: target.path = /tmp/$${TARGET}/bin
else: unix:!android: target.path = /opt/$${TARGET}/bin
!isEmpty(target.path): INSTALLS += target

gstaudio-1.0 dll 사용이 필요해 pro 파일도 일부 수정하였습니다.

#include <QCoreApplication>
#include <QDebug>
#include <gst/gst.h>
#include <gst/audio/audio.h>
#include <string.h>

#define CHUNK_SIZE 1024   /* Amount of bytes we are sending in each buffer */
#define SAMPLE_RATE 44100 /* Samples per second we are sending */

/* Structure to contain all our information, so we can pass it to callbacks */
typedef struct _CustomData {
  GstElement *pipeline, *app_source, *tee, *audio_queue, *audio_convert1, *audio_resample, *audio_sink;  /* appsrc + audio playback branch */
  GstElement *video_queue, *audio_convert2, *visual, *video_convert, *video_sink;  /* wavescope visualization branch */
  GstElement *app_queue, *app_sink;  /* branch delivering buffers back to the application via appsink */

  guint64 num_samples;   /* Number of samples generated so far (for timestamp generation) */
  gfloat a, b, c, d;     /* For waveform generation */

  guint sourceid;        /* To control the GSource (idle handler id; 0 = not feeding) */

  GMainLoop *main_loop;  /* GLib's Main Loop */
} CustomData;

/* Idle handler: generates CHUNK_SIZE bytes of waveform data and pushes them
 * into the appsrc. It is installed on the "need-data" signal and removed on
 * "enough-data"; returning FALSE also removes it from the main loop.
 * Returns TRUE to keep being called while pushing succeeds. */
static gboolean push_data (CustomData *data) {
  GstMapInfo info;
  GstFlowReturn flow;
  gint16 *samples;
  gint sample_count = CHUNK_SIZE / 2; /* 16-bit samples, so 2 bytes each */
  gfloat frequency;
  gint idx;

  /* Allocate an empty buffer of the chunk size */
  GstBuffer *chunk = gst_buffer_new_and_alloc (CHUNK_SIZE);

  /* Derive timestamp and duration from the running sample count */
  GST_BUFFER_TIMESTAMP (chunk) = gst_util_uint64_scale (data->num_samples, GST_SECOND, SAMPLE_RATE);
  GST_BUFFER_DURATION (chunk) = gst_util_uint64_scale (sample_count, GST_SECOND, SAMPLE_RATE);

  /* Fill the buffer with a slowly frequency-sweeping waveform */
  gst_buffer_map (chunk, &info, GST_MAP_WRITE);
  samples = (gint16 *) info.data;
  data->c += data->d;
  data->d -= data->c / 1000;
  frequency = 1100 + 1000 * data->d;
  for (idx = 0; idx < sample_count; idx++) {
    data->a += data->b;
    data->b -= data->a / frequency;
    samples[idx] = (gint16)(500 * data->a);
  }
  gst_buffer_unmap (chunk, &info);
  data->num_samples += sample_count;

  /* Hand the buffer to appsrc; the action signal reports a GstFlowReturn */
  g_signal_emit_by_name (data->app_source, "push-buffer", chunk, &flow);

  /* appsrc has taken its own reference, so drop ours */
  gst_buffer_unref (chunk);

  /* Keep the idle source alive only while the push succeeded */
  return (flow == GST_FLOW_OK);
}

/* "need-data" handler: appsrc's internal queue is running low, so attach an
 * idle GSource that feeds it via push_data until told to stop. */
static void start_feed (GstElement *source, guint size, CustomData *data) {
  if (data->sourceid != 0)
    return; /* already feeding */

  g_print ("Start feeding\n");
  data->sourceid = g_idle_add ((GSourceFunc) push_data, data);
}

/* "enough-data" handler: appsrc's queue holds enough data; detach the idle
 * feeder installed by start_feed. */
static void stop_feed (GstElement *source, CustomData *data) {
  if (data->sourceid == 0)
    return; /* nothing installed */

  g_print ("Stop feeding\n");
  g_source_remove (data->sourceid);
  data->sourceid = 0;
}

/* "new-sample" handler: a buffer reached the appsink. */
static GstFlowReturn new_sample (GstElement *sink, CustomData *data) {
  GstSample *sample = NULL;

  /* Pull the sample out of the appsink via its action signal */
  g_signal_emit_by_name (sink, "pull-sample", &sample);
  if (!sample)
    return GST_FLOW_ERROR;

  /* This example only prints a '*' to indicate a received buffer */
  g_print ("*");
  gst_sample_unref (sample);
  return GST_FLOW_OK;
}

/* Bus "message::error" handler: print the error details and stop the main loop. */
static void error_cb (GstBus *bus, GstMessage *msg, CustomData *data) {
  GError *err = NULL;
  gchar *dbg = NULL;

  gst_message_parse_error (msg, &err, &dbg);
  g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
  g_printerr ("Debugging information: %s\n", dbg ? dbg : "none");
  g_clear_error (&err);
  g_free (dbg);

  g_main_loop_quit (data->main_loop);
}

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    CustomData data;
      GstPad *tee_audio_pad, *tee_video_pad, *tee_app_pad;
      GstPad *queue_audio_pad, *queue_video_pad, *queue_app_pad;
      GstAudioInfo info;
      GstCaps *audio_caps;
      GstBus *bus;

      /* Initialize custom data structure */
      memset (&data, 0, sizeof (data));
      data.b = 1; /* For waveform generation */
      data.d = 1;

      /* Initialize GStreamer */
      gst_init (&argc, &argv);

      /* Create the elements */
      data.app_source = gst_element_factory_make ("appsrc", "audio_source");
      data.tee = gst_element_factory_make ("tee", "tee");
      data.audio_queue = gst_element_factory_make ("queue", "audio_queue");
      data.audio_convert1 = gst_element_factory_make ("audioconvert", "audio_convert1");
      data.audio_resample = gst_element_factory_make ("audioresample", "audio_resample");
      data.audio_sink = gst_element_factory_make ("autoaudiosink", "audio_sink");
      data.video_queue = gst_element_factory_make ("queue", "video_queue");
      data.audio_convert2 = gst_element_factory_make ("audioconvert", "audio_convert2");
      data.visual = gst_element_factory_make ("wavescope", "visual");
      data.video_convert = gst_element_factory_make ("videoconvert", "video_convert");
      data.video_sink = gst_element_factory_make ("autovideosink", "video_sink");
      data.app_queue = gst_element_factory_make ("queue", "app_queue");
      data.app_sink = gst_element_factory_make ("appsink", "app_sink");

      /* Create the empty pipeline */
      data.pipeline = gst_pipeline_new ("test-pipeline");

      if (!data.pipeline || !data.app_source || !data.tee || !data.audio_queue || !data.audio_convert1 ||
          !data.audio_resample || !data.audio_sink || !data.video_queue || !data.audio_convert2 || !data.visual ||
          !data.video_convert || !data.video_sink || !data.app_queue || !data.app_sink) {
        g_printerr ("Not all elements could be created.\n");
        return -1;
      }

      /* Configure wavescope */
      g_object_set (data.visual, "shader", 0, "style", 0, NULL);

      /* Configure appsrc */
      gst_audio_info_set_format (&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
      audio_caps = gst_audio_info_to_caps (&info);
      g_object_set (data.app_source, "caps", audio_caps, "format", GST_FORMAT_TIME, NULL);
      g_signal_connect (data.app_source, "need-data", G_CALLBACK (start_feed), &data);
      g_signal_connect (data.app_source, "enough-data", G_CALLBACK (stop_feed), &data);

      /* Configure appsink */
      g_object_set (data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
      g_signal_connect (data.app_sink, "new-sample", G_CALLBACK (new_sample), &data);
      gst_caps_unref (audio_caps);

      /* Link all elements that can be automatically linked because they have "Always" pads */
      gst_bin_add_many (GST_BIN (data.pipeline), data.app_source, data.tee, data.audio_queue, data.audio_convert1, data.audio_resample,
          data.audio_sink, data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sink, data.app_queue,
          data.app_sink, NULL);
      if (gst_element_link_many (data.app_source, data.tee, NULL) != TRUE ||
          gst_element_link_many (data.audio_queue, data.audio_convert1, data.audio_resample, data.audio_sink, NULL) != TRUE ||
          gst_element_link_many (data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sink, NULL) != TRUE ||
          gst_element_link_many (data.app_queue, data.app_sink, NULL) != TRUE) {
        g_printerr ("Elements could not be linked.\n");
        gst_object_unref (data.pipeline);
        return -1;
      }

      /* Manually link the Tee, which has "Request" pads */
      tee_audio_pad = gst_element_request_pad_simple (data.tee, "src_%u");
      g_print ("Obtained request pad %s for audio branch.\n", gst_pad_get_name (tee_audio_pad));
      queue_audio_pad = gst_element_get_static_pad (data.audio_queue, "sink");
      tee_video_pad = gst_element_request_pad_simple (data.tee, "src_%u");
      g_print ("Obtained request pad %s for video branch.\n", gst_pad_get_name (tee_video_pad));
      queue_video_pad = gst_element_get_static_pad (data.video_queue, "sink");
      tee_app_pad = gst_element_request_pad_simple (data.tee, "src_%u");
      g_print ("Obtained request pad %s for app branch.\n", gst_pad_get_name (tee_app_pad));
      queue_app_pad = gst_element_get_static_pad (data.app_queue, "sink");
      if (gst_pad_link (tee_audio_pad, queue_audio_pad) != GST_PAD_LINK_OK ||
          gst_pad_link (tee_video_pad, queue_video_pad) != GST_PAD_LINK_OK ||
          gst_pad_link (tee_app_pad, queue_app_pad) != GST_PAD_LINK_OK) {
        g_printerr ("Tee could not be linked\n");
        gst_object_unref (data.pipeline);
        return -1;
      }
      gst_object_unref (queue_audio_pad);
      gst_object_unref (queue_video_pad);
      gst_object_unref (queue_app_pad);

      /* Instruct the bus to emit signals for each received message, and connect to the interesting signals */
      bus = gst_element_get_bus (data.pipeline);
      gst_bus_add_signal_watch (bus);
      g_signal_connect (G_OBJECT (bus), "message::error", (GCallback)error_cb, &data);
      gst_object_unref (bus);

      /* Start playing the pipeline */
      gst_element_set_state (data.pipeline, GST_STATE_PLAYING);

      /* Create a GLib Main Loop and set it to run */
      data.main_loop = g_main_loop_new (NULL, FALSE);
      g_main_loop_run (data.main_loop);

      /* Release the request pads from the Tee, and unref them */
      gst_element_release_request_pad (data.tee, tee_audio_pad);
      gst_element_release_request_pad (data.tee, tee_video_pad);
      gst_element_release_request_pad (data.tee, tee_app_pad);
      gst_object_unref (tee_audio_pad);
      gst_object_unref (tee_video_pad);
      gst_object_unref (tee_app_pad);

      /* Free resources */
      gst_element_set_state (data.pipeline, GST_STATE_NULL);
      gst_object_unref (data.pipeline);


    return a.exec();
}

 

예제의 pipeline은 아래와 같습니다. Tee라는 elements를 통해 하나의 appsrc에서 오는 데이터를 3 갈래로 나눠줍니다.

/* Create the elements */
data.app_source = gst_element_factory_make ("appsrc", "audio_source");
data.tee = gst_element_factory_make ("tee", "tee");
data.audio_queue = gst_element_factory_make ("queue", "audio_queue");
data.audio_convert1 = gst_element_factory_make ("audioconvert", "audio_convert1");
data.audio_resample = gst_element_factory_make ("audioresample", "audio_resample");
data.audio_sink = gst_element_factory_make ("autoaudiosink", "audio_sink");
data.video_queue = gst_element_factory_make ("queue", "video_queue");
data.audio_convert2 = gst_element_factory_make ("audioconvert", "audio_convert2");
data.visual = gst_element_factory_make ("wavescope", "visual");
data.video_convert = gst_element_factory_make ("videoconvert", "video_convert");
data.video_sink = gst_element_factory_make ("autovideosink", "video_sink");
data.app_queue = gst_element_factory_make ("queue", "app_queue");
data.app_sink = gst_element_factory_make ("appsink", "app_sink");

queue를 사용하면 multi thread 환경처럼 사용할 수 있습니다.

appsrc elements에 audio caps를 설정하고 set해줍니다.

/* Configure appsrc */
gst_audio_info_set_format (&info, GST_AUDIO_FORMAT_S16, SAMPLE_RATE, 1, NULL);
audio_caps = gst_audio_info_to_caps (&info);
g_object_set (data.app_source, "caps", audio_caps, "format", GST_FORMAT_TIME, NULL);
g_signal_connect (data.app_source, "need-data", G_CALLBACK (start_feed), &data);
g_signal_connect (data.app_source, "enough-data", G_CALLBACK (stop_feed), &data);

여기서 need-data, enough-data signal이 나오는데, source가 data가 필요할 때 need-data signal을, 이제 충분히 데이터를 받았을 때 enough-data signal을 발생합니다. 이 기준은 내부 데이터 queue에 있는 데이터 양으로 판단합니다.

appsrc 내부 queue의 데이터가 필요할 때 g_idle_add를 통해 등록한 GLib 함수를 호출하게 됩니다. 이 함수는 mainloop가 idle 상태 즉 우선순위가 급한 일이 없을 때 호출됩니다. 

/* This signal callback triggers when appsrc needs data (its internal queue is
 * running low). Here, we add an idle handler to the mainloop to start pushing
 * data into the appsrc; 'size' is a hint of how many bytes are wanted. */
static void start_feed (GstElement *source, guint size, CustomData *data) {
    if (data->sourceid == 0) {
        g_print ("Start feeding\n");
        /* push_data will run whenever the main loop has nothing urgent to do */
        data->sourceid = g_idle_add ((GSourceFunc) push_data, data);
    }
}

push_data 함수에서는 buffer를 생성하여 wave 데이터를 생성하고 push-buffer signal을 발생시켜 buffer를 app_src elements의 source pad로 전달합니다. 이 source pad의 buffer는 결국 Tee->App_queue를 통해 app_sink로 전달될 것입니다.

/* This method is called by the idle GSource in the mainloop, to feed CHUNK_SIZE bytes into appsrc.
 * The idle handler is added to the mainloop when appsrc requests us to start sending data (need-data signal)
 * and is removed when appsrc has enough data (enough-data signal).
 * Returns TRUE to stay installed, FALSE (on push failure) to remove itself.
 */
static gboolean push_data (CustomData *data) {
    GstBuffer *buffer;
    GstFlowReturn ret;
    int i;
    GstMapInfo map;
    gint16 *raw;
    gint num_samples = CHUNK_SIZE / 2; /* Because each sample is 16 bits */
    gfloat freq;

    /* Create a new empty buffer */
    buffer = gst_buffer_new_and_alloc (CHUNK_SIZE);

    /* Set its timestamp and duration, derived from the running sample count */
    GST_BUFFER_TIMESTAMP (buffer) = gst_util_uint64_scale (data->num_samples, GST_SECOND, SAMPLE_RATE);
    GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale (num_samples, GST_SECOND, SAMPLE_RATE);

    /* Generate some psychodelic waveforms: a/b form the oscillator, c/d
     * slowly sweep its frequency from chunk to chunk */
    gst_buffer_map (buffer, &map, GST_MAP_WRITE);
    raw = (gint16 *)map.data;
    data->c += data->d;
    data->d -= data->c / 1000;
    freq = 1100 + 1000 * data->d;
    for (i = 0; i < num_samples; i++) {
        data->a += data->b;
        data->b -= data->a / freq;
        raw[i] = (gint16)(500 * data->a);
    }
    gst_buffer_unmap (buffer, &map);
    data->num_samples += num_samples;

    /* Push the buffer into the appsrc via its "push-buffer" action signal */
    g_signal_emit_by_name (data->app_source, "push-buffer", buffer, &ret);

    /* Free the buffer now that we are done with it (appsrc holds its own ref) */
    gst_buffer_unref (buffer);

    if (ret != GST_FLOW_OK) {
        /* We got some error, stop sending data */
        return FALSE;
    }

    return TRUE;
}

app_sink에 new-sample signal을 등록해서 sample이 들어올 때 new_sample 함수가 호출됩니다. 

/* Configure appsink */
g_object_set (data.app_sink, "emit-signals", TRUE, "caps", audio_caps, NULL);
g_signal_connect (data.app_sink, "new-sample", G_CALLBACK (new_sample), &data);
gst_caps_unref (audio_caps);

/* The appsink has received a buffer ("new-sample" signal handler). */
static GstFlowReturn new_sample (GstElement *sink, CustomData *data) {
    GstSample *sample;

    /* Retrieve the buffer via appsink's "pull-sample" action signal */
    g_signal_emit_by_name (sink, "pull-sample", &sample);
    if (sample) {
        /* The only thing we do in this example is print a * to indicate a received buffer */
        g_print ("*");
        gst_sample_unref (sample);
        return GST_FLOW_OK;
    }

    return GST_FLOW_ERROR;
}

내부 queue가 어느정도 찼다고 판단되면 stop_feed 함수가 호출되고 등록한 함수를 제거합니다.

/* This callback triggers when appsrc has enough data and we can stop sending.
 * We remove the idle handler (installed by start_feed) from the mainloop. */
static void stop_feed (GstElement *source, CustomData *data) {
    if (data->sourceid != 0) {
        g_print ("Stop feeding\n");
        g_source_remove (data->sourceid);
        data->sourceid = 0; /* mark "not feeding" so start_feed can re-install */
    }
}

위와 같이 GLib를 사용하기 위해서는 main loop를 run상태로 설정해줘야 합니다.

/* Create a GLib Main Loop and set it to run */
data.main_loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (data.main_loop);
728x90

+ Recent posts