From: Seungha Yang Date: Fri, 18 Nov 2022 17:56:27 +0000 (+0900) Subject: win32ipc: Add WIN32 shared memory videosrc/sink elements X-Git-Tag: 1.22.0~277 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=4f846540cb02177ad224966f52866f930f46d7d3;p=platform%2Fupstream%2Fgstreamer.git win32ipc: Add WIN32 shared memory videosrc/sink elements Windows supports various IPC methods but that's completely different form that of *nix from implementation point of view. So, instead of adding shared memory functionality to existing shm plugin, new WIN32 shared memory source/sink elements are implemented in this commit. Each videosink (server) and videosrc (client) pair will communicate using WIN32 named pipe and thus user should configure unique/proper pipe name to them (e.g., \\.\pipe\MyPipeName). Once connection is established, videosink will create named shared memory object per frame and client will be able to consume the object (i.e., memory mapped file handle) without additional copy operation. Note that implementations under "protocol" directory are almost pure C/C++ with WIN32 APIs except for a few defines and debug functions. So, applications can take only the protocol part so that the application can send/receive shared-memory object from/to the other end even if it's not an actual GStreamer element. Part-of: --- diff --git a/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json b/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json index 8703b4a..3b147a8 100644 --- a/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json +++ b/subprojects/gst-plugins-bad/docs/plugins/gst_plugins_cache.json @@ -237685,6 +237685,102 @@ "tracers": {}, "url": "Unknown package origin" }, + "win32ipc": { + "description": "Windows IPC plugin", + "elements": { + "win32ipcvideosink": { + "author": "Seungha Yang ", + "description": "Send video frames to win32ipcvideosrc elements", + "hierarchy": [ + "GstWin32IpcVideoSink", + "GstBaseSink", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Sink/Video", + "pad-templates": { + "sink": { + "caps": "video/x-raw:\n format: { ABGR64_LE, BGRA64_LE, AYUV64, ARGB64_LE, ARGB64, RGBA64_LE, ABGR64_BE, BGRA64_BE, ARGB64_BE, RGBA64_BE, GBRA_12LE, GBRA_12BE, Y412_LE, Y412_BE, A444_10LE, GBRA_10LE, A444_10BE, GBRA_10BE, A422_10LE, A422_10BE, A420_10LE, A420_10BE, RGB10A2_LE, BGR10A2_LE, Y410, GBRA, ABGR, VUYA, BGRA, AYUV, ARGB, RGBA, A420, AV12, Y444_16LE, Y444_16BE, v216, P016_LE, P016_BE, Y444_12LE, GBR_12LE, Y444_12BE, GBR_12BE, I422_12LE, I422_12BE, Y212_LE, Y212_BE, I420_12LE, I420_12BE, P012_LE, P012_BE, Y444_10LE, GBR_10LE, Y444_10BE, GBR_10BE, r210, I422_10LE, I422_10BE, NV16_10LE32, Y210, v210, UYVP, I420_10LE, I420_10BE, P010_10LE, NV12_10LE32, NV12_10LE40, P010_10BE, NV12_10BE_8L128, Y444, RGBP, GBR, BGRP, NV24, xBGR, BGRx, xRGB, RGBx, BGR, IYU2, v308, RGB, Y42B, NV61, NV16, VYUY, UYVY, YVYU, YUY2, I420, YV12, NV21, NV12, NV12_8L128, NV12_64Z32, NV12_4L4, NV12_32L32, NV12_16L32S, Y41B, IYU1, YVU9, YUV9, RGB16, BGR16, RGB15, BGR15, RGB8P, GRAY16_LE, GRAY16_BE, GRAY10_LE32, GRAY8 }\n width: [ 1, 2147483647 ]\n height: [ 1, 2147483647 ]\n framerate: [ 0/1, 2147483647/1 ]\n", + "direction": "sink", + "presence": "always" + } + }, + "properties": { + "pipe-name": { + "blurb": "The name of Win32 named pipe to communicate with clients. Validation of the pipe name is caller's responsibility", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "\\\\.\\pipe\\gst.win32.ipc.video", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + } + }, + "rank": "none" + }, + "win32ipcvideosrc": { + "author": "Seungha Yang ", + "description": "Receive video frames from the win32ipcvideosink", + "hierarchy": [ + "GstWin32IpcVideoSrc", + "GstBaseSrc", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Source/Video", + "pad-templates": { + "src": { + "caps": "video/x-raw:\n format: { ABGR64_LE, BGRA64_LE, AYUV64, ARGB64_LE, ARGB64, RGBA64_LE, ABGR64_BE, BGRA64_BE, ARGB64_BE, RGBA64_BE, GBRA_12LE, GBRA_12BE, Y412_LE, Y412_BE, A444_10LE, GBRA_10LE, A444_10BE, GBRA_10BE, A422_10LE, A422_10BE, A420_10LE, A420_10BE, RGB10A2_LE, BGR10A2_LE, Y410, GBRA, ABGR, VUYA, BGRA, AYUV, ARGB, RGBA, A420, AV12, Y444_16LE, Y444_16BE, v216, P016_LE, P016_BE, Y444_12LE, GBR_12LE, Y444_12BE, GBR_12BE, I422_12LE, I422_12BE, Y212_LE, Y212_BE, I420_12LE, I420_12BE, P012_LE, P012_BE, Y444_10LE, GBR_10LE, Y444_10BE, GBR_10BE, r210, I422_10LE, I422_10BE, NV16_10LE32, Y210, v210, UYVP, I420_10LE, I420_10BE, P010_10LE, NV12_10LE32, NV12_10LE40, P010_10BE, NV12_10BE_8L128, Y444, RGBP, GBR, BGRP, NV24, xBGR, BGRx, xRGB, RGBx, BGR, IYU2, v308, RGB, Y42B, NV61, NV16, VYUY, UYVY, YVYU, YUY2, I420, YV12, NV21, NV12, NV12_8L128, NV12_64Z32, NV12_4L4, NV12_32L32, NV12_16L32S, Y41B, IYU1, YVU9, YUV9, RGB16, BGR16, RGB15, BGR15, RGB8P, GRAY16_LE, GRAY16_BE, GRAY10_LE32, GRAY8 }\n width: [ 1, 2147483647 ]\n height: [ 1, 2147483647 ]\n framerate: [ 0/1, 2147483647/1 ]\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "pipe-name": { + "blurb": "The name of Win32 named pipe to communicate with server. Validation of the pipe name is caller's responsibility", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "\\\\.\\pipe\\gst.win32.ipc.video", + "mutable": "ready", + "readable": true, + "type": "gchararray", + "writable": true + }, + "processing-deadline": { + "blurb": "Maximum processing time for a buffer in nanoseconds", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "20000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "playing", + "readable": true, + "type": "guint64", + "writable": true + } + }, + "rank": "none" + } + }, + "filename": "gstwin32ipc", + "license": "LGPL", + "other-types": {}, + "package": "GStreamer Bad Plug-ins", + "source": "gst-plugins-bad", + "tracers": {}, + "url": "Unknown package origin" + }, "winks": { "description": "Windows kernel streaming plugin", "elements": { diff --git a/subprojects/gst-plugins-bad/meson_options.txt b/subprojects/gst-plugins-bad/meson_options.txt index ad12351..55f8e9e 100644 --- a/subprojects/gst-plugins-bad/meson_options.txt +++ b/subprojects/gst-plugins-bad/meson_options.txt @@ -174,6 +174,7 @@ option('webrtc', type : 'feature', value : 'auto', description : 'WebRTC audio/v option('webrtcdsp', type : 'feature', value : 'auto', description : 'Plugin with various audio filters provided by the WebRTC audio processing library') option('wildmidi', type : 'feature', value : 'auto', description : 'WildMidi midi soft synth plugin') option('wic', type : 'feature', value : 'auto', description : 'Windows Imaging Component plugin') +option('win32ipc', type : 'feature', value : 'auto', description : 'Windows IPC plugin') option('winks', type : 'feature', value : 'auto', description : 'Windows Kernel Streaming video source plugin') option('winscreencap', type : 'feature', value : 'auto', description : 'Windows Screen Capture video source plugin') option('x265', type : 'feature', value : 'auto', description : 'HEVC/H.265 video encoder plugin (GPL - only built if gpl option is also enabled!)') diff --git a/subprojects/gst-plugins-bad/sys/meson.build b/subprojects/gst-plugins-bad/sys/meson.build index c95c873..cfe1778 100644 --- a/subprojects/gst-plugins-bad/sys/meson.build +++ b/subprojects/gst-plugins-bad/sys/meson.build @@ -26,5 +26,6 @@ subdir('va') subdir('wasapi') subdir('wasapi2') subdir('wic') +subdir('win32ipc') subdir('winks') subdir('winscreencap') diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.cpp new file mode 100644 index 0000000..a8becfd --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.cpp @@ -0,0 +1,71 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "gstwin32ipcutils.h" +#include +#include +#include + +static ULONG global_index = 0; + +static DWORD +gst_win32_ipc_get_pid (void) +{ + static std::once_flag once_flag; + static DWORD pid = 0; + + std::call_once (once_flag,[&]() { + pid = GetCurrentProcessId (); + }); + + return pid; +} + +/* Create unique prefix for named shared memory */ +gchar * +gst_win32_ipc_get_mmf_prefix (void) +{ + std::string prefix = "Local\\gst.win32.ipc." + + std::to_string (gst_win32_ipc_get_pid ()) + std::string (".") + + std::to_string (InterlockedIncrement (&global_index)) + std::string ("."); + + return g_strdup (prefix.c_str ()); +} + +gboolean +gst_win32_ipc_clock_is_qpc (GstClock * clock) +{ + GstClockType clock_type = GST_CLOCK_TYPE_MONOTONIC; + GstClock *mclock; + + if (G_OBJECT_TYPE (clock) != GST_TYPE_SYSTEM_CLOCK) + return FALSE; + + g_object_get (clock, "clock-type", &clock_type, nullptr); + if (clock_type != GST_CLOCK_TYPE_MONOTONIC) + return FALSE; + + mclock = gst_clock_get_master (clock); + if (!mclock) + return TRUE; + + gst_object_unref (mclock); + + return FALSE; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.h b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.h new file mode 100644 index 0000000..5422c3c --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcutils.h @@ -0,0 +1,30 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#pragma once + +#include + +G_BEGIN_DECLS + +gchar * gst_win32_ipc_get_mmf_prefix (void); + +gboolean gst_win32_ipc_clock_is_qpc (GstClock * clock); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.cpp new file mode 100644 index 0000000..e2a2756 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.cpp @@ -0,0 +1,463 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-win32ipcvideosink + * @title: win32ipcvideosink + * @short_description: Windows shared memory video sink + * + * win32ipcvideosink provides raw video memory to connected win32ipcvideossrc + * elements + * + * ## Example launch line + * ``` + * gst-launch-1.0 videotestsrc ! queue ! win32ipcvideosink + * ``` + * + * Since: 1.22 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstwin32ipcvideosink.h" +#include "gstwin32ipcutils.h" +#include "protocol/win32ipcpipeserver.h" +#include +#include + +GST_DEBUG_CATEGORY_STATIC (gst_win32_ipc_video_sink_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_video_sink_debug + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (GST_VIDEO_CAPS_MAKE (GST_VIDEO_FORMATS_ALL))); + +enum +{ + PROP_0, + PROP_PIPE_NAME, +}; + +#define DEFAULT_PIPE_NAME "\\\\.\\pipe\\gst.win32.ipc.video" + +struct _GstWin32IpcVideoSink +{ + GstBaseSink parent; + + GstVideoInfo info; + Win32IpcPipeServer *pipe; + gchar *mmf_prefix; + guint64 seq_num; + LARGE_INTEGER frequency; + + Win32IpcMmf *mmf; + Win32IpcVideoInfo minfo; + + /* properties */ + gchar *pipe_name; +}; + +static void gst_win32_ipc_video_sink_finalize (GObject * object); +static void gst_win32_ipc_video_sink_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_win32_video_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static GstClock *gst_win32_ipc_video_sink_provide_clock (GstElement * elem); + +static gboolean gst_win32_ipc_video_sink_start (GstBaseSink * sink); +static gboolean gst_win32_ipc_video_sink_stop (GstBaseSink * sink); +static gboolean gst_win32_ipc_video_sink_unlock_stop (GstBaseSink * sink); +static gboolean gst_win32_ipc_video_sink_set_caps (GstBaseSink * sink, + GstCaps * caps); +static void gst_win32_ipc_video_sink_get_time (GstBaseSink * sink, + GstBuffer * buf, GstClockTime * start, GstClockTime * end); +static gboolean gst_win32_ipc_video_sink_propose_allocation (GstBaseSink * sink, + GstQuery * query); +static GstFlowReturn gst_win32_ipc_video_sink_prepare (GstBaseSink * sink, + GstBuffer * buf); +static GstFlowReturn gst_win32_ipc_video_sink_render (GstBaseSink * sink, + GstBuffer * buf); + +#define gst_win32_ipc_video_sink_parent_class parent_class +G_DEFINE_TYPE (GstWin32IpcVideoSink, gst_win32_ipc_video_sink, + GST_TYPE_BASE_SINK); + +static void +gst_win32_ipc_video_sink_class_init (GstWin32IpcVideoSinkClass * klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + GstBaseSinkClass *sink_class = GST_BASE_SINK_CLASS (klass); + + object_class->finalize = gst_win32_ipc_video_sink_finalize; + object_class->set_property = gst_win32_ipc_video_sink_set_property; + object_class->get_property = gst_win32_video_sink_get_property; + + g_object_class_install_property (object_class, PROP_PIPE_NAME, + g_param_spec_string ("pipe-name", "Pipe Name", + "The name of Win32 named pipe to communicate with clients. " + "Validation of the pipe name is caller's responsibility", + DEFAULT_PIPE_NAME, (GParamFlags) (G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_READY))); + + gst_element_class_set_static_metadata (element_class, + "Win32 IPC Video Sink", "Sink/Video", + "Send video frames to win32ipcvideosrc elements", + "Seungha Yang "); + gst_element_class_add_static_pad_template (element_class, &sink_template); + + element_class->provide_clock = + GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_provide_clock); + + sink_class->start = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_start); + sink_class->stop = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_stop); + sink_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_unlock_stop); + sink_class->set_caps = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_set_caps); + sink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_propose_allocation); + sink_class->get_times = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_get_time); + sink_class->prepare = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_prepare); + sink_class->render = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_sink_render); + + GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_video_sink_debug, "win32ipcvideosink", + 0, "win32ipcvideosink"); +} + +static void +gst_win32_ipc_video_sink_init (GstWin32IpcVideoSink * self) +{ + self->pipe_name = g_strdup (DEFAULT_PIPE_NAME); + QueryPerformanceFrequency (&self->frequency); + + GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_REQUIRE_CLOCK); +} + +static void +gst_win32_ipc_video_sink_finalize (GObject * object) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object); + + g_free (self->pipe_name); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_win32_ipc_video_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object); + + switch (prop_id) { + case PROP_PIPE_NAME: + GST_OBJECT_LOCK (self); + g_free (self->pipe_name); + self->pipe_name = g_value_dup_string (value); + if (!self->pipe_name) + self->pipe_name = g_strdup (DEFAULT_PIPE_NAME); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_win32_video_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (object); + + switch (prop_id) { + case PROP_PIPE_NAME: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->pipe_name); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstClock * +gst_win32_ipc_video_sink_provide_clock (GstElement * elem) +{ + return gst_system_clock_obtain (); +} + +static gboolean +gst_win32_ipc_video_sink_start (GstBaseSink * sink) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + + GST_DEBUG_OBJECT (self, "Start"); + + self->pipe = win32_ipc_pipe_server_new (self->pipe_name); + if (!self->pipe) { + GST_ERROR_OBJECT (self, "Couldn't create pipe server"); + return FALSE; + } + + self->mmf_prefix = gst_win32_ipc_get_mmf_prefix (); + self->seq_num = 0; + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_sink_stop (GstBaseSink * sink) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + + GST_DEBUG_OBJECT (self, "Stop"); + + g_clear_pointer (&self->pipe, win32_ipc_pipe_server_unref); + g_clear_pointer (&self->mmf_prefix, g_free); + g_clear_pointer (&self->mmf, win32_ipc_mmf_unref); + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_sink_unlock_stop (GstBaseSink * sink) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + + g_clear_pointer (&self->mmf, win32_ipc_mmf_unref); + + return TRUE; +} + +static void +gst_win32_ipc_video_sink_get_time (GstBaseSink * sink, GstBuffer * buf, + GstClockTime * start, GstClockTime * end) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + GstClockTime timestamp; + + timestamp = GST_BUFFER_PTS (buf); + if (!GST_CLOCK_TIME_IS_VALID (timestamp)) + timestamp = GST_BUFFER_DTS (buf); + + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + *start = timestamp; + if (GST_BUFFER_DURATION_IS_VALID (buf)) { + *end = timestamp + GST_BUFFER_DURATION (buf); + } else if (self->info.fps_n > 0) { + *end = timestamp + + gst_util_uint64_scale_int (GST_SECOND, self->info.fps_d, + self->info.fps_n); + } else if (sink->segment.rate < 0) { + *end = timestamp; + } + } +} + +static gboolean +gst_win32_ipc_video_sink_set_caps (GstBaseSink * sink, GstCaps * caps) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + + if (!gst_video_info_from_caps (&self->info, caps)) { + GST_WARNING_OBJECT (self, "Invalid caps"); + return FALSE; + } + + memset (&self->minfo, 0, sizeof (Win32IpcVideoInfo)); + self->minfo.format = + (Win32IpcVideoFormat) GST_VIDEO_INFO_FORMAT (&self->info); + self->minfo.width = GST_VIDEO_INFO_WIDTH (&self->info); + self->minfo.height = GST_VIDEO_INFO_HEIGHT (&self->info); + self->minfo.fps_n = self->info.fps_n; + self->minfo.fps_d = self->info.fps_d; + self->minfo.par_n = self->info.par_n; + self->minfo.par_d = self->info.par_d; + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_sink_propose_allocation (GstBaseSink * sink, + GstQuery * query) +{ + GstCaps *caps; + GstBufferPool *pool = nullptr; + GstVideoInfo info; + guint size; + gboolean need_pool; + + gst_query_parse_allocation (query, &caps, &need_pool); + if (!caps) { + GST_WARNING_OBJECT (sink, "No caps specified"); + return FALSE; + } + + if (!gst_video_info_from_caps (&info, caps)) { + GST_WARNING_OBJECT (sink, "Invalid caps %" GST_PTR_FORMAT, caps); + return FALSE; + } + + /* the normal size of a frame */ + size = info.size; + if (need_pool) { + GstStructure *config; + + pool = gst_video_buffer_pool_new (); + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_add_option (config, + GST_BUFFER_POOL_OPTION_VIDEO_META); + + size = GST_VIDEO_INFO_SIZE (&info); + + gst_buffer_pool_config_set_params (config, caps, (guint) size, 0, 0); + + if (!gst_buffer_pool_set_config (pool, config)) { + GST_ERROR_OBJECT (pool, "Couldn't set config"); + gst_object_unref (pool); + + return FALSE; + } + } + + gst_query_add_allocation_pool (query, pool, size, 0, 0); + gst_clear_object (&pool); + + gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL); + + return TRUE; +} + +static GstFlowReturn +gst_win32_ipc_video_sink_prepare (GstBaseSink * sink, GstBuffer * buf) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + std::string mmf_name; + GstVideoFrame frame; + GstMapInfo info; + + g_clear_pointer (&self->mmf, win32_ipc_mmf_unref); + + if (!gst_video_frame_map (&frame, &self->info, buf, GST_MAP_READ)) { + GST_ERROR_OBJECT (self, "Couldn't map frame"); + return GST_FLOW_ERROR; + } + + mmf_name = std::string (self->mmf_prefix) + std::to_string (self->seq_num); + self->seq_num++; + + self->mmf = win32_ipc_mmf_alloc (GST_VIDEO_FRAME_SIZE (&frame), + mmf_name.c_str ()); + if (!self->mmf) { + GST_ERROR_OBJECT (self, "Couldn't create memory with name %s", + mmf_name.c_str ()); + gst_video_frame_unmap (&frame); + return GST_FLOW_ERROR; + } + + self->minfo.size = GST_VIDEO_FRAME_SIZE (&frame); + for (guint i = 0; i < GST_VIDEO_FRAME_N_PLANES (&frame); i++) { + self->minfo.offset[i] = GST_VIDEO_FRAME_PLANE_OFFSET (&frame, i); + self->minfo.stride[i] = GST_VIDEO_FRAME_PLANE_STRIDE (&frame, i); + } + gst_video_frame_unmap (&frame); + + gst_buffer_map (buf, &info, GST_MAP_READ); + memcpy (win32_ipc_mmf_get_raw (self->mmf), info.data, self->minfo.size); + gst_buffer_unmap (buf, &info); + + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_win32_ipc_video_sink_render (GstBaseSink * sink, GstBuffer * buf) +{ + GstWin32IpcVideoSink *self = GST_WIN32_IPC_VIDEO_SINK (sink); + LARGE_INTEGER cur_time; + GstClockTime pts; + GstClockTime now_qpc; + GstClockTime buf_pts; + GstClockTime buffer_clock = GST_CLOCK_TIME_NONE; + + QueryPerformanceCounter (&cur_time); + pts = now_qpc = gst_util_uint64_scale (cur_time.QuadPart, GST_SECOND, + self->frequency.QuadPart); + + buf_pts = GST_BUFFER_PTS (buf); + if (!GST_CLOCK_TIME_IS_VALID (buf_pts)) + buf_pts = GST_BUFFER_DTS (buf); + + if (GST_CLOCK_TIME_IS_VALID (buf_pts)) { + buffer_clock = gst_segment_to_running_time (&sink->segment, + GST_FORMAT_TIME, buf_pts) + + GST_ELEMENT_CAST (sink)->base_time + gst_base_sink_get_latency (sink); + } + + if (GST_CLOCK_TIME_IS_VALID (buffer_clock)) { + GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (sink)); + gboolean is_qpc = TRUE; + + is_qpc = gst_win32_ipc_clock_is_qpc (clock); + if (!is_qpc) { + GstClockTime now_gst = gst_clock_get_time (clock); + GstClockTimeDiff converted = buffer_clock; + + GST_LOG_OBJECT (self, "Clock is not QPC"); + + converted -= now_gst; + converted += now_qpc; + + if (converted < 0) { + /* Shouldn't happen */ + GST_WARNING_OBJECT (self, "Negative buffer clock"); + pts = 0; + } else { + pts = converted; + } + } else { + GST_LOG_OBJECT (self, "Clock is QPC already"); + /* buffer clock is already QPC time */ + pts = buffer_clock; + } + gst_object_unref (clock); + } + + self->minfo.qpc = pts; + + if (!self->pipe) { + GST_ERROR_OBJECT (self, "Pipe server was not configured"); + return GST_FLOW_ERROR; + } + + /* win32_ipc_pipe_server_send_mmf() takes ownership of mmf */ + if (!win32_ipc_pipe_server_send_mmf (self->pipe, + (Win32IpcMmf *) g_steal_pointer (&self->mmf), &self->minfo)) { + GST_ERROR_OBJECT (self, "Couldn't send buffer"); + return GST_FLOW_ERROR; + } + + return GST_FLOW_OK; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.h b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.h new file mode 100644 index 0000000..b2cbac0 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosink.h @@ -0,0 +1,32 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#pragma once + +#include +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_WIN32_IPC_VIDEO_SINK (gst_win32_ipc_video_sink_get_type()) +G_DECLARE_FINAL_TYPE (GstWin32IpcVideoSink, gst_win32_ipc_video_sink, + GST, WIN32_IPC_VIDEO_SINK, GstBaseSink); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.cpp new file mode 100644 index 0000000..e90832c --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.cpp @@ -0,0 +1,535 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-win32ipcvideosrc + * @title: win32ipcvideosrc + * @short_description: Windows shared memory video source + * + * win32ipcvideosrc receives raw video frames from win32ipcvideosink + * and outputs the received video frames + * + * ## Example launch line + * ``` + * gst-launch-1.0 win32ipcvideosrc ! queue ! videoconvert ! d3d11videosink + * ``` + * + * Since: 1.22 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstwin32ipcvideosrc.h" +#include "gstwin32ipcutils.h" +#include "protocol/win32ipcpipeclient.h" +#include + +GST_DEBUG_CATEGORY_STATIC (gst_win32_ipc_video_src_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_video_src_debug + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (GST_VIDEO_CAPS_MAKE (GST_VIDEO_FORMATS_ALL))); + +enum +{ + PROP_0, + PROP_PIPE_NAME, + PROP_PROCESSING_DELAY, +}; + +#define DEFAULT_PIPE_NAME "\\\\.\\pipe\\gst.win32.ipc.video" +#define DEFAULT_PROCESSING_DEADLINE (20 * GST_MSECOND) + +struct _GstWin32IpcVideoSrc +{ + GstBaseSrc parent; + + GstVideoInfo info; + + Win32IpcPipeClient *pipe; + GstCaps *caps; + gboolean flushing; + SRWLOCK lock; + gboolean have_video_meta; + gsize offset[GST_VIDEO_MAX_PLANES]; + gint stride[GST_VIDEO_MAX_PLANES]; + LARGE_INTEGER frequency; + GstBufferPool *pool; + + /* properties */ + gchar *pipe_name; + GstClockTime processing_deadline; +}; + +static void gst_win32_ipc_video_src_finalize (GObject * object); +static void gst_win32_ipc_video_src_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_win32_video_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static GstClock *gst_win32_video_src_provide_clock (GstElement * elem); + +static gboolean gst_win32_ipc_video_src_start (GstBaseSrc * src); +static gboolean gst_win32_ipc_video_src_stop (GstBaseSrc * src); +static gboolean gst_win32_ipc_video_src_unlock (GstBaseSrc * src); +static gboolean gst_win32_ipc_video_src_unlock_stop (GstBaseSrc * src); +static gboolean gst_win32_ipc_video_src_query (GstBaseSrc * src, + GstQuery * query); +static gboolean gst_win32_ipc_video_src_decide_allocation (GstBaseSrc * src, + GstQuery * query); +static GstFlowReturn gst_win32_ipc_video_src_create (GstBaseSrc * src, + guint64 offset, guint size, GstBuffer ** buf); + +#define gst_win32_ipc_video_src_parent_class parent_class +G_DEFINE_TYPE (GstWin32IpcVideoSrc, gst_win32_ipc_video_src, GST_TYPE_BASE_SRC); + +static void +gst_win32_ipc_video_src_class_init (GstWin32IpcVideoSrcClass * klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS (klass); + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + GstBaseSrcClass *src_class = GST_BASE_SRC_CLASS (klass); + + object_class->finalize = gst_win32_ipc_video_src_finalize; + object_class->set_property = gst_win32_ipc_video_src_set_property; + object_class->get_property = gst_win32_video_src_get_property; + + g_object_class_install_property (object_class, PROP_PIPE_NAME, + g_param_spec_string ("pipe-name", "Pipe Name", + "The name of Win32 named pipe to communicate with server. " + "Validation of the pipe name is caller's responsibility", + DEFAULT_PIPE_NAME, (GParamFlags) (G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_READY))); + g_object_class_install_property (object_class, PROP_PROCESSING_DELAY, + g_param_spec_uint64 ("processing-deadline", "Processing deadline", + "Maximum processing time for a buffer in nanoseconds", 0, G_MAXUINT64, + DEFAULT_PROCESSING_DEADLINE, (GParamFlags) (G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS | GST_PARAM_MUTABLE_PLAYING))); + + gst_element_class_set_static_metadata (element_class, + "Win32 IPC Video Source", "Source/Video", + "Receive video frames from the win32ipcvideosink", + "Seungha Yang "); + gst_element_class_add_static_pad_template (element_class, &src_template); + + element_class->provide_clock = + GST_DEBUG_FUNCPTR (gst_win32_video_src_provide_clock); + + src_class->start = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_start); + src_class->stop = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_stop); + src_class->unlock = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_unlock); + src_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_unlock_stop); + src_class->query = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_query); + src_class->decide_allocation = + GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_decide_allocation); + src_class->create = GST_DEBUG_FUNCPTR (gst_win32_ipc_video_src_create); + + GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_video_src_debug, "win32ipcvideosrc", + 0, "win32ipcvideosrc"); +} + +static void +gst_win32_ipc_video_src_init (GstWin32IpcVideoSrc * self) +{ + gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME); + gst_base_src_set_live (GST_BASE_SRC (self), TRUE); + self->pipe_name = g_strdup (DEFAULT_PIPE_NAME); + self->processing_deadline = DEFAULT_PROCESSING_DEADLINE; + QueryPerformanceFrequency (&self->frequency); + + GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_REQUIRE_CLOCK); +} + +static void +gst_win32_ipc_video_src_finalize (GObject * object) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object); + + g_free (self->pipe_name); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_win32_ipc_video_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object); + + switch (prop_id) { + case PROP_PIPE_NAME: + GST_OBJECT_LOCK (self); + g_free (self->pipe_name); + self->pipe_name = g_value_dup_string (value); + if (!self->pipe_name) + self->pipe_name = g_strdup (DEFAULT_PIPE_NAME); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PROCESSING_DELAY: + { + GstClockTime prev_val, new_val; + GST_OBJECT_LOCK (self); + prev_val = self->processing_deadline; + new_val = g_value_get_uint64 (value); + self->processing_deadline = new_val; + GST_OBJECT_UNLOCK (self); + + if (prev_val != new_val) { + GST_DEBUG_OBJECT (self, "Posting latency message"); + gst_element_post_message (GST_ELEMENT_CAST (self), + gst_message_new_latency (GST_OBJECT_CAST (self))); + } + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_win32_video_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (object); + + switch (prop_id) { + case PROP_PIPE_NAME: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->pipe_name); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PROCESSING_DELAY: + GST_OBJECT_LOCK (self); + g_value_set_uint64 (value, self->processing_deadline); + GST_OBJECT_UNLOCK (self); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstClock * +gst_win32_video_src_provide_clock (GstElement * elem) +{ + return gst_system_clock_obtain (); +} + +static gboolean +gst_win32_ipc_video_src_start (GstBaseSrc * src) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + + GST_DEBUG_OBJECT (self, "Start"); + + gst_video_info_init (&self->info); + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_src_stop (GstBaseSrc * src) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + + GST_DEBUG_OBJECT (self, "Stop"); + + g_clear_pointer (&self->pipe, win32_ipc_pipe_client_unref); + gst_clear_caps (&self->caps); + if (self->pool) { + gst_buffer_pool_set_active (self->pool, FALSE); + gst_clear_object (&self->pool); + } + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_src_unlock (GstBaseSrc * src) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + + GST_DEBUG_OBJECT (self, "Unlock"); + + AcquireSRWLockExclusive (&self->lock); + self->flushing = TRUE; + if (self->pipe) + win32_ipc_pipe_client_shutdown (self->pipe); + ReleaseSRWLockExclusive (&self->lock); + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_src_unlock_stop (GstBaseSrc * src) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + + GST_DEBUG_OBJECT (self, "Unlock"); + + AcquireSRWLockExclusive (&self->lock); + g_clear_pointer (&self->pipe, win32_ipc_pipe_client_unref); + gst_clear_caps (&self->caps); + self->flushing = FALSE; + ReleaseSRWLockExclusive (&self->lock); + + return TRUE; +} + +static gboolean +gst_win32_ipc_video_src_query (GstBaseSrc * src, GstQuery * query) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY: + { + GST_OBJECT_LOCK (self); + if (GST_CLOCK_TIME_IS_VALID (self->processing_deadline)) { + gst_query_set_latency (query, TRUE, self->processing_deadline, + /* pipe server can hold up to 5 memory objects */ + 5 * self->processing_deadline); + } else { + gst_query_set_latency (query, TRUE, 0, 0); + } + GST_OBJECT_UNLOCK (self); + return TRUE; + } + default: + break; + } + + return GST_BASE_SRC_CLASS (parent_class)->query (src, query); +} + +static gboolean +gst_win32_ipc_video_src_decide_allocation (GstBaseSrc * src, GstQuery * query) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + gboolean ret; + + ret = GST_BASE_SRC_CLASS (parent_class)->decide_allocation (src, query); + if (!ret) + return ret; + + self->have_video_meta = gst_query_find_allocation_meta (query, + GST_VIDEO_META_API_TYPE, nullptr); + GST_DEBUG_OBJECT (self, "Downstream supports video meta: %d", + self->have_video_meta); + + return TRUE; +} + +static GstCaps * +gst_win32_ipc_video_src_update_info_and_get_caps (GstWin32IpcVideoSrc * self, + const Win32IpcVideoInfo * info) +{ + GstVideoInfo vinfo; + + gst_video_info_set_format (&vinfo, (GstVideoFormat) info->format, + info->width, info->height); + vinfo.fps_n = info->fps_n; + vinfo.fps_d = info->fps_d; + vinfo.par_n = info->par_n; + vinfo.par_d = info->par_d; + + if (!self->caps || !gst_video_info_is_equal (&self->info, &vinfo)) { + self->info = vinfo; + return gst_video_info_to_caps (&vinfo); + } + + return nullptr; +} + +static gboolean +gst_win32_ipc_ensure_fallback_pool (GstWin32IpcVideoSrc * self) +{ + GstStructure *config; + + if (self->pool) + return TRUE; + + self->pool = gst_video_buffer_pool_new (); + config = gst_buffer_pool_get_config (self->pool); + gst_buffer_pool_config_set_params (config, self->caps, + GST_VIDEO_INFO_SIZE (&self->info), 0, 0); + if (!gst_buffer_pool_set_config (self->pool, config)) { + GST_ERROR_OBJECT (self, "Couldn't set config"); + goto error; + } + + if (!gst_buffer_pool_set_active (self->pool, TRUE)) { + GST_ERROR_OBJECT (self, "Couldn't set active"); + goto error; + } + + return TRUE; + +error: + gst_clear_object (&self->pool); + return FALSE; +} + +static GstFlowReturn +gst_win32_ipc_video_src_create (GstBaseSrc * src, guint64 offset, guint size, + GstBuffer ** buf) +{ + GstWin32IpcVideoSrc *self = GST_WIN32_IPC_VIDEO_SRC (src); + GstCaps *caps; + Win32IpcMmf *mmf; + Win32IpcVideoInfo info; + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer; + GstClock *clock; + GstClockTime pts; + GstClockTime base_time; + GstClockTime now_qpc; + GstClockTime now_gst; + LARGE_INTEGER cur_time; + gboolean is_qpc = TRUE; + gboolean need_video_meta = FALSE; + + if (!self->pipe) { + self->pipe = win32_ipc_pipe_client_new (self->pipe_name); + if (!self->pipe) { + GST_ERROR_OBJECT (self, "Couldn't create pipe"); + return GST_FLOW_ERROR; + } + } + + if (!win32_ipc_pipe_client_get_mmf (self->pipe, &mmf, &info)) { + AcquireSRWLockExclusive (&self->lock); + if (self->flushing) { + ret = GST_FLOW_FLUSHING; + GST_DEBUG_OBJECT (self, "Flushing"); + } else { + ret = GST_FLOW_EOS; + GST_WARNING_OBJECT (self, "Couldn't get buffer from server"); + } + ReleaseSRWLockExclusive (&self->lock); + return ret; + } + + caps = gst_win32_ipc_video_src_update_info_and_get_caps (self, &info); + for (guint i = 0; i < GST_VIDEO_INFO_N_PLANES (&self->info); i++) { + self->offset[i] = (gsize) info.offset[i]; + self->stride[i] = (gint) info.stride[i]; + + if (self->offset[i] != self->info.offset[i] || + self->stride[i] != self->info.stride[i]) { + need_video_meta = TRUE; + } + } + + if (caps) { + if (self->pool) { + gst_buffer_pool_set_active (self->pool, FALSE); + gst_clear_object (&self->pool); + } + + gst_caps_replace (&self->caps, caps); + GST_DEBUG_OBJECT (self, "Setting caps %" GST_PTR_FORMAT, caps); + gst_pad_set_caps (GST_BASE_SRC_PAD (src), caps); + gst_caps_unref (caps); + } + + if (self->have_video_meta || !need_video_meta) { + buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, + win32_ipc_mmf_get_raw (mmf), win32_ipc_mmf_get_size (mmf), + 0, win32_ipc_mmf_get_size (mmf), mmf, + (GDestroyNotify) win32_ipc_mmf_unref); + + if (self->have_video_meta) { + gst_buffer_add_video_meta_full (buffer, + GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT (&self->info), + GST_VIDEO_INFO_WIDTH (&self->info), + GST_VIDEO_INFO_HEIGHT (&self->info), + GST_VIDEO_INFO_N_PLANES (&self->info), self->offset, self->stride); + } + } else { + GstVideoFrame mmf_frame, frame; + + if (!gst_win32_ipc_ensure_fallback_pool (self)) { + win32_ipc_mmf_unref (mmf); + return GST_FLOW_ERROR; + } + + ret = gst_buffer_pool_acquire_buffer (self->pool, &buffer, nullptr); + if (ret != GST_FLOW_OK) { + GST_ERROR_OBJECT (self, "Couldn't acquire buffer"); + win32_ipc_mmf_unref (mmf); + return GST_FLOW_ERROR; + } + + gst_video_frame_map (&frame, &self->info, buffer, GST_MAP_WRITE); + mmf_frame.info = self->info; + + for (guint i = 0; i < GST_VIDEO_FRAME_N_PLANES (&frame); i++) { + mmf_frame.info.offset[i] = self->offset[i]; + mmf_frame.info.stride[i] = self->stride[i]; + mmf_frame.data[i] = (guint8 *) win32_ipc_mmf_get_raw (mmf) + + self->offset[i]; + } + + gst_video_frame_copy (&frame, &mmf_frame); + gst_video_frame_unmap (&frame); + } + + QueryPerformanceCounter (&cur_time); + now_qpc = gst_util_uint64_scale (cur_time.QuadPart, GST_SECOND, + self->frequency.QuadPart); + clock = gst_element_get_clock (GST_ELEMENT_CAST (self)); + now_gst = gst_clock_get_time (clock); + base_time = GST_ELEMENT_CAST (self)->base_time; + + is_qpc = gst_win32_ipc_clock_is_qpc (clock); + gst_object_unref (clock); + + if (!is_qpc) { + GstClockTimeDiff now_pts = now_gst - base_time + info.qpc - now_qpc; + + if (now_pts >= 0) + pts = now_pts; + else + pts = 0; + } else { + if (info.qpc >= base_time) { + /* Our base_time is also QPC */ + pts = info.qpc - base_time; + } else { + GST_WARNING_OBJECT (self, "Server QPC is smaller than our QPC base time"); + pts = 0; + } + } + + GST_BUFFER_PTS (buffer) = pts; + GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE; + GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE; + + *buf = buffer; + + return GST_FLOW_OK; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.h b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.h new file mode 100644 index 0000000..ca835da --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/gstwin32ipcvideosrc.h @@ -0,0 +1,32 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#pragma once + +#include +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_WIN32_IPC_VIDEO_SRC (gst_win32_ipc_video_src_get_type()) +G_DECLARE_FINAL_TYPE (GstWin32IpcVideoSrc, gst_win32_ipc_video_src, + GST, WIN32_IPC_VIDEO_SRC, GstBaseSrc); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/meson.build b/subprojects/gst-plugins-bad/sys/win32ipc/meson.build new file mode 100644 index 0000000..5ffdf5b --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/meson.build @@ -0,0 +1,39 @@ +win32ipc_sources = [ + 'protocol/win32ipcmmf.cpp', + 'protocol/win32ipcpipeclient.cpp', + 'protocol/win32ipcpipeserver.cpp', + 'protocol/win32ipcprotocol.cpp', + 'protocol/win32ipcutils.cpp', + 'gstwin32ipcutils.cpp', + 'gstwin32ipcvideosink.cpp', + 'gstwin32ipcvideosrc.cpp', + 'plugin.cpp', +] + +if host_system != 'windows' or get_option('win32ipc').disabled() + subdir_done() +endif + +code = ''' +#include +#if !(WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_APP) && !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP)) +#error "Not building for UWP" +#endif''' +if cc.compiles(code, name : 'building for UWP') + if get_option('win32ipc').enabled() + error('win32ipc plugin does not support UWP') + else + subdir_done() + endif +endif + +gstwin32ipc = library('gstwin32ipc', + win32ipc_sources, + c_args : gst_plugins_bad_args, + cpp_args: gst_plugins_bad_args, + include_directories : [configinc], + dependencies : [gstbase_dep, gstvideo_dep], + install : true, + install_dir : plugins_install_dir, +) +plugins += [gstwin32ipc] diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/plugin.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/plugin.cpp new file mode 100644 index 0000000..b086986 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/plugin.cpp @@ -0,0 +1,56 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * plugin-win32ipc: + * + * Windows IPC plugin + * + * Since: 1.22 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include "gstwin32ipcvideosink.h" +#include "gstwin32ipcvideosrc.h" + +GST_DEBUG_CATEGORY (gst_win32_ipc_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_debug + +static gboolean +plugin_init (GstPlugin * plugin) +{ + GST_DEBUG_CATEGORY_INIT (gst_win32_ipc_debug, "win32ipc", 0, "win32ipc"); + + gst_element_register (plugin, + "win32ipcvideosink", GST_RANK_NONE, GST_TYPE_WIN32_IPC_VIDEO_SINK); + gst_element_register (plugin, + "win32ipcvideosrc", GST_RANK_NONE, GST_TYPE_WIN32_IPC_VIDEO_SRC); + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + win32ipc, + "Windows IPC plugin", + plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.cpp new file mode 100644 index 0000000..713a910 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.cpp @@ -0,0 +1,241 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#include "win32ipcmmf.h" +#include "win32ipcutils.h" +#include + +GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_debug + +struct Win32IpcMmf +{ + explicit Win32IpcMmf (HANDLE f, void * b, UINT32 s, const std::string & n) + : file (f), buffer (b), size (s), name (n), ref_count (1) + { + } + + ~Win32IpcMmf () + { + GST_TRACE ("Freeing %p (%s)", this, name.c_str ()); + if (buffer) + UnmapViewOfFile (buffer); + if (file) + CloseHandle (file); + } + + HANDLE file; + void *buffer; + UINT32 size; + std::string name; + ULONG ref_count; +}; + +static Win32IpcMmf * +win32_pic_mmf_new (HANDLE file, UINT32 size, const char * name) +{ + Win32IpcMmf *self; + void *buffer; + std::string msg; + UINT err_code; + + buffer = MapViewOfFile (file, FILE_MAP_ALL_ACCESS, 0, 0, size); + if (!buffer) { + err_code = GetLastError (); + msg = win32_ipc_error_message (err_code); + GST_ERROR ("MapViewOfFile failed with 0x%x (%s)", + err_code, msg.c_str ()); + CloseHandle (file); + return nullptr; + } + + self = new Win32IpcMmf (file, buffer, size, name); + + return self; +} + +/** + * win32_ipc_mmf_alloc: + * @size: Size of memory to allocate + * @name: The name of Memory Mapped File + * + * Creates named shared memory + * + * Returns: a new Win32IpcMmf object + */ +Win32IpcMmf * +win32_ipc_mmf_alloc (UINT32 size, const char * name) +{ + HANDLE file; + std::string msg; + UINT err_code; + + if (!size) { + GST_ERROR ("Zero size is not allowed"); + return nullptr; + } + + if (!name) { + GST_ERROR ("Name must be specified"); + return nullptr; + } + + file = CreateFileMappingA (INVALID_HANDLE_VALUE, nullptr, + PAGE_READWRITE | SEC_COMMIT, 0, size, name); + if (!file) { + err_code = GetLastError (); + msg = win32_ipc_error_message (err_code); + GST_ERROR ("CreateFileMappingA failed with 0x%x (%s)", + err_code, msg.c_str ()); + return nullptr; + } + + /* The name is already occupied, it's caller's fault... */ + if (GetLastError () == ERROR_ALREADY_EXISTS) { + GST_ERROR ("File already exists"); + CloseHandle (file); + return nullptr; + } + + return win32_pic_mmf_new (file, size, name); +} + +/** + * win32_ipc_mmf_open: + * @size: Size of memory to allocate + * @name: The name of Memory Mapped File + * + * Opens named shared memory + * + * Returns: a new Win32IpcMmf object + */ +Win32IpcMmf * +win32_ipc_mmf_open (UINT32 size, const char * name) +{ + HANDLE file; + std::string msg; + UINT err_code; + + if (!size) { + GST_ERROR ("Zero size is not allowed"); + return nullptr; + } + + if (!name) { + GST_ERROR ("Name must be specified"); + return nullptr; + } + + file = OpenFileMappingA (FILE_MAP_ALL_ACCESS, FALSE, name); + if (!file) { + err_code = GetLastError (); + msg = win32_ipc_error_message (err_code); + GST_ERROR ("OpenFileMappingA failed with 0x%x (%s)", + err_code, msg.c_str ()); + return nullptr; + } + + return win32_pic_mmf_new (file, size, name); +} + +/** + * win32_ipc_mmf_get_name: + * @mmf: a Win32IpcMmf object + * + * Returns: the name of @mmf + */ +const char * +win32_ipc_mmf_get_name (Win32IpcMmf * mmf) +{ + if (!mmf) + return nullptr; + + return mmf->name.c_str (); +} + +/** + * win32_ipc_mmf_get_size: + * @mmf: a Win32IpcMmf object + * + * Returns: the size of allocated memory + */ +UINT32 +win32_ipc_mmf_get_size (Win32IpcMmf * mmf) +{ + if (!mmf) + return 0; + + return mmf->size; +} + +/** + * win32_ipc_mmf_get_raw: + * @mmf: a Win32IpcMmf object + * + * Returns: the address of allocated memory + */ +void * +win32_ipc_mmf_get_raw (Win32IpcMmf * mmf) +{ + if (!mmf) + return nullptr; + + return mmf->buffer; +} + +/** + * win32_ipc_mmf_ref: + * @mmf: a Win32IpcMmf object + * + * Increase ref count + */ +Win32IpcMmf * +win32_ipc_mmf_ref (Win32IpcMmf * mmf) +{ + if (!mmf) + return nullptr; + + InterlockedIncrement (&mmf->ref_count); + + return mmf; +} + +/** + * win32_ipc_mmf_unref: + * @mmf: a Win32IpcMmf object + * + * Decrease ref count + */ +void +win32_ipc_mmf_unref (Win32IpcMmf * mmf) +{ + ULONG count; + + if (!mmf) + return; + + count = InterlockedDecrement (&mmf->ref_count); + if (count == 0) + delete mmf; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.h b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.h new file mode 100644 index 0000000..51b0ea6 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcmmf.h @@ -0,0 +1,50 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#pragma once + +#include +#include + +G_BEGIN_DECLS + +struct Win32IpcMmf; + +Win32IpcMmf * win32_ipc_mmf_alloc (UINT32 size, + const char * name); + +Win32IpcMmf * win32_ipc_mmf_open (UINT32 size, + const char * name); + +const char * win32_ipc_mmf_get_name (Win32IpcMmf * mmf); + +UINT32 win32_ipc_mmf_get_size (Win32IpcMmf * mmf); + +void * win32_ipc_mmf_get_raw (Win32IpcMmf * mmf); + +Win32IpcMmf * win32_ipc_mmf_ref (Win32IpcMmf * mmf); + +void win32_ipc_mmf_unref (Win32IpcMmf * mmf); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.cpp new file mode 100644 index 0000000..11d6934 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.cpp @@ -0,0 +1,448 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#include "win32ipcpipeclient.h" +#include "win32ipcutils.h" +#include +#include +#include +#include +#include +#include + +GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_debug + +#define CONN_BUFFER_SIZE 1024 + +struct MmfInfo +{ + Win32IpcMmf *mmf; + Win32IpcVideoInfo info; +}; + +struct ClientConnection : public OVERLAPPED +{ + ClientConnection () : pipe (INVALID_HANDLE_VALUE), to_read (0), to_write (0), + seq_num (0) + { + OVERLAPPED *parent = dynamic_cast (this); + parent->Internal = 0; + parent->InternalHigh = 0; + parent->Offset = 0; + parent->OffsetHigh = 0; + } + + Win32IpcPipeClient *self; + HANDLE pipe; + UINT8 client_msg[CONN_BUFFER_SIZE]; + UINT32 to_read; + UINT8 server_msg[CONN_BUFFER_SIZE]; + UINT32 to_write; + UINT64 seq_num; +}; + +struct Win32IpcPipeClient +{ + explicit Win32IpcPipeClient (const std::string & n) + : name (n), ref_count(1), last_err (ERROR_SUCCESS) + { + cancellable = CreateEventA (nullptr, TRUE, FALSE, nullptr); + conn.pipe = INVALID_HANDLE_VALUE; + conn.self = this; + } + + ~Win32IpcPipeClient () + { + GST_DEBUG ("Free client %p", this); + win32_ipc_pipe_client_shutdown (this); + CloseHandle (cancellable); + } + + std::mutex lock; + std::condition_variable cond; + std::unique_ptr thread; + std::queue queue; + std::string name; + + ULONG ref_count; + HANDLE cancellable; + UINT last_err; + ClientConnection conn; +}; + +static DWORD +win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self); + +static VOID WINAPI +win32_ipc_pipe_client_send_read_done_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ClientConnection *conn = (ClientConnection *) overlapped; + Win32IpcPipeClient *self = conn->self; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + self->last_err = error_code; + GST_WARNING ("READ-DONE failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + goto error; + } + + GST_TRACE ("READ-DONE sent"); + + self->last_err = win32_ipc_pipe_client_send_need_data_async (self); + if (self->last_err != ERROR_SUCCESS) + goto error; + + /* All done, back to need-data state */ + return; + +error: + SetEvent (self->cancellable); +} + +static DWORD +win32_ipc_pipe_client_send_read_done_async (Win32IpcPipeClient * self) +{ + ClientConnection *conn = &self->conn; + + conn->to_write = win32_ipc_pkt_build_read_done (conn->client_msg, + CONN_BUFFER_SIZE, conn->seq_num); + if (conn->to_write == 0) { + GST_ERROR ("Couldn't build READ-DONE pkt"); + return ERROR_BAD_FORMAT; + } + + GST_TRACE ("Sending READ-DONE"); + + if (!WriteFileEx (conn->pipe, conn->client_msg, conn->to_write, + (OVERLAPPED *) conn, win32_ipc_pipe_client_send_read_done_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + + GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + return last_err; + } + + return ERROR_SUCCESS; +} + +static VOID WINAPI +win32_ipc_pipe_client_receive_have_data_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ClientConnection *conn = (ClientConnection *) overlapped; + Win32IpcPipeClient *self = conn->self; + char mmf_name[1024] = { '\0', }; + Win32IpcVideoInfo info; + Win32IpcMmf *mmf; + MmfInfo minfo; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + self->last_err = error_code; + GST_WARNING ("HAVE-DATA failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + goto error; + } + + if (!win32_ipc_pkt_parse_have_data (conn->server_msg, n_bytes, + &conn->seq_num, mmf_name, &info)) { + self->last_err = ERROR_BAD_FORMAT; + GST_WARNING ("Couldn't parse HAVE-DATA pkg"); + goto error; + } + + mmf = win32_ipc_mmf_open (info.size, mmf_name); + if (!mmf) { + GST_ERROR ("Couldn't open file %s", mmf_name); + self->last_err = ERROR_BAD_FORMAT; + goto error; + } + + GST_TRACE ("Got HAVE-DATA %s", mmf_name); + + minfo.mmf = mmf; + minfo.info = info; + + { + std::lock_guard lk (self->lock); + /* Drops too old data */ + while (self->queue.size () > 5) { + MmfInfo info = self->queue.front (); + + self->queue.pop (); + win32_ipc_mmf_unref (info.mmf); + } + + self->queue.push (minfo); + self->cond.notify_all (); + } + + self->last_err = win32_ipc_pipe_client_send_read_done_async (self); + if (self->last_err != ERROR_SUCCESS) + goto error; + + return; + +error: + SetEvent (self->cancellable); +} + +static DWORD +win32_ipc_pipe_client_receive_have_data_async (Win32IpcPipeClient * self) +{ + ClientConnection *conn = &self->conn; + + GST_TRACE ("Waiting HAVE-DATA"); + + if (!ReadFileEx (conn->pipe, conn->server_msg, CONN_BUFFER_SIZE, + (OVERLAPPED *) conn, win32_ipc_pipe_client_receive_have_data_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + return last_err; + } + + return ERROR_SUCCESS; +} + +static VOID WINAPI +pipe_clinet_send_need_data_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ClientConnection *conn = (ClientConnection *) overlapped; + Win32IpcPipeClient *self = conn->self; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + self->last_err = error_code; + GST_WARNING ("NEED-DATA failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + goto error; + } + + self->last_err = win32_ipc_pipe_client_receive_have_data_async (self); + if (self->last_err != ERROR_SUCCESS) + goto error; + + return; + +error: + SetEvent (self->cancellable); +} + +static DWORD +win32_ipc_pipe_client_send_need_data_async (Win32IpcPipeClient * self) +{ + ClientConnection *conn = &self->conn; + + conn->to_write = win32_ipc_pkt_build_need_data (conn->client_msg, + CONN_BUFFER_SIZE, conn->seq_num); + if (conn->to_write == 0) { + GST_ERROR ("Couldn't build NEED-DATA pkt"); + return ERROR_BAD_FORMAT; + } + + GST_TRACE ("Sending NEED-DATA"); + + if (!WriteFileEx (conn->pipe, conn->client_msg, conn->to_write, + (OVERLAPPED *) conn, pipe_clinet_send_need_data_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + return last_err; + } + + return ERROR_SUCCESS; +} + +static VOID +win32_ipc_pipe_client_loop (Win32IpcPipeClient * self) +{ + DWORD mode = PIPE_READMODE_MESSAGE; + std::unique_lock lk (self->lock); + ClientConnection *conn = &self->conn; + + conn->pipe = CreateFileA (self->name.c_str (), + GENERIC_READ | GENERIC_WRITE, 0, nullptr, OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, nullptr); + self->last_err = GetLastError (); + if (conn->pipe == INVALID_HANDLE_VALUE) { + std::string msg = win32_ipc_error_message (self->last_err); + GST_WARNING ("CreateFileA failed with 0x%x (%s)", self->last_err, + msg.c_str ()); + self->cond.notify_all (); + return; + } + + if (!SetNamedPipeHandleState (conn->pipe, &mode, nullptr, nullptr)) { + self->last_err = GetLastError (); + std::string msg = win32_ipc_error_message (self->last_err); + GST_WARNING ("SetNamedPipeHandleState failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + CloseHandle (conn->pipe); + conn->pipe = INVALID_HANDLE_VALUE; + self->cond.notify_all (); + return; + } + + self->last_err = ERROR_SUCCESS; + self->cond.notify_all (); + lk.unlock (); + + /* Once connection is established, send NEED-DATA message to server, + * and then it will loop NEED-DATA -> HAVE-DATA -> READ-DONE */ + self->last_err = win32_ipc_pipe_client_send_need_data_async (self); + if (self->last_err != ERROR_SUCCESS) + goto out; + + do { + /* Enters alertable thread state and wait for I/O completion event + * or cancellable event */ + DWORD ret = WaitForSingleObjectEx (self->cancellable, INFINITE, TRUE); + if (ret == WAIT_OBJECT_0) { + GST_DEBUG ("Operation cancelled"); + CancelIoEx (conn->pipe, (OVERLAPPED *) &conn); + break; + } else if (ret != WAIT_IO_COMPLETION) { + GST_WARNING ("Unexpected wait return 0x%x", (UINT) ret); + CancelIoEx (conn->pipe, (OVERLAPPED *) &conn); + break; + } + } while (true); + +out: + if (conn->pipe != INVALID_HANDLE_VALUE) + CloseHandle (conn->pipe); + + lk.lock (); + self->last_err = ERROR_OPERATION_ABORTED; + conn->pipe = INVALID_HANDLE_VALUE; + self->cond.notify_all (); +} + +static BOOL +win32_ipc_pipe_client_run (Win32IpcPipeClient * self) +{ + std::unique_lock lk (self->lock); + + self->thread = std::make_unique + (std::thread (win32_ipc_pipe_client_loop, self)); + self->cond.wait (lk); + + if (self->last_err != ERROR_SUCCESS) { + self->thread->join (); + self->thread = nullptr; + return FALSE; + } + + return TRUE; +} + +Win32IpcPipeClient * +win32_ipc_pipe_client_new (const char * pipe_name) +{ + Win32IpcPipeClient *self; + + if (!pipe_name) { + GST_ERROR ("Pipe name must be specified"); + return nullptr; + } + + self = new Win32IpcPipeClient (pipe_name); + + if (!win32_ipc_pipe_client_run (self)) { + win32_ipc_pipe_client_unref (self); + return nullptr; + } + + return self; +} + +Win32IpcPipeClient * +win32_ipc_pipe_client_ref (Win32IpcPipeClient * client) +{ + InterlockedIncrement (&client->ref_count); + + return client; +} + +void +win32_ipc_pipe_client_unref (Win32IpcPipeClient * client) +{ + ULONG ref_count; + + ref_count = InterlockedDecrement (&client->ref_count); + if (ref_count == 0) + delete client; +} + +void +win32_ipc_pipe_client_shutdown (Win32IpcPipeClient * client) +{ + GST_DEBUG ("Shutting down %p", client); + + SetEvent (client->cancellable); + if (client->thread) { + client->thread->join (); + client->thread = nullptr; + } + + std::lock_guard lk (client->lock); + client->last_err = ERROR_OPERATION_ABORTED; + while (!client->queue.empty ()) { + MmfInfo info = client->queue.front (); + + client->queue.pop (); + win32_ipc_mmf_unref (info.mmf); + } + client->cond.notify_all (); +} + +BOOL +win32_ipc_pipe_client_get_mmf (Win32IpcPipeClient * client, Win32IpcMmf ** mmf, + Win32IpcVideoInfo * info) +{ + std::unique_lock lk (client->lock); + if (client->last_err != ERROR_SUCCESS) { + GST_WARNING ("Last error code was 0x%x", client->last_err); + return FALSE; + } + + while (client->queue.empty () && client->last_err == ERROR_SUCCESS) + client->cond.wait (lk); + + if (client->last_err != ERROR_SUCCESS || client->queue.empty ()) + return FALSE; + + MmfInfo mmf_info = client->queue.front (); + client->queue.pop (); + + *mmf = mmf_info.mmf; + *info = mmf_info.info; + + return TRUE; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.h b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.h new file mode 100644 index 0000000..4188832 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeclient.h @@ -0,0 +1,49 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#pragma once + +#include +#include +#include "win32ipcmmf.h" +#include "win32ipcprotocol.h" +#include + +G_BEGIN_DECLS + +struct Win32IpcPipeClient; + +Win32IpcPipeClient * win32_ipc_pipe_client_new (const char * pipe_name); + +Win32IpcPipeClient * win32_ipc_pipe_client_ref (Win32IpcPipeClient * client); + +void win32_ipc_pipe_client_unref (Win32IpcPipeClient * client); + +void win32_ipc_pipe_client_shutdown (Win32IpcPipeClient * client); + +BOOL win32_ipc_pipe_client_get_mmf (Win32IpcPipeClient * client, + Win32IpcMmf ** mmf, + Win32IpcVideoInfo * info); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.cpp new file mode 100644 index 0000000..53ebdf4 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.cpp @@ -0,0 +1,550 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#include "win32ipcpipeserver.h" +#include "win32ipcutils.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +GST_DEBUG_CATEGORY_EXTERN (gst_win32_ipc_debug); +#define GST_CAT_DEFAULT gst_win32_ipc_debug + +#define CONN_BUFFER_SIZE 1024 + +struct MmfInfo +{ + explicit MmfInfo (Win32IpcMmf * m, const Win32IpcVideoInfo * i, UINT64 s) + { + mmf = m; + info = *i; + seq_num = s; + } + + ~MmfInfo() + { + if (mmf) + win32_ipc_mmf_unref (mmf); + } + + Win32IpcMmf *mmf = nullptr; + Win32IpcVideoInfo info; + UINT64 seq_num; +}; + +struct ServerConnection : public OVERLAPPED +{ + ServerConnection(Win32IpcPipeServer * server, HANDLE p) + : self(server), pipe(p) + { + OVERLAPPED *parent = dynamic_cast (this); + parent->Internal = 0; + parent->InternalHigh = 0; + parent->Offset = 0; + parent->OffsetHigh = 0; + } + + Win32IpcPipeServer *self; + std::shared_ptr minfo; + HANDLE pipe = INVALID_HANDLE_VALUE; + UINT8 client_msg[CONN_BUFFER_SIZE]; + UINT32 to_read = 0; + UINT8 server_msg[CONN_BUFFER_SIZE]; + UINT32 to_write = 0; + UINT64 seq_num = 0; + BOOL pending_have_data = FALSE; +}; + +struct Win32IpcPipeServer +{ + explicit Win32IpcPipeServer (const std::string & n) + : name (n), ref_count (1), last_err (ERROR_SUCCESS), seq_num (0) + { + enqueue_event = CreateEventA (nullptr, FALSE, FALSE, nullptr); + cancellable = CreateEventA (nullptr, TRUE, FALSE, nullptr); + } + + ~Win32IpcPipeServer () + { + win32_ipc_pipe_server_shutdown (this); + CloseHandle (cancellable); + CloseHandle (enqueue_event); + } + + std::mutex lock; + std::condition_variable cond; + std::unique_ptr thread; + std::shared_ptr minfo; + std::string name; + std::vector conn; + + ULONG ref_count; + HANDLE enqueue_event; + HANDLE cancellable; + UINT last_err; + UINT64 seq_num; +}; + +static void +win32_ipc_pipe_server_receive_need_data_async (ServerConnection * conn); + +static void +win32_ipc_pipe_server_close_connection (ServerConnection * conn, + BOOL remove_from_list) +{ + Win32IpcPipeServer *self = conn->self; + + GST_DEBUG ("Closing connection %p", conn); + + if (remove_from_list) { + self->conn.erase (std::remove (self->conn.begin (), self->conn.end (), + conn), self->conn.end ()); + } + + if (!DisconnectNamedPipe (conn->pipe)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("DisconnectNamedPipe failed with 0x%x (%s)", + last_err, msg.c_str ()); + } + + CloseHandle (conn->pipe); + delete conn; +} + +static void WINAPI +win32_ipc_pipe_server_receive_read_done_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ServerConnection *conn = (ServerConnection *) overlapped; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + + GST_WARNING ("READ-DONE failed with 0x%x (%s)", + (UINT) error_code, msg.c_str ()); + win32_ipc_pipe_server_close_connection (conn, TRUE); + return; + } + + GST_TRACE ("Got READ-DONE %p", conn); + + conn->minfo = nullptr; + + /* All done, wait for need-data again */ + win32_ipc_pipe_server_receive_need_data_async (conn); +} + +static void +win32_ipc_pipe_server_receive_read_done_async (ServerConnection * conn) +{ + GST_TRACE ("Waiting READ-DONE %p", conn); + + if (!ReadFileEx (conn->pipe, conn->client_msg, CONN_BUFFER_SIZE, + (OVERLAPPED *) conn, win32_ipc_pipe_server_receive_read_done_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + + win32_ipc_pipe_server_close_connection (conn, TRUE); + } +} + +static void WINAPI +win32_ipc_pipe_server_send_have_data_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ServerConnection *conn = (ServerConnection *) overlapped; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + GST_WARNING ("HAVE-DATA failed with 0x%x (%s)", + (UINT) error_code, msg.c_str ()); + win32_ipc_pipe_server_close_connection (conn, TRUE); + return; + } + + GST_TRACE ("HAVE-DATA done with %s", + win32_ipc_mmf_get_name (conn->minfo->mmf)); + + win32_ipc_pipe_server_receive_read_done_async (conn); +} + +static void +win32_ipc_pipe_server_send_have_data_async (ServerConnection * conn) +{ + assert (conn->minfo != nullptr); + + conn->pending_have_data = FALSE; + conn->seq_num = conn->minfo->seq_num; + + conn->to_write = win32_ipc_pkt_build_have_data (conn->server_msg, + CONN_BUFFER_SIZE, conn->seq_num, + win32_ipc_mmf_get_name (conn->minfo->mmf), &conn->minfo->info); + if (conn->to_write == 0) { + GST_ERROR ("Couldn't build HAVE-DATA pkt"); + win32_ipc_pipe_server_close_connection (conn, TRUE); + return; + } + + conn->seq_num++; + + GST_TRACE ("Sending HAVE-DATA"); + + if (!WriteFileEx (conn->pipe, conn->server_msg, conn->to_write, + (OVERLAPPED *) conn, win32_ipc_pipe_server_send_have_data_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("WriteFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + win32_ipc_pipe_server_close_connection (conn, TRUE); + } +} + +static void WINAPI +win32_ipc_pipe_server_receive_need_data_finish (DWORD error_code, DWORD n_bytes, + LPOVERLAPPED overlapped) +{ + ServerConnection *conn = (ServerConnection *) overlapped; + UINT64 seq_num; + + if (error_code != ERROR_SUCCESS) { + std::string msg = win32_ipc_error_message (error_code); + GST_WARNING ("NEED-DATA failed with 0x%x (%s)", + (UINT) error_code, msg.c_str ()); + win32_ipc_pipe_server_close_connection (conn, TRUE); + return; + } + + if (!win32_ipc_pkt_parse_need_data (conn->client_msg, CONN_BUFFER_SIZE, + &seq_num)) { + GST_ERROR ("Couldn't parse NEED-DATA message"); + win32_ipc_pipe_server_close_connection (conn, TRUE); + return; + } + + GST_TRACE ("Got NEED-DATA"); + + /* Will response later once data is available */ + if (!conn->minfo) { + GST_LOG ("No data available, waiting"); + conn->pending_have_data = TRUE; + return; + } + + win32_ipc_pipe_server_send_have_data_async (conn); +} + +static void +win32_ipc_pipe_server_receive_need_data_async (ServerConnection * conn) +{ + GST_TRACE ("Waiting NEED-DATA"); + + if (!ReadFileEx (conn->pipe, conn->client_msg, CONN_BUFFER_SIZE, + (OVERLAPPED *) conn, win32_ipc_pipe_server_receive_need_data_finish)) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + + GST_WARNING ("ReadFileEx failed with 0x%x (%s)", last_err, msg.c_str ()); + win32_ipc_pipe_server_close_connection (conn, TRUE); + } +} + +static HANDLE +win32_ipc_pipe_server_create_pipe (Win32IpcPipeServer * self, + OVERLAPPED * overlap, BOOL * io_pending) +{ + HANDLE pipe = CreateNamedPipeA (self->name.c_str (), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + CONN_BUFFER_SIZE, CONN_BUFFER_SIZE, 5000, nullptr); + if (pipe == INVALID_HANDLE_VALUE) { + self->last_err = GetLastError (); + std::string msg = win32_ipc_error_message (self->last_err); + GST_WARNING ("CreateNamedPipeA failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + return INVALID_HANDLE_VALUE; + } + + /* Async pipe should return FALSE */ + if (ConnectNamedPipe (pipe, overlap)) { + self->last_err = GetLastError (); + std::string msg = win32_ipc_error_message (self->last_err); + GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + CloseHandle (pipe); + return INVALID_HANDLE_VALUE; + } + + *io_pending = FALSE; + self->last_err = GetLastError (); + switch (self->last_err) { + case ERROR_IO_PENDING: + *io_pending = TRUE; + break; + case ERROR_PIPE_CONNECTED: + SetEvent (overlap->hEvent); + break; + default: + { + std::string msg = win32_ipc_error_message (self->last_err); + GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)", + self->last_err, msg.c_str ()); + CloseHandle (pipe); + return INVALID_HANDLE_VALUE; + } + } + + self->last_err = ERROR_SUCCESS; + + return pipe; +} + +static void +win32_ipc_pipe_server_loop (Win32IpcPipeServer * self) +{ + BOOL io_pending = FALSE; + DWORD n_bytes; + DWORD wait_ret; + HANDLE waitables[3]; + HANDLE pipe; + OVERLAPPED overlap; + std::unique_lock lk (self->lock); + + overlap.hEvent = CreateEvent (nullptr, TRUE, TRUE, nullptr); + pipe = win32_ipc_pipe_server_create_pipe (self, &overlap, &io_pending); + if (pipe == INVALID_HANDLE_VALUE) { + CloseHandle (overlap.hEvent); + self->cond.notify_all (); + return; + } + + self->last_err = ERROR_SUCCESS; + self->cond.notify_all (); + lk.unlock (); + + do { + ServerConnection *conn; + + waitables[0] = overlap.hEvent; + waitables[1] = self->enqueue_event; + waitables[2] = self->cancellable; + + /* Enters alertable state and wait for + * 1) Client's connection request + * (similar to socket listen/accept in async manner) + * 2) Or, performs completion routines (finish APC) + * 3) Or, terminates if cancellable event was signalled + */ + wait_ret = WaitForMultipleObjectsEx (3, waitables, FALSE, INFINITE, TRUE); + if (wait_ret == WAIT_OBJECT_0 + 2) { + GST_DEBUG ("Operation cancelled"); + goto out; + } + + switch (wait_ret) { + case WAIT_OBJECT_0: + if (io_pending) { + BOOL ret = GetOverlappedResult (pipe, &overlap, &n_bytes, FALSE); + if (!ret) { + UINT last_err = GetLastError (); + std::string msg = win32_ipc_error_message (last_err); + GST_WARNING ("ConnectNamedPipe failed with 0x%x (%s)", + last_err, msg.c_str ()); + CloseHandle (pipe); + break; + } + } + + conn = new ServerConnection (self, pipe); + GST_DEBUG ("New connection is established %p", conn); + + /* Stores current buffer if available */ + lk.lock(); + conn->minfo = self->minfo; + lk.unlock (); + + pipe = INVALID_HANDLE_VALUE; + self->conn.push_back (conn); + win32_ipc_pipe_server_receive_need_data_async (conn); + pipe = win32_ipc_pipe_server_create_pipe (self, &overlap, &io_pending); + if (pipe == INVALID_HANDLE_VALUE) + goto out; + break; + case WAIT_OBJECT_0 + 1: + case WAIT_IO_COMPLETION: + { + std::vector pending_conns; + std::shared_ptr minfo; + + lk.lock(); + minfo = self->minfo; + lk.unlock(); + + if (minfo) { + for (auto iter: self->conn) { + if (iter->pending_have_data && iter->seq_num <= minfo->seq_num) { + iter->minfo = minfo; + pending_conns.push_back (iter); + } + } + } + + for (auto iter: pending_conns) { + GST_LOG ("Sending pending have data to %p", iter); + win32_ipc_pipe_server_send_have_data_async (iter); + } + + break; + } + default: + GST_WARNING ("Unexpected WaitForMultipleObjectsEx return 0x%x", + (UINT) wait_ret); + goto out; + } + } while (true); + +out: + /* Cancels all I/O event issued from this thread */ + { + std::vector pipes; + for (auto iter: self->conn) { + if (iter->pipe != INVALID_HANDLE_VALUE) + pipes.push_back (iter->pipe); + } + + for (auto iter: pipes) + CancelIo (iter); + } + + for (auto iter: self->conn) + win32_ipc_pipe_server_close_connection (iter, FALSE); + + self->conn.clear (); + + lk.lock (); + CloseHandle (overlap.hEvent); + self->last_err = ERROR_OPERATION_ABORTED; + self->cond.notify_all (); +} + +static BOOL +win32_ipc_pipe_server_run (Win32IpcPipeServer * self) +{ + std::unique_lock lk (self->lock); + + self->thread = std::make_unique + (std::thread (win32_ipc_pipe_server_loop, self)); + self->cond.wait (lk); + + if (self->last_err != ERROR_SUCCESS) { + self->thread->join (); + self->thread = nullptr; + return FALSE; + } + + return TRUE; +} + +Win32IpcPipeServer * +win32_ipc_pipe_server_new (const char * pipe_name) +{ + Win32IpcPipeServer *self; + + if (!pipe_name) + return nullptr; + + self = new Win32IpcPipeServer (pipe_name); + + if (!win32_ipc_pipe_server_run (self)) { + win32_ipc_pipe_server_unref (self); + return nullptr; + } + + return self; +} + +Win32IpcPipeServer * +win32_ipc_pipe_server_ref (Win32IpcPipeServer * server) +{ + if (!server) + return nullptr; + + InterlockedIncrement (&server->ref_count); + + return server; +} + +void +win32_ipc_pipe_server_unref (Win32IpcPipeServer * server) +{ + ULONG ref_count; + + if (!server) + return; + + ref_count = InterlockedDecrement (&server->ref_count); + if (ref_count == 0) + delete server; +} + +void +win32_ipc_pipe_server_shutdown (Win32IpcPipeServer * server) +{ + GST_DEBUG ("Shutting down"); + + SetEvent (server->cancellable); + if (server->thread) { + server->thread->join (); + server->thread = nullptr; + } + + std::lock_guard lk (server->lock); + server->last_err = ERROR_OPERATION_ABORTED; + server->minfo = nullptr; + server->cond.notify_all (); +} + +BOOL +win32_ipc_pipe_server_send_mmf (Win32IpcPipeServer * server, Win32IpcMmf * mmf, + const Win32IpcVideoInfo * info) +{ + std::lock_guard lk (server->lock); + server->minfo = std::make_shared (mmf, info, server->seq_num); + + GST_LOG ("Enqueue mmf %s", win32_ipc_mmf_get_name (mmf)); + + server->seq_num++; + + /* Wakeup event loop */ + SetEvent (server->enqueue_event); + + return TRUE; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.h b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.h new file mode 100644 index 0000000..2a7ccc0 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcpipeserver.h @@ -0,0 +1,50 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#pragma once + +#include +#include +#include "win32ipcmmf.h" +#include "win32ipcprotocol.h" +#include + +G_BEGIN_DECLS + +struct Win32IpcPipeServer; + +Win32IpcPipeServer * win32_ipc_pipe_server_new (const char * pipe_name); + +Win32IpcPipeServer * win32_ipc_pipe_server_ref (Win32IpcPipeServer * server); + +void win32_ipc_pipe_server_unref (Win32IpcPipeServer * server); + +void win32_ipc_pipe_server_shutdown (Win32IpcPipeServer * server); + +BOOL win32_ipc_pipe_server_send_mmf (Win32IpcPipeServer * server, + Win32IpcMmf * mmf, + const Win32IpcVideoInfo * info); + +G_END_DECLS + diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.cpp new file mode 100644 index 0000000..cc78211 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.cpp @@ -0,0 +1,237 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#include "win32ipcprotocol.h" +#include + +const char * +win32_ipc_pkt_type_to_string (Win32IpcPktType type) +{ + switch (type) { + case WIN32_IPC_PKT_NEED_DATA: + return "NEED-DATA"; + case WIN32_IPC_PKT_HAVE_DATA: + return "HAVE-DATA"; + case WIN32_IPC_PKT_READ_DONE: + return "READ-DONE"; + default: + break; + } + + return "Unknown"; +} + +Win32IpcPktType +win32_ipc_pkt_type_from_raw (UINT8 type) +{ + return (Win32IpcPktType) type; +} + +UINT8 +win32_ipc_pkt_type_to_raw (Win32IpcPktType type) +{ + return (UINT8) type; +} + +#define READ_UINT32(d,v) do { \ + (*((UINT32 *) v)) = *((UINT32 *) d); \ + (d) += sizeof (UINT32); \ +} while (0) + +#define WRITE_UINT32(d,v) do { \ + *((UINT32 *) d) = v; \ + (d) += sizeof (UINT32); \ +} while (0) + +#define READ_UINT64(d,v) do { \ + (*((UINT64 *) v)) = *((UINT64 *) d); \ + (d) += sizeof (UINT64); \ +} while (0) + +#define WRITE_UINT64(d,v) do { \ + *((UINT64 *) d) = v; \ + (d) += sizeof (UINT64); \ +} while (0) + +UINT32 +win32_ipc_pkt_build_need_data (UINT8 * pkt, UINT32 pkt_len, UINT64 seq_num) +{ + UINT8 *data = pkt; + + if (!pkt || pkt_len < WIN32_IPC_PKT_NEED_DATA_SIZE) + return 0; + + data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_NEED_DATA); + data++; + + WRITE_UINT64 (data, seq_num); + + return WIN32_IPC_PKT_NEED_DATA_SIZE; +} + +BOOL +win32_ipc_pkt_parse_need_data (UINT8 * pkt, UINT32 pkt_len, UINT64 * seq_num) +{ + UINT8 *data = pkt; + + if (!pkt || pkt_len < WIN32_IPC_PKT_NEED_DATA_SIZE) + return FALSE; + + if (win32_ipc_pkt_type_from_raw (data[0]) != WIN32_IPC_PKT_NEED_DATA) + return FALSE; + + data++; + + READ_UINT64 (data, seq_num); + + return TRUE; +} + +UINT32 +win32_ipc_pkt_build_have_data (UINT8 * pkt, UINT32 pkt_size, UINT64 seq_num, + const char * mmf_name, const Win32IpcVideoInfo * info) +{ + UINT8 *data = pkt; + size_t len; + + if (!pkt || !mmf_name || !info) + return 0; + + len = strlen (mmf_name); + if (len == 0) + return 0; + + len++; + if (pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE + len) + return 0; + + data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_HAVE_DATA); + data++; + + WRITE_UINT64 (data, seq_num); + + strcpy ((char *) data, mmf_name); + data += len; + + WRITE_UINT32 (data, info->format); + WRITE_UINT32 (data, info->width); + WRITE_UINT32 (data, info->height); + WRITE_UINT32 (data, info->fps_n); + WRITE_UINT32 (data, info->fps_d); + WRITE_UINT32 (data, info->par_n); + WRITE_UINT32 (data, info->par_d); + WRITE_UINT64 (data, info->size); + + for (UINT i = 0; i < 4; i++) + WRITE_UINT64 (data, info->offset[i]); + + for (UINT i = 0; i < 4; i++) + WRITE_UINT32 (data, info->stride[i]); + + WRITE_UINT64 (data, info->qpc); + + return data - pkt; +} + +BOOL +win32_ipc_pkt_parse_have_data (UINT8 * pkt, UINT32 pkt_size, UINT64 * seq_num, + char * mmf_name, Win32IpcVideoInfo * info) +{ + UINT8 *data = pkt; + size_t len; + + if (!pkt || pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE) + return FALSE; + + if (win32_ipc_pkt_type_from_raw (pkt[0]) != WIN32_IPC_PKT_HAVE_DATA) + return FALSE; + + data++; + + READ_UINT64 (data, seq_num); + + len = strnlen ((const char *) data, pkt_size - (data - pkt)); + if (len == 0) + return FALSE; + + len++; + if (pkt_size < WIN32_IPC_PKT_HAVE_DATA_SIZE + len) + return FALSE; + + strcpy (mmf_name, (const char *) data); + data += len; + + READ_UINT32 (data, &info->format); + READ_UINT32 (data, &info->width); + READ_UINT32 (data, &info->height); + READ_UINT32 (data, &info->fps_n); + READ_UINT32 (data, &info->fps_d); + READ_UINT32 (data, &info->par_n); + READ_UINT32 (data, &info->par_d); + READ_UINT64 (data, &info->size); + + for (UINT i = 0; i < 4; i++) + READ_UINT64 (data, &info->offset[i]); + + for (UINT i = 0; i < 4; i++) + READ_UINT32 (data, &info->stride[i]); + + READ_UINT64 (data, &info->qpc); + + return TRUE; +} + +UINT32 +win32_ipc_pkt_build_read_done (UINT8 * pkt, UINT32 pkt_len, UINT64 seq_num) +{ + UINT8 *data = pkt; + + if (!pkt || pkt_len < WIN32_IPC_PKT_READ_DONE_SIZE) + return 0; + + data[0] = win32_ipc_pkt_type_to_raw (WIN32_IPC_PKT_READ_DONE); + data++; + + WRITE_UINT64 (data, seq_num); + + return WIN32_IPC_PKT_READ_DONE_SIZE; +} + +BOOL +win32_ipc_pkt_parse_read_done (UINT8 * pkt, UINT32 pkt_len, UINT64 * seq_num) +{ + UINT8 *data = pkt; + + if (!pkt || pkt_len < WIN32_IPC_PKT_READ_DONE_SIZE) + return FALSE; + + if (win32_ipc_pkt_type_from_raw (data[0]) != WIN32_IPC_PKT_READ_DONE) + return FALSE; + + data++; + + READ_UINT64 (data, seq_num); + + return TRUE; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.h b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.h new file mode 100644 index 0000000..2a21694 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcprotocol.h @@ -0,0 +1,243 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#pragma once + +#include +#include + +G_BEGIN_DECLS + +/* + * Communication Sequence + * + * +--------+ +--------+ + * | client | | server | + * +--------+ +--------+ + * | | + * +--------- NEED-DATA ---------->| + * | +-------+ + * | | prepare named + * | | shared-memory + * | +<------+ + * +<-- HAVE-DATA (w/ shm name) ---| + * +--------+ | + * Open named | | + * shared-memory | | + * +------->+ | + * |--------- READ-DONE ---------->| + */ + +typedef enum +{ + WIN32_IPC_PKT_UNKNOWN, + WIN32_IPC_PKT_NEED_DATA, + WIN32_IPC_PKT_HAVE_DATA, + WIN32_IPC_PKT_READ_DONE, +} Win32IpcPktType; + +/* Same as GstVideoFormat */ +typedef enum +{ + WIN32_IPC_VIDEO_FORMAT_UNKNOWN, + WIN32_IPC_VIDEO_FORMAT_ENCODED, + WIN32_IPC_VIDEO_FORMAT_I420, + WIN32_IPC_VIDEO_FORMAT_YV12, + WIN32_IPC_VIDEO_FORMAT_YUY2, + WIN32_IPC_VIDEO_FORMAT_UYVY, + WIN32_IPC_VIDEO_FORMAT_AYUV, + WIN32_IPC_VIDEO_FORMAT_RGBx, + WIN32_IPC_VIDEO_FORMAT_BGRx, + WIN32_IPC_VIDEO_FORMAT_xRGB, + WIN32_IPC_VIDEO_FORMAT_xBGR, + WIN32_IPC_VIDEO_FORMAT_RGBA, + WIN32_IPC_VIDEO_FORMAT_BGRA, + WIN32_IPC_VIDEO_FORMAT_ARGB, + WIN32_IPC_VIDEO_FORMAT_ABGR, + WIN32_IPC_VIDEO_FORMAT_RGB, + WIN32_IPC_VIDEO_FORMAT_BGR, + WIN32_IPC_VIDEO_FORMAT_Y41B, + WIN32_IPC_VIDEO_FORMAT_Y42B, + WIN32_IPC_VIDEO_FORMAT_YVYU, + WIN32_IPC_VIDEO_FORMAT_Y444, + WIN32_IPC_VIDEO_FORMAT_v210, + WIN32_IPC_VIDEO_FORMAT_v216, + WIN32_IPC_VIDEO_FORMAT_NV12, + WIN32_IPC_VIDEO_FORMAT_NV21, + WIN32_IPC_VIDEO_FORMAT_GRAY8, + WIN32_IPC_VIDEO_FORMAT_GRAY16_BE, + WIN32_IPC_VIDEO_FORMAT_GRAY16_LE, + WIN32_IPC_VIDEO_FORMAT_v308, + WIN32_IPC_VIDEO_FORMAT_RGB16, + WIN32_IPC_VIDEO_FORMAT_BGR16, + WIN32_IPC_VIDEO_FORMAT_RGB15, + WIN32_IPC_VIDEO_FORMAT_BGR15, + WIN32_IPC_VIDEO_FORMAT_UYVP, + WIN32_IPC_VIDEO_FORMAT_A420, + WIN32_IPC_VIDEO_FORMAT_RGB8P, + WIN32_IPC_VIDEO_FORMAT_YUV9, + WIN32_IPC_VIDEO_FORMAT_YVU9, + WIN32_IPC_VIDEO_FORMAT_IYU1, + WIN32_IPC_VIDEO_FORMAT_ARGB64, + WIN32_IPC_VIDEO_FORMAT_AYUV64, + WIN32_IPC_VIDEO_FORMAT_r210, + WIN32_IPC_VIDEO_FORMAT_I420_10BE, + WIN32_IPC_VIDEO_FORMAT_I420_10LE, + WIN32_IPC_VIDEO_FORMAT_I422_10BE, + WIN32_IPC_VIDEO_FORMAT_I422_10LE, + WIN32_IPC_VIDEO_FORMAT_Y444_10BE, + WIN32_IPC_VIDEO_FORMAT_Y444_10LE, + WIN32_IPC_VIDEO_FORMAT_GBR, + WIN32_IPC_VIDEO_FORMAT_GBR_10BE, + WIN32_IPC_VIDEO_FORMAT_GBR_10LE, + WIN32_IPC_VIDEO_FORMAT_NV16, + WIN32_IPC_VIDEO_FORMAT_NV24, + WIN32_IPC_VIDEO_FORMAT_NV12_64Z32, + WIN32_IPC_VIDEO_FORMAT_A420_10BE, + WIN32_IPC_VIDEO_FORMAT_A420_10LE, + WIN32_IPC_VIDEO_FORMAT_A422_10BE, + WIN32_IPC_VIDEO_FORMAT_A422_10LE, + WIN32_IPC_VIDEO_FORMAT_A444_10BE, + WIN32_IPC_VIDEO_FORMAT_A444_10LE, + WIN32_IPC_VIDEO_FORMAT_NV61, + WIN32_IPC_VIDEO_FORMAT_P010_10BE, + WIN32_IPC_VIDEO_FORMAT_P010_10LE, + WIN32_IPC_VIDEO_FORMAT_IYU2, + WIN32_IPC_VIDEO_FORMAT_VYUY, + WIN32_IPC_VIDEO_FORMAT_GBRA, + WIN32_IPC_VIDEO_FORMAT_GBRA_10BE, + WIN32_IPC_VIDEO_FORMAT_GBRA_10LE, + WIN32_IPC_VIDEO_FORMAT_GBR_12BE, + WIN32_IPC_VIDEO_FORMAT_GBR_12LE, + WIN32_IPC_VIDEO_FORMAT_GBRA_12BE, + WIN32_IPC_VIDEO_FORMAT_GBRA_12LE, + WIN32_IPC_VIDEO_FORMAT_I420_12BE, + WIN32_IPC_VIDEO_FORMAT_I420_12LE, + WIN32_IPC_VIDEO_FORMAT_I422_12BE, + WIN32_IPC_VIDEO_FORMAT_I422_12LE, + WIN32_IPC_VIDEO_FORMAT_Y444_12BE, + WIN32_IPC_VIDEO_FORMAT_Y444_12LE, + WIN32_IPC_VIDEO_FORMAT_GRAY10_LE32, + WIN32_IPC_VIDEO_FORMAT_NV12_10LE32, + WIN32_IPC_VIDEO_FORMAT_NV16_10LE32, + WIN32_IPC_VIDEO_FORMAT_NV12_10LE40, + WIN32_IPC_VIDEO_FORMAT_Y210, + WIN32_IPC_VIDEO_FORMAT_Y410, + WIN32_IPC_VIDEO_FORMAT_VUYA, + WIN32_IPC_VIDEO_FORMAT_BGR10A2_LE, + WIN32_IPC_VIDEO_FORMAT_RGB10A2_LE, + WIN32_IPC_VIDEO_FORMAT_Y444_16BE, + WIN32_IPC_VIDEO_FORMAT_Y444_16LE, + WIN32_IPC_VIDEO_FORMAT_P016_BE, + WIN32_IPC_VIDEO_FORMAT_P016_LE, + WIN32_IPC_VIDEO_FORMAT_P012_BE, + WIN32_IPC_VIDEO_FORMAT_P012_LE, + WIN32_IPC_VIDEO_FORMAT_Y212_BE, + WIN32_IPC_VIDEO_FORMAT_Y212_LE, + WIN32_IPC_VIDEO_FORMAT_Y412_BE, + WIN32_IPC_VIDEO_FORMAT_Y412_LE, + WIN32_IPC_VIDEO_FORMAT_NV12_4L4, + WIN32_IPC_VIDEO_FORMAT_NV12_32L32, + WIN32_IPC_VIDEO_FORMAT_RGBP, + WIN32_IPC_VIDEO_FORMAT_BGRP, + WIN32_IPC_VIDEO_FORMAT_AV12, + WIN32_IPC_VIDEO_FORMAT_ARGB64_LE, + WIN32_IPC_VIDEO_FORMAT_ARGB64_BE, + WIN32_IPC_VIDEO_FORMAT_RGBA64_LE, + WIN32_IPC_VIDEO_FORMAT_RGBA64_BE, + WIN32_IPC_VIDEO_FORMAT_BGRA64_LE, + WIN32_IPC_VIDEO_FORMAT_BGRA64_BE, + WIN32_IPC_VIDEO_FORMAT_ABGR64_LE, + WIN32_IPC_VIDEO_FORMAT_ABGR64_BE, + WIN32_IPC_VIDEO_FORMAT_NV12_16L32S, + WIN32_IPC_VIDEO_FORMAT_NV12_8L128, + WIN32_IPC_VIDEO_FORMAT_NV12_10BE_8L128, +} Win32IpcVideoFormat; + +typedef struct +{ + Win32IpcVideoFormat format; + UINT32 width; + UINT32 height; + UINT32 fps_n; + UINT32 fps_d; + UINT32 par_n; + UINT32 par_d; + /* the size of memory */ + UINT64 size; + /* plane offsets */ + UINT64 offset[4]; + /* stride of each plane */ + UINT32 stride[4]; + /* QPC time */ + UINT64 qpc; +} Win32IpcVideoInfo; + +/* 1 byte (type) + 8 byte (seq-num) */ +#define WIN32_IPC_PKT_NEED_DATA_SIZE 9 + +/* 1 byte (type) + 8 byte (seq-num) + N bytes (name) + 4 (format) + + * 4 (width) + 4 (height) + 4 (fps_n) + 4 (fps_d) + 4 (par_n) + 4 (par_d) + + * 8 (size) + 8 * 4 (offset) + 4 * 4 (stride) + 8 (timestamp) */ +#define WIN32_IPC_PKT_HAVE_DATA_SIZE 101 + +/* 1 byte (type) + 8 byte (seq-num) */ +#define WIN32_IPC_PKT_READ_DONE_SIZE 5 + +const char * win32_ipc_pkt_type_to_string (Win32IpcPktType type); + +Win32IpcPktType win32_ipc_pkt_type_from_raw (UINT8 type); + +UINT8 win32_ipc_pkt_type_to_raw (Win32IpcPktType type); + +UINT32 win32_ipc_pkt_build_need_data (UINT8 * pkt, + UINT32 pkt_size, + UINT64 seq_num); + +BOOL win32_ipc_pkt_parse_need_data (UINT8 * pkt, + UINT32 pkt_size, + UINT64 * seq_num); + +UINT32 win32_ipc_pkt_build_have_data (UINT8 * pkt, + UINT32 pkt_size, + UINT64 seq_num, + const char * mmf_name, + const Win32IpcVideoInfo * info); + +BOOL win32_ipc_pkt_parse_have_data (UINT8 * pkt, + UINT32 pkt_size, + UINT64 * seq_num, + char * mmf_name, + Win32IpcVideoInfo * info); + +UINT32 win32_ipc_pkt_build_read_done (UINT8 * pkt, + UINT32 pkt_size, + UINT64 seq_num); + +BOOL win32_ipc_pkt_parse_read_done (UINT8 * pkt, + UINT32 pkt_size, + UINT64 * seq_num); + +G_END_DECLS + diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.cpp b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.cpp new file mode 100644 index 0000000..0982382 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.cpp @@ -0,0 +1,54 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#include "win32ipcutils.h" +#include +#include +#include +#include + +static inline void rtrim(std::string &s) { + s.erase (std::find_if (s.rbegin(), s.rend(), + [](unsigned char ch) { + return !std::isspace (ch); + }).base (), s.end ()); +} + +std::string +win32_ipc_error_message (DWORD error_code) +{ + WCHAR buffer[1024]; + + if (!FormatMessageW (FORMAT_MESSAGE_IGNORE_INSERTS | + FORMAT_MESSAGE_FROM_SYSTEM, nullptr, error_code, 0, buffer, + 1024, nullptr)) { + return std::string (""); + } + + std::wstring_convert, wchar_t> converter; + std::string ret = converter.to_bytes (buffer); + rtrim (ret); + + return ret; +} diff --git a/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.h b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.h new file mode 100644 index 0000000..368b100 --- /dev/null +++ b/subprojects/gst-plugins-bad/sys/win32ipc/protocol/win32ipcutils.h @@ -0,0 +1,30 @@ +/* GStreamer + * Copyright (C) 2022 Seungha Yang + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * SPDX-License-Identifier: MIT + */ + +#pragma once + +#include +#include + +std::string win32_ipc_error_message (DWORD error_code);