2 Element that generates a sine audio wave with the specified frequency
8 gst-launch-1.0 py_audiotestsrc ! autoaudiosink
13 gi.require_version('Gst', '1.0')
14 gi.require_version('GstBase', '1.0')
15 gi.require_version('GstAudio', '1.0')
17 from gi.repository import Gst, GLib, GObject, GstBase, GstAudio
22 Gst.error('py_audiotestsrc requires numpy')
27 OCAPS = Gst.Caps.from_string (
28 'audio/x-raw, format=F32LE, layout=interleaved, rate=44100, channels=2')
30 SAMPLESPERBUFFER = 1024
35 DEFAULT_IS_LIVE = False
37 class AudioTestSrc(GstBase.BaseSrc):
38 __gstmetadata__ = ('CustomSrc','Src', \
39 'Custom test src element', 'Mathieu Duponchelle')
44 "Frequency of test signal",
48 GObject.ParamFlags.READWRITE
52 "Volume of test signal",
56 GObject.ParamFlags.READWRITE
60 "Mute the test signal",
62 GObject.ParamFlags.READWRITE
66 "Whether to act as a live source",
68 GObject.ParamFlags.READWRITE
72 __gsttemplates__ = Gst.PadTemplate.new("src",
74 Gst.PadPresence.ALWAYS,
78 GstBase.BaseSrc.__init__(self)
79 self.info = GstAudio.AudioInfo()
81 self.freq = DEFAULT_FREQ
82 self.volume = DEFAULT_VOLUME
83 self.mute = DEFAULT_MUTE
85 self.set_live(DEFAULT_IS_LIVE)
86 self.set_format(Gst.Format.TIME)
88 def do_set_caps(self, caps):
89 self.info.from_caps(caps)
90 self.set_blocksize(self.info.bpf * SAMPLESPERBUFFER)
93 def do_get_property(self, prop):
94 if prop.name == 'freq':
96 elif prop.name == 'volume':
98 elif prop.name == 'mute':
100 elif prop.name == 'is-live':
103 raise AttributeError('unknown property %s' % prop.name)
105 def do_set_property(self, prop, value):
106 if prop.name == 'freq':
108 elif prop.name == 'volume':
110 elif prop.name == 'mute':
112 elif prop.name == 'is-live':
115 raise AttributeError('unknown property %s' % prop.name)
122 self.generate_samples_per_buffer = SAMPLESPERBUFFER
126 def do_gst_base_src_query(self, query):
127 if query.type == Gst.QueryType.LATENCY:
128 latency = Gst.util_uint64_scale_int(self.generate_samples_per_buffer,
129 Gst.SECOND, self.info.rate)
130 is_live = self.is_live
131 query.set_latency(is_live, latency, Gst.CLOCK_TIME_NONE)
134 res = GstBase.BaseSrc.do_query(self, query)
137 def do_get_times(self, buf):
142 if ts != Gst.CLOCK_TIME_NONE:
143 duration = buf.duration
144 if duration != Gst.CLOCK_TIME_NONE:
148 start = Gst.CLOCK_TIME_NONE
149 end = Gst.CLOCK_TIME_NONE
153 def do_fill(self, offset, length, buf):
155 samples = SAMPLESPERBUFFER
157 samples = int(length / self.info.bpf)
159 self.generate_samples_per_buffer = samples
161 bytes_ = samples * self.info.bpf
163 next_sample = self.next_sample + samples
164 next_byte = self.next_byte + bytes_
165 next_time = Gst.util_uint64_scale_int(next_sample, Gst.SECOND, self.info.rate)
168 with buf.map(Gst.MapFlags.WRITE) as info:
169 array = np.ndarray(shape = self.info.channels * samples, dtype = np.float32, buffer = info.data)
171 r = np.repeat(np.arange(self.accumulator, self.accumulator + samples),
173 np.sin(2 * np.pi * r * self.freq / self.info.rate, out=array)
177 except Exception as e:
178 Gst.error("Mapping error: %s" % e)
179 return (Gst.FlowReturn.ERROR, None)
181 buf.offset = self.next_sample
182 buf.offset_end = next_sample
183 buf.pts = self.next_time
184 buf.duration = next_time - self.next_time
186 self.next_time = next_time
187 self.next_sample = next_sample
188 self.next_byte = next_byte
189 self.accumulator += samples
190 self.accumulator %= self.info.rate / self.freq
192 return (Gst.FlowReturn.OK, buf)
195 __gstelementfactory__ = ("py_audiotestsrc", Gst.Rank.NONE, AudioTestSrc)