python: Add a Gst.init_python function to be called from plugins
[platform/upstream/gstreamer.git] / subprojects / gst-python / examples / plugins / python / py_audiotestsrc.py
1 '''
2 Element that generates a sine audio wave with the specified frequency
3
4 Requires numpy
5
6 Example pipeline:
7
8 gst-launch-1.0 py_audiotestsrc ! autoaudiosink
9 '''
10
11 import gi
12
13 gi.require_version('Gst', '1.0')
14 gi.require_version('GstBase', '1.0')
15 gi.require_version('GstAudio', '1.0')
16
17 from gi.repository import Gst, GLib, GObject, GstBase, GstAudio
18
19 try:
20     import numpy as np
21 except ImportError:
22     Gst.error('py_audiotestsrc requires numpy')
23     raise
24
25 Gst.init_python()
26
27 OCAPS = Gst.Caps.from_string (
28         'audio/x-raw, format=F32LE, layout=interleaved, rate=44100, channels=2')
29
30 SAMPLESPERBUFFER = 1024
31
32 DEFAULT_FREQ = 440
33 DEFAULT_VOLUME = 0.8
34 DEFAULT_MUTE = False
35 DEFAULT_IS_LIVE = False
36
37 class AudioTestSrc(GstBase.BaseSrc):
38     __gstmetadata__ = ('CustomSrc','Src', \
39                       'Custom test src element', 'Mathieu Duponchelle')
40
41     __gproperties__ = {
42         "freq": (int,
43                  "Frequency",
44                  "Frequency of test signal",
45                  1,
46                  GLib.MAXINT,
47                  DEFAULT_FREQ,
48                  GObject.ParamFlags.READWRITE
49                 ),
50         "volume": (float,
51                    "Volume",
52                    "Volume of test signal",
53                    0.0,
54                    1.0,
55                    DEFAULT_VOLUME,
56                    GObject.ParamFlags.READWRITE
57                   ),
58         "mute": (bool,
59                  "Mute",
60                  "Mute the test signal",
61                  DEFAULT_MUTE,
62                  GObject.ParamFlags.READWRITE
63                 ),
64         "is-live": (bool,
65                  "Is live",
66                  "Whether to act as a live source",
67                  DEFAULT_IS_LIVE,
68                  GObject.ParamFlags.READWRITE
69                 ),
70     }
71
72     __gsttemplates__ = Gst.PadTemplate.new("src",
73                                            Gst.PadDirection.SRC,
74                                            Gst.PadPresence.ALWAYS,
75                                            OCAPS)
76
77     def __init__(self):
78         GstBase.BaseSrc.__init__(self)
79         self.info = GstAudio.AudioInfo()
80
81         self.freq = DEFAULT_FREQ
82         self.volume = DEFAULT_VOLUME
83         self.mute = DEFAULT_MUTE
84
85         self.set_live(DEFAULT_IS_LIVE)
86         self.set_format(Gst.Format.TIME)
87
88     def do_set_caps(self, caps):
89         self.info.from_caps(caps)
90         self.set_blocksize(self.info.bpf * SAMPLESPERBUFFER)
91         return True
92
93     def do_get_property(self, prop):
94         if prop.name == 'freq':
95             return self.freq
96         elif prop.name == 'volume':
97             return self.volume
98         elif prop.name == 'mute':
99             return self.mute
100         elif prop.name == 'is-live':
101             return self.is_live
102         else:
103             raise AttributeError('unknown property %s' % prop.name)
104
105     def do_set_property(self, prop, value):
106         if prop.name == 'freq':
107             self.freq = value
108         elif prop.name == 'volume':
109             self.volume = value
110         elif prop.name == 'mute':
111             self.mute = value
112         elif prop.name == 'is-live':
113             self.set_live(value)
114         else:
115             raise AttributeError('unknown property %s' % prop.name)
116
117     def do_start (self):
118         self.next_sample = 0
119         self.next_byte = 0
120         self.next_time = 0
121         self.accumulator = 0
122         self.generate_samples_per_buffer = SAMPLESPERBUFFER
123
124         return True
125
126     def do_gst_base_src_query(self, query):
127         if query.type == Gst.QueryType.LATENCY:
128             latency = Gst.util_uint64_scale_int(self.generate_samples_per_buffer,
129                     Gst.SECOND, self.info.rate)
130             is_live = self.is_live
131             query.set_latency(is_live, latency, Gst.CLOCK_TIME_NONE)
132             res = True
133         else:
134             res = GstBase.BaseSrc.do_query(self, query)
135         return res
136
137     def do_get_times(self, buf):
138         end = 0
139         start = 0
140         if self.is_live:
141             ts = buf.pts
142             if ts != Gst.CLOCK_TIME_NONE:
143                 duration = buf.duration
144                 if duration != Gst.CLOCK_TIME_NONE:
145                     end = ts + duration
146                 start = ts
147         else:
148             start = Gst.CLOCK_TIME_NONE
149             end = Gst.CLOCK_TIME_NONE
150
151         return start, end
152
153     def do_fill(self, offset, length, buf):
154         if length == -1:
155             samples = SAMPLESPERBUFFER
156         else:
157             samples = int(length / self.info.bpf)
158
159         self.generate_samples_per_buffer = samples
160
161         bytes_ = samples * self.info.bpf
162
163         next_sample = self.next_sample + samples
164         next_byte = self.next_byte + bytes_
165         next_time = Gst.util_uint64_scale_int(next_sample, Gst.SECOND, self.info.rate)
166
167         try:
168             with buf.map(Gst.MapFlags.WRITE) as info:
169                 array = np.ndarray(shape = self.info.channels * samples, dtype = np.float32, buffer = info.data)
170                 if not self.mute:
171                     r = np.repeat(np.arange(self.accumulator, self.accumulator + samples),
172                             self.info.channels)
173                     np.sin(2 * np.pi * r * self.freq / self.info.rate, out=array)
174                     array *= self.volume
175                 else:
176                     array[:] = 0
177         except Exception as e:
178             Gst.error("Mapping error: %s" % e)
179             return (Gst.FlowReturn.ERROR, None)
180
181         buf.offset = self.next_sample
182         buf.offset_end = next_sample
183         buf.pts = self.next_time
184         buf.duration = next_time - self.next_time
185
186         self.next_time = next_time
187         self.next_sample = next_sample
188         self.next_byte = next_byte
189         self.accumulator += samples
190         self.accumulator %= self.info.rate / self.freq
191
192         return (Gst.FlowReturn.OK, buf)
193
194
195 __gstelementfactory__ = ("py_audiotestsrc", Gst.Rank.NONE, AudioTestSrc)