1 #include <zypp/zyppng/media/network/private/networkrequestdispatcher_p.h>
2 #include <zypp/zyppng/media/network/private/networkrequesterror_p.h>
3 #include <zypp/zyppng/media/network/private/request_p.h>
4 #include <zypp/zyppng/base/Timer>
5 #include <zypp/zyppng/base/SocketNotifier>
6 #include <zypp/zyppng/base/EventDispatcher>
7 #include <zypp/media/CurlHelper.h>
8 #include <zypp/media/MediaUserAuth.h>
11 #include <zypp/base/Logger.h>
12 #include <zypp/base/String.h>
14 using namespace boost;
18 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( )
19 : _timer( Timer::create() )
20 , _multi ( curl_multi_init() )
22 internal::globalInitCurlOnce();
24 curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
25 curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
26 curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
27 curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
29 _timer->sigExpired().connect( sigc::mem_fun( *this, &NetworkRequestDispatcherPrivate::multiTimerTimout ) );
32 NetworkRequestDispatcherPrivate::~NetworkRequestDispatcherPrivate()
34 cancelAll( NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, "Dispatcher shutdown" ) );
35 curl_multi_cleanup( _multi );
38 //called by curl to setup a timer
39 int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
41 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
42 assert( that != nullptr );
44 if ( timeout_ms >= 0 ) {
45 that->_timer->start( static_cast<uint64_t>(timeout_ms) );
53 void NetworkRequestDispatcherPrivate::multiTimerTimout(const Timer &)
55 handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
58 int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
60 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
61 assert( that != nullptr );
62 return that->socketCallback( easy, s, what, socketp );
65 int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
67 std::shared_ptr<SocketNotifier> socketp;
69 if ( _socketHandler.count( s ) == 0 ) {
70 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
73 socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
74 _socketHandler.insert( std::make_pair( s, socketp ) );
76 socketp->sigActivated().connect( sigc::mem_fun(*this, &NetworkRequestDispatcherPrivate::onSocketActivated) );
78 socketp = _socketHandler[s];
83 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
86 if ( _socketHandler.count( s ) > 0 )
87 _socketHandler.erase( s );
89 void *privatePtr = nullptr;
90 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
91 privatePtr = nullptr; //make sure this was not filled with bad info
95 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
96 //we stop the download, if we can not listen for socket changes we can not correctly do anything
97 setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
100 //a broken handle without anything assigned, also should never happen but make sure and clean it up
101 DBG << "Cleaning up unassigned easy handle" << std::endl;
102 curl_multi_remove_handle( _multi, easy );
103 curl_easy_cleanup( easy );
109 if ( what == CURL_POLL_REMOVE ) {
110 socketp->setEnabled( false );
111 _socketHandler.erase( s );
113 //keep the reference until this iteration is over
114 EventDispatcher::unrefLater( socketp );
118 if ( what == CURL_POLL_IN ) {
119 socketp->setMode( SocketNotifier::Read );
120 } else if ( what == CURL_POLL_OUT ) {
121 socketp->setMode( SocketNotifier::Write );
122 } else if ( what == CURL_POLL_INOUT ) {
123 socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
126 socketp->setEnabled();
130 void NetworkRequestDispatcherPrivate::onSocketActivated( const SocketNotifier &listener, int events )
133 if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
134 evBitmask |= CURL_CSELECT_IN;
135 if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
136 evBitmask |= CURL_CSELECT_OUT;
137 if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
138 evBitmask |= CURL_CSELECT_ERR;
140 handleMultiSocketAction( listener.socket(), evBitmask );
143 void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
146 CURLMcode rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
148 //we can not recover from a error like that, cancel all and stop
149 NetworkRequestError err = NetworkRequestErrorPrivate::fromCurlMError( rc );
153 _sigError.emit( *z_func() );
158 CURLMsg *msg = nullptr;
159 while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
160 if(msg->msg == CURLMSG_DONE) {
161 CURL *easy = msg->easy_handle;
162 CURLcode res = msg->data.result;
164 void *privatePtr = nullptr;
165 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK )
169 //broken easy handle not associated, should never happen but clean it up
170 DBG << "Cleaning up unassigned easy handle" << std::endl;
171 curl_multi_remove_handle( _multi, easy );
172 curl_easy_cleanup( easy );
176 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
178 //trigger notification about file downloaded
179 NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->_errorBuf.data() );
180 setFinished( *request->z_func(), e );
182 //attention request could be deleted from here on
187 void NetworkRequestDispatcherPrivate::cancelAll( NetworkRequestError result )
189 //prevent dequeuePending from filling up the runningDownloads again
192 while ( _runningDownloads.size() ) {
193 std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
194 setFinished(*req, result );
196 while ( _pendingDownloads.size() ) {
197 std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
198 setFinished(*req, result );
204 void NetworkRequestDispatcherPrivate::setFinished( NetworkRequest &req, NetworkRequestError result )
206 auto delReq = []( auto &list, NetworkRequest &req ) {
207 auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
208 return req.d_func() == r->d_func();
210 if ( it != list.end() ) {
211 EventDispatcher::unrefLater( *it );
216 delReq( _runningDownloads, req );
217 delReq( _pendingDownloads, req );
219 void *easyHandle = req.d_func()->_easyHandle;
221 curl_multi_remove_handle( _multi, easyHandle );
224 req.d_func()->_dispatcher = nullptr;
226 //first set the result, the Request might have a checksum to check as well so a currently
227 //successful request could fail later on
228 req.d_func()->setResult( std::move(result) );
229 _sigDownloadFinished.emit( *z_func(), req );
231 //we got a open slot, try to dequeue or send the finished signals if all queues are empty
235 void NetworkRequestDispatcherPrivate::dequeuePending()
237 if ( !_isRunning || _locked )
240 while ( _maxConnections > _runningDownloads.size() ) {
241 if ( !_pendingDownloads.size() )
244 std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
245 _pendingDownloads.pop_front();
247 std::string errBuf = "Failed to initialize easy handle";
248 if ( !req->d_func()->initialize( errBuf ) ) {
249 //@TODO store the CURL error in the errors extra info
250 EventDispatcher::unrefLater( req );
251 setFinished( *req, NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, std::move(errBuf) ) );
255 CURLMcode rc = curl_multi_add_handle( _multi, req->d_func()->_easyHandle );
257 EventDispatcher::unrefLater( req );
258 setFinished( *req, NetworkRequestErrorPrivate::fromCurlMError( rc ) );
262 req->d_func()->aboutToStart();
263 _sigDownloadStarted.emit( *z_func(), *req );
265 _runningDownloads.push_back( std::move(req) );
268 //check for empty queues
269 if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
270 //once we finished all requests, cancel the timer too, so curl is not called without requests
272 _sigQueueFinished.emit( *z_func() );
276 NetworkRequestDispatcher::NetworkRequestDispatcher( )
277 : Base( * new NetworkRequestDispatcherPrivate ( ) )
282 bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
284 curl_version_info_data *curl_info = nullptr;
285 curl_info = curl_version_info(CURLVERSION_NOW);
286 // curl_info does not need any free (is static)
287 if (curl_info->protocols)
289 const char * const *proto;
290 std::string scheme( url.getScheme() );
292 for(proto=curl_info->protocols; !found && *proto; ++proto) {
293 if( scheme == std::string((const char *)*proto))
301 void NetworkRequestDispatcher::setMaximumConcurrentConnections( size_t maxConn )
303 d_func()->_maxConnections = maxConn;
306 void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
312 if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
313 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
317 if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
318 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
322 req->d_func()->_dispatcher = this;
323 if ( req->priority() == NetworkRequest::Normal )
324 d->_pendingDownloads.push_back( req );
326 auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &req ){
327 return req->priority() == NetworkRequest::Normal;
330 //if we have a valid iterator, decrement we found a Normal pending download request, insert before that
331 if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
333 d->_pendingDownloads.insert( it, req );
336 //dequeue if running and we have capacity
340 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
342 cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitely cancelled" ) );
345 void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
349 if ( req.d_func()->_dispatcher != this ) {
350 //TODO throw exception
354 d->setFinished( req, err );
357 void NetworkRequestDispatcher::run()
360 d->_isRunning = true;
362 if ( d->_pendingDownloads.size() )
366 const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
368 return d_func()->_lastError;
371 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
373 return d_func()->_sigDownloadStarted;
376 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
378 return d_func()->_sigDownloadFinished;
381 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
383 return d_func()->_sigQueueFinished;
386 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
388 return d_func()->_sigError;