ENDIF()
ENDIF(PLATFORM STREQUAL "tizen")
+OPTION(WITH_RTSP "Build RTSP module?" ON)
+IF(WITH_RTSP)
+ ADD_DEFINITIONS(-DWITH_RTSP)
+ ADD_SUBDIRECTORY(modules/rtsp)
+ENDIF()
+
ADD_SUBDIRECTORY(common)
--- /dev/null
+IF(PLATFORM STREQUAL "android")
+ MESSAGE ( WARNING "Required to write CMakefile for Android Platform" )
+ELSE(PLATFORM STREQUAL "android")
+ SET(AITT_RTSP aitt-stream-rtsp)
+
+ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+ PKG_CHECK_MODULES(AITT_RTSP_NEEDS REQUIRED
+ gstreamer-1.0
+ gstreamer-video-1.0
+ )
+ INCLUDE_DIRECTORIES(${AITT_RTSP_NEEDS_INCLUDE_DIRS})
+ LINK_DIRECTORIES(${AITT_RTSP_NEEDS_LIBRARY_DIRS})
+
+ FILE(GLOB AITT_RTSP_SRC *.cc)
+ list(REMOVE_ITEM AITT_RTSP_SRC ${CMAKE_CURRENT_SOURCE_DIR}/Module.cc)
+ ADD_LIBRARY(RTSP_OBJ OBJECT ${AITT_RTSP_SRC})
+ ADD_LIBRARY(${AITT_RTSP} SHARED $<TARGET_OBJECTS:RTSP_OBJ> Module.cc ../stream_entry.cc)
+ TARGET_LINK_LIBRARIES(${AITT_RTSP} ${AITT_RTSP_NEEDS_LIBRARIES} ${AITT_COMMON})
+ TARGET_COMPILE_OPTIONS(${AITT_RTSP} PUBLIC ${AITT_RTSP_NEEDS_CFLAGS_OTHER})
+ INSTALL(TARGETS ${AITT_RTSP} DESTINATION ${CMAKE_INSTALL_LIBDIR})
+ENDIF(PLATFORM STREQUAL "android")
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Module.h"
+
+#include <flatbuffers/flexbuffers.h>
+
+#include "aitt_internal.h"
+
+namespace AittRTSPNamespace {
+
+Module::Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role)
+ : discovery_(discovery), topic_(topic), role_(role), info(nullptr), client(nullptr), current_state(0)
+{
+ DBG("RTSP Module constructor : %s, role : %d", topic_.c_str(), role_);
+
+ discovery_cb_ = discovery_.AddDiscoveryCB(topic,
+ std::bind(&Module::DiscoveryMessageCallback, this, std::placeholders::_1,
+ std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
+}
+
+Module::~Module(void)
+{
+ DBG("RTSP Module destroyer : %s, role : %d", topic_.c_str(), role_);
+}
+
+void Module::SetConfig(const std::string &key, const std::string &value)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
+ /* Add rtsp server table */
+ }
+}
+
+void Module::SetConfig(const std::string &key, void *obj)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_SUBSCRIBER) {
+ /* Set evas object */
+ }
+}
+
+void Module::Start(void)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER) {
+ /* Update Rtsp Server table with retained message of MQTT broker */
+ }
+ else {
+ /* Check if the topic exists in server_table to find */
+ /* if exists, then create pipeline using that information */
+ /* if not, wait until discovery message from Publisher */
+ }
+}
+
+void Module::SetStateCallback(StateCallback cb, void *user_data)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+ return;
+
+}
+
+void Module::SetReceiveCallback(ReceiveCallback cb, void *user_data)
+{
+ if (role_ == AittStreamRole::AITT_STREAM_ROLE_PUBLISHER)
+ return;
+}
+
+} // namespace AittRTSPNamespace
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <AittStreamModule.h>
+
+#include <map>
+#include <string>
+
+#include "RTSPClient.h"
+#include "RTSPInfo.h"
+
+using AittDiscovery = aitt::AittDiscovery;
+using AittStreamModule = aitt::AittStreamModule;
+
+#define MODULE_NAMESPACE AittRTSPNamespace
+namespace AittRTSPNamespace {
+class Module : public AittStreamModule {
+ public:
+ explicit Module(AittDiscovery &discovery, const std::string &topic, AittStreamRole role);
+ virtual ~Module(void);
+
+ void SetConfig(const std::string &key, const std::string &value) override;
+ void SetConfig(const std::string &key, void *obj) override;
+ void Start(void) override;
+
+ void SetStateCallback(StateCallback cb, void *user_data) override;
+ void SetReceiveCallback(ReceiveCallback cb, void *user_data) override;
+
+ private:
+ void UpdateDiscoveryMsg();
+ void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
+ const void *msg, const int szmsg);
+
+ AittDiscovery &discovery_;
+ std::string topic_;
+ AittStreamRole role_;
+
+ int discovery_cb_;
+ RTSPInfo *info;
+ RTSPClient *client;
+ int current_state;
+};
+} // namespace AittRTSPNamespace
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RTSPClient.h"
+
+#include "aitt_internal.h"
+
+RTSPClient::RTSPClient(const std::string &_url)
+ : url(_url),
+ pipeline(nullptr),
+ state_cb_user_data(nullptr),
+ data_cb_user_data(nullptr),
+ state(0)
+{
+ DBG("RTSPClient constructor");
+}
+
+RTSPClient::~RTSPClient()
+{
+ DBG("RTSPClient destructor");
+}
+
+void RTSPClient::OnPadAddedCB(GstElement *element, GstPad *pad, gpointer data)
+{
+ GstPad *sinkpad;
+
+ GstPadLinkReturn ret;
+
+ GstElement *decoder = (GstElement *)data;
+
+ /* We can now link this pad with the rtsp-decoder sink pad */
+
+ DBG("Dynamic pad created, linking source/demuxer");
+ DBG("element name: [%s], pad name : [%s]", GST_ELEMENT_NAME(element), GST_PAD_NAME(pad));
+
+ sinkpad = gst_element_get_static_pad(decoder, "sink");
+
+ /* If our converter is already linked, we have nothing to do here */
+ if (gst_pad_is_linked(sinkpad)) {
+ DBG("*** We are already linked ***");
+ gst_object_unref(sinkpad);
+ return;
+ } else {
+ DBG("proceeding to linking ...");
+ }
+
+ ret = gst_pad_link(pad, sinkpad);
+
+ if (GST_PAD_LINK_FAILED(ret)) {
+ ERR("failed to link dynamically");
+ } else {
+ DBG("dynamically link successful");
+ }
+
+ gst_object_unref(sinkpad);
+}
+
+void RTSPClient::VideoStreamDecodedCB(GstElement *object, GstBuffer *buffer, GstPad *pad,
+ gpointer data)
+{
+ if (data == nullptr)
+ return;
+
+ RTSPClient *client = static_cast<RTSPClient *>(data);
+ RTSPFrame frame(buffer, pad);
+
+ /* need queueing and delete old frame */
+ std::lock_guard<std::mutex> auto_lock(client->data_cb_lock);
+ if (client->data_cb != nullptr)
+ client->data_cb(frame, client->data_cb_user_data);
+}
+
+gboolean RTSPClient::MessageReceived(GstBus *bus, GstMessage *message, gpointer data)
+{
+ GstMessageType type = message->type;
+
+ switch (type) {
+ case GST_MESSAGE_ERROR:
+ GError *gerror;
+ gchar *debug;
+
+ gst_message_parse_error(message, &gerror, &debug);
+
+ ERR("Error : %s", debug);
+
+ g_error_free(gerror);
+ g_free(debug);
+
+ break;
+ default:
+ break;
+ }
+
+ return TRUE;
+}
+
+void RTSPClient::CreatePipeline()
+{
+ DBG("Create Pipeline with url : %s", url.c_str());
+
+ GstBus *bus;
+ GstElement *rtspsrc;
+ GstElement *rtph264depay;
+ GstElement *h264parse;
+ GstElement *avdec_h264;
+ GstElement *videoqueue0;
+ GstElement *videoconvert;
+ GstElement *video_sink;
+
+ gst_init(nullptr, nullptr);
+
+ pipeline = gst_pipeline_new("rtsp pipeline");
+ if (!pipeline) {
+ ERR("pipeline is null");
+ return;
+ }
+
+ rtph264depay = gst_element_factory_make("rtph264depay", "rtph264depay0");
+ h264parse = gst_element_factory_make("h264parse", "h264parse0");
+ rtspsrc = gst_element_factory_make("rtspsrc", "rtspsrc0");
+ avdec_h264 = gst_element_factory_make("decodebin", "avdec_h2640");
+ videoqueue0 = gst_element_factory_make("queue", "videoqueue0");
+ videoconvert = gst_element_factory_make("videoconvert", "videoconvert0");
+ video_sink = gst_element_factory_make("fakesink", "fakesink0");
+
+ g_object_set(G_OBJECT(video_sink), "sync", FALSE, NULL);
+ g_object_set(G_OBJECT(rtspsrc), "location", url.c_str(), NULL);
+ g_object_set(G_OBJECT(rtspsrc), "latency", 0, NULL);
+ g_object_set(G_OBJECT(rtspsrc), "buffer-mode", 3, NULL);
+
+ gst_bin_add_many(GST_BIN(pipeline), rtspsrc, rtph264depay, h264parse, avdec_h264, videoqueue0,
+ videoconvert, video_sink, NULL);
+
+ bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
+ gst_bus_add_watch(bus, MessageReceived, pipeline);
+
+ if (!gst_element_link_many(rtph264depay, h264parse, avdec_h264, NULL)) {
+ ERR("Linking part (A)-1 Fail...");
+ return;
+ }
+
+ if (!gst_element_link_many(videoqueue0, videoconvert, video_sink, NULL)) {
+ ERR("Linking part (A)-2 Fail...");
+ return;
+ }
+
+ if (!g_signal_connect(rtspsrc, "pad-added", G_CALLBACK(OnPadAddedCB), rtph264depay)) {
+ ERR("Linking part (1) with part (A)-1 Fail...");
+ }
+
+ if (!g_signal_connect(avdec_h264, "pad-added", G_CALLBACK(OnPadAddedCB), videoqueue0)) {
+ ERR("Linking part (2) with part (A)-2 Fail...");
+ }
+
+ g_object_set(G_OBJECT(video_sink), "signal-handoffs", TRUE, NULL);
+ if (!g_signal_connect(video_sink, "handoff", G_CALLBACK(VideoStreamDecodedCB), this)) {
+ ERR("Linking part (2) with part (B)-2 Fail...");
+ }
+
+ DBG("Pipeline created");
+}
+
+void RTSPClient::DestroyPipeline(void)
+{
+ DBG("Destroy Pipeline");
+}
+
+void RTSPClient::SetStateCallback(const StateCallback &cb, void *user_data)
+{
+ std::lock_guard<std::mutex> auto_lock(state_cb_lock);
+ state_cb = cb;
+ state_cb_user_data = user_data;
+}
+
+void RTSPClient::SetDataCallback(const DataCallback &cb, void *user_data)
+{
+ std::lock_guard<std::mutex> auto_lock(data_cb_lock);
+ data_cb = cb;
+ data_cb_user_data = user_data;
+}
+
+void RTSPClient::UnsetStateCallback()
+{
+ std::lock_guard<std::mutex> auto_lock(state_cb_lock);
+ state_cb = nullptr;
+ state_cb_user_data = nullptr;
+}
+
+void RTSPClient::UnsetClientCallback()
+{
+ std::lock_guard<std::mutex> auto_lock(data_cb_lock);
+ data_cb = nullptr;
+ data_cb_user_data = nullptr;
+}
+
+int RTSPClient::GetState()
+{
+ return state;
+}
+
+void RTSPClient::Start()
+{
+ gst_element_set_state(pipeline, GST_STATE_PLAYING);
+}
+
+void RTSPClient::Stop()
+{
+ gst_element_set_state(pipeline, GST_STATE_NULL);
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <gst/gst.h>
+#include <gst/video/video-info.h>
+
+#include <functional>
+#include <mutex>
+#include <string>
+
+#include "RTSPFrame.h"
+
+class RTSPClient {
+ public:
+ explicit RTSPClient(const std::string &url);
+ ~RTSPClient(void);
+
+ using StateCallback = std::function<void(void *user_data)>;
+ using DataCallback = std::function<void(RTSPFrame &frame, void *user_data)>;
+
+ void SetStateCallback(const StateCallback &cb, void *user_data);
+ void SetDataCallback(const DataCallback &cb, void *user_data);
+ void UnsetStateCallback();
+ void UnsetClientCallback();
+ int GetState();
+
+ void Start();
+ void Stop();
+
+ void CreatePipeline();
+ void DestroyPipeline(void);
+
+ private:
+ static void OnPadAddedCB(GstElement *element, GstPad *pad, gpointer data);
+ static void VideoStreamDecodedCB(GstElement *object, GstBuffer *buffer, GstPad *pad,
+ gpointer data);
+ static gboolean MessageReceived(GstBus *bus, GstMessage *message, gpointer data);
+
+ std::string url;
+ GstElement *pipeline;
+
+ StateCallback state_cb;
+ void *state_cb_user_data;
+
+ DataCallback data_cb;
+ void *data_cb_user_data;
+
+ std::mutex state_cb_lock;
+ std::mutex data_cb_lock;
+
+ int state;
+};
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RTSPFrame.h"
+
+#include <gst/video/video-info.h>
+
+#include "aitt_internal.h"
+
+RTSPFrame::RTSPFrame(GstBuffer *buffer, GstPad *pad)
+ : width(0), height(0), data(nullptr), data_size(0)
+{
+ GetDecodedInfoFromPad(buffer, pad);
+}
+
+void RTSPFrame::GetDecodedInfoFromPad(GstBuffer *buffer, GstPad *pad)
+{
+ RET_IF(pad == nullptr);
+
+ GstCaps *caps = gst_pad_get_current_caps(pad);
+ if (caps == nullptr) {
+ ERR("caps is nullptr");
+ return;
+ }
+
+ GstVideoInfo vinfo;
+ if (!gst_video_info_from_caps(&vinfo, caps)) {
+ gst_caps_unref(caps);
+ ERR("failed to gst_video_info_from_caps()");
+ return;
+ }
+
+ GstStructure *structure = gst_caps_get_structure(caps, 0);
+ const gchar *string_format = gst_structure_get_string(structure, "format");
+ if (string_format) {
+ DBG("format %s", string_format);
+
+ format = (std::string)string_format;
+ }
+
+ gst_caps_unref(caps);
+
+ if (vinfo.width == 0 || vinfo.height == 0) {
+ ERR("width, height is %d, %d", vinfo.width, vinfo.height);
+ return;
+ }
+
+ width = vinfo.width;
+ height = vinfo.height;
+ GstMapInfo map;
+ gst_buffer_map(buffer, &map, GST_MAP_READ);
+ data = map.data;
+ data_size = map.size;
+}
+
+int RTSPFrame::GetWidth(void) const
+{
+ return width;
+}
+
+int RTSPFrame::GetHeight(void) const
+{
+ return height;
+}
+
+int RTSPFrame::GetDataSize(void) const
+{
+ return data_size;
+}
+
+void *RTSPFrame::GetData(void) const
+{
+ return data;
+}
+
+std::string RTSPFrame::GetFormat(void)
+{
+ return format;
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <gst/gst.h>
+
+#include <string>
+
+class RTSPFrame {
+ public:
+ RTSPFrame() = default;
+ RTSPFrame(GstBuffer *buffer, GstPad *pad);
+
+ int GetWidth(void) const;
+ int GetHeight(void) const;
+ int GetDataSize(void) const;
+ void *GetData(void) const;
+ std::string GetFormat(void);
+
+ private:
+ int width;
+ int height;
+ void *data;
+ int data_size;
+ std::string format;
+
+ void GetDecodedInfoFromPad(GstBuffer *buffer, GstPad *pad);
+};
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RTSPInfo.h"
+
+RTSPInfo::RTSPInfo(const std::string &_url, const std::string &_id, const std::string &_password)
+ : url(_url), id(_id), password(_password)
+{
+ // encoding secure id and password
+}
+
+RTSPInfo::~RTSPInfo()
+{
+}
+
+std::string RTSPInfo::GetUrl()
+{
+ return url;
+}
+
+std::string RTSPInfo::GetID()
+{
+ // decoding secure id
+ return id;
+}
+
+std::string RTSPInfo::GetPassword()
+{
+ // decoding secure password
+ return password;
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+class RTSPInfo {
+ public:
+ RTSPInfo(const std::string &url, const std::string &id, const std::string &password);
+ ~RTSPInfo(void);
+
+ std::string GetUrl();
+ std::string GetID();
+ std::string GetPassword();
+
+ private:
+ std::string url;
+ std::string id;
+ std::string password;
+};
BuildRequires: pkgconfig(capi-media-camera)
BuildRequires: pkgconfig(json-glib-1.0)
BuildRequires: pkgconfig(openssl1.1)
+BuildRequires: pkgconfig(gstreamer-1.0)
+BuildRequires: pkgconfig(gstreamer-video-1.0)
%if 0%{gcov}
BuildRequires: lcov
%endif
-DPLATFORM="tizen" \
-DVERSIONING:BOOL=OFF \
-DWITH_WEBRTC:BOOL=ON \
+ -DWITH_RTSP:BOOL=ON \
-DCMAKE_INSTALL_PREFIX:PATH=%{_prefix} \
-DCMAKE_VERBOSE_MAKEFILE=OFF \
-DBUILD_TESTING:BOOL=%{test} \
using namespace aitt;
-TEST(AittStreamTest, Full_P)
+TEST(AittStreamTest, Webrtc_Full_P)
{
try {
AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
FAIL() << "Unexpected exception: " << e.what();
}
}
+
+TEST(AittStreamTest, RTSP_Full_P)
+{
+ try {
+ AITT aitt("streamClientId", LOCAL_IP, AittOption(true, false));
+
+ aitt.Connect();
+
+ AittStream *publisher =
+ aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_PUBLISHER);
+ ASSERT_TRUE(publisher) << "CreateStream() Fail";
+
+ AittStream *subscriber =
+ aitt.CreateStream(AITT_STREAM_TYPE_RTSP, "topic", AITT_STREAM_ROLE_SUBSCRIBER);
+ ASSERT_TRUE(subscriber) << "CreateStream() Fail";
+
+ publisher->SetConfig("key", "value");
+ publisher->Start();
+
+ subscriber->SetConfig("key", "value");
+ subscriber->SetStateCallback([](AittStream *stream, int state, void *user_data) {},
+ (void *)"user_data");
+ subscriber->SetReceiveCallback([](AittStream *stream, void *obj, void *user_data) {},
+ (void *)"user-data");
+ subscriber->Start();
+
+ aitt.DestroyStream(publisher);
+ aitt.DestroyStream(subscriber);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}