Imported Upstream version 17.22.1
[platform/upstream/libzypp.git] / zypp / zyppng / media / network / networkrequestdispatcher.cc
1 #include <zypp/zyppng/media/network/private/networkrequestdispatcher_p.h>
2 #include <zypp/zyppng/media/network/private/networkrequesterror_p.h>
3 #include <zypp/zyppng/media/network/private/request_p.h>
4 #include <zypp/zyppng/base/Timer>
5 #include <zypp/zyppng/base/SocketNotifier>
6 #include <zypp/zyppng/base/EventDispatcher>
7 #include <zypp/media/CurlHelper.h>
8 #include <zypp/media/MediaUserAuth.h>
9 #include <assert.h>
10
11 #include <zypp/base/Logger.h>
12 #include <zypp/base/String.h>
13
14 using namespace boost;
15
16 namespace zyppng {
17
18 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( )
19   : _timer( Timer::create() )
20   , _multi ( curl_multi_init() )
21 {
22   internal::globalInitCurlOnce();
23
24   curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
25   curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
26   curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
27   curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
28
29   _timer->sigExpired().connect( sigc::mem_fun( *this, &NetworkRequestDispatcherPrivate::multiTimerTimout ) );
30 }
31
32 NetworkRequestDispatcherPrivate::~NetworkRequestDispatcherPrivate()
33 {
34   cancelAll( NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, "Dispatcher shutdown" ) );
35   curl_multi_cleanup( _multi );
36 }
37
38 //called by curl to setup a timer
39 int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
40 {
41   NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
42   assert( that != nullptr );
43
44   if ( timeout_ms >= 0 ) {
45     that->_timer->start( static_cast<uint64_t>(timeout_ms) );
46   } else {
47     //cancel the timer
48     that->_timer->stop();
49   }
50   return 0;
51 }
52
53 void NetworkRequestDispatcherPrivate::multiTimerTimout(const Timer &)
54 {
55   handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
56 }
57
58 int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
59 {
60   NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
61   assert( that != nullptr );
62   return that->socketCallback( easy, s, what, socketp );
63 }
64
65 int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
66 {
67   std::shared_ptr<SocketNotifier> socketp;
68
69   if ( _socketHandler.count( s ) == 0 ) {
70     if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
71       return 0;
72
73     socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
74     _socketHandler.insert( std::make_pair( s, socketp ) );
75
76     socketp->sigActivated().connect( sigc::mem_fun(*this, &NetworkRequestDispatcherPrivate::onSocketActivated) );
77   } else {
78     socketp = _socketHandler[s];
79   }
80
81   //should never happen
82   if ( !socketp ) {
83     if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
84       return 0;
85
86     if ( _socketHandler.count( s ) > 0 )
87       _socketHandler.erase( s );
88
89     void *privatePtr = nullptr;
90     if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
91       privatePtr = nullptr; //make sure this was not filled with bad info
92     }
93
94     if ( privatePtr ) {
95       NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
96       //we stop the download, if we can not listen for socket changes we can not correctly do anything
97       setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
98       return 0;
99     } else {
100       //a broken handle without anything assigned, also should never happen but make sure and clean it up
101       DBG << "Cleaning up unassigned  easy handle" << std::endl;
102       curl_multi_remove_handle( _multi, easy );
103       curl_easy_cleanup( easy );
104       return 0;
105     }
106   }
107
108   //remove the socket
109   if ( what == CURL_POLL_REMOVE ) {
110     socketp->setEnabled( false );
111     _socketHandler.erase( s );
112
113     //keep the reference until this iteration is over
114     EventDispatcher::unrefLater( socketp );
115     return 0;
116   }
117
118   if ( what == CURL_POLL_IN ) {
119     socketp->setMode( SocketNotifier::Read );
120   } else if ( what == CURL_POLL_OUT ) {
121     socketp->setMode( SocketNotifier::Write );
122   } else if ( what == CURL_POLL_INOUT ) {
123     socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
124   }
125
126   socketp->setEnabled();
127   return 0;
128 }
129
130 void NetworkRequestDispatcherPrivate::onSocketActivated( const SocketNotifier &listener, int events )
131 {
132   int evBitmask = 0;
133   if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
134     evBitmask |= CURL_CSELECT_IN;
135   if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
136     evBitmask |= CURL_CSELECT_OUT;
137   if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
138     evBitmask |= CURL_CSELECT_ERR;
139
140   handleMultiSocketAction( listener.socket(), evBitmask );
141 }
142
143 void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
144 {
145   int running = 0;
146   CURLMcode rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
147   if (rc != 0) {
148     //we can not recover from a error like that, cancel all and stop
149     NetworkRequestError err = NetworkRequestErrorPrivate::fromCurlMError( rc );
150     cancelAll( err );
151     //emit error
152     _lastError = err;
153     _sigError.emit( *z_func() );
154     return;
155   }
156
157   int msgs_left = 0;
158   CURLMsg *msg = nullptr;
159   while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
160     if(msg->msg == CURLMSG_DONE) {
161       CURL *easy = msg->easy_handle;
162       CURLcode res = msg->data.result;
163
164       void *privatePtr = nullptr;
165       if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK )
166         continue;
167
168       if ( !privatePtr ) {
169         //broken easy handle not associated, should never happen but clean it up
170         DBG << "Cleaning up unassigned  easy handle" << std::endl;
171         curl_multi_remove_handle( _multi, easy );
172         curl_easy_cleanup( easy );
173         continue;
174       }
175
176       NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
177
178       //trigger notification about file downloaded
179       NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->_errorBuf.data() );
180       setFinished( *request->z_func(), e );
181
182       //attention request could be deleted from here on
183     }
184   }
185 }
186
187 void NetworkRequestDispatcherPrivate::cancelAll( NetworkRequestError result )
188 {
189   //prevent dequeuePending from filling up the runningDownloads again
190   _locked = true;
191
192   while ( _runningDownloads.size() ) {
193     std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
194     setFinished(*req, result );
195   }
196   while ( _pendingDownloads.size() ) {
197     std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
198     setFinished(*req, result );
199   }
200
201   _locked = false;
202 }
203
204 void NetworkRequestDispatcherPrivate::setFinished( NetworkRequest &req, NetworkRequestError result )
205 {
206   auto delReq = []( auto &list, NetworkRequest &req ) {
207     auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
208       return req.d_func() == r->d_func();
209     } );
210     if ( it != list.end() ) {
211       EventDispatcher::unrefLater( *it );
212       list.erase( it );
213     }
214   };
215
216   delReq( _runningDownloads, req );
217   delReq( _pendingDownloads, req );
218
219   void *easyHandle = req.d_func()->_easyHandle;
220   if ( easyHandle ) {
221     curl_multi_remove_handle( _multi, easyHandle );
222   }
223
224   req.d_func()->_dispatcher = nullptr;
225
226   //first set the result, the Request might have a checksum to check as well so a currently
227   //successful request could fail later on
228   req.d_func()->setResult( std::move(result) );
229   _sigDownloadFinished.emit( *z_func(), req );
230
231   //we got a open slot, try to dequeue or send the finished signals if all queues are empty
232   dequeuePending();
233 }
234
235 void NetworkRequestDispatcherPrivate::dequeuePending()
236 {
237   if ( !_isRunning || _locked )
238     return;
239
240   while ( _maxConnections > _runningDownloads.size() ) {
241     if ( !_pendingDownloads.size() )
242       break;
243
244     std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
245     _pendingDownloads.pop_front();
246
247     std::string errBuf = "Failed to initialize easy handle";
248     if ( !req->d_func()->initialize( errBuf ) ) {
249       //@TODO store the CURL error in the errors extra info
250       EventDispatcher::unrefLater( req );
251       setFinished( *req, NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, std::move(errBuf) ) );
252       continue;
253     }
254
255     CURLMcode rc = curl_multi_add_handle( _multi, req->d_func()->_easyHandle );
256     if ( rc != 0 ) {
257       EventDispatcher::unrefLater( req );
258       setFinished( *req, NetworkRequestErrorPrivate::fromCurlMError( rc ) );
259       continue;
260     }
261
262     req->d_func()->aboutToStart();
263     _sigDownloadStarted.emit( *z_func(), *req );
264
265     _runningDownloads.push_back( std::move(req) );
266   }
267
268   //check for empty queues
269   if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
270     //once we finished all requests, cancel the timer too, so curl is not called without requests
271     _timer->stop();
272     _sigQueueFinished.emit( *z_func() );
273   }
274 }
275
276 NetworkRequestDispatcher::NetworkRequestDispatcher( )
277   : Base( * new NetworkRequestDispatcherPrivate ( ) )
278 {
279
280 }
281
282 bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
283 {
284   curl_version_info_data *curl_info = nullptr;
285   curl_info = curl_version_info(CURLVERSION_NOW);
286   // curl_info does not need any free (is static)
287   if (curl_info->protocols)
288   {
289     const char * const *proto;
290     std::string        scheme( url.getScheme() );
291     bool               found = false;
292     for(proto=curl_info->protocols; !found && *proto; ++proto) {
293       if( scheme == std::string((const char *)*proto))
294         found = true;
295     }
296     return found;
297   }
298   return true;
299 }
300
301 void NetworkRequestDispatcher::setMaximumConcurrentConnections( size_t maxConn )
302 {
303   d_func()->_maxConnections = maxConn;
304 }
305
306 void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
307 {
308   if ( !req )
309     return;
310   Z_D();
311
312   if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() )  {
313     WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
314     return;
315   }
316
317   if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
318     WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
319     return;
320   }
321
322   req->d_func()->_dispatcher = this;
323   if ( req->priority() == NetworkRequest::Normal )
324     d->_pendingDownloads.push_back( req );
325   else {
326     auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &req ){
327       return req->priority() ==  NetworkRequest::Normal;
328     });
329
330     //if we have a valid iterator, decrement we found a Normal pending download request, insert before that
331     if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
332       it--;
333     d->_pendingDownloads.insert( it, req );
334   }
335
336   //dequeue if running and we have capacity
337   d->dequeuePending();
338 }
339
340 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
341 {
342   cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitely cancelled" ) );
343 }
344
345 void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
346 {
347   Z_D();
348
349   if ( req.d_func()->_dispatcher != this ) {
350     //TODO throw exception
351     return;
352   }
353
354   d->setFinished( req, err );
355 }
356
357 void NetworkRequestDispatcher::run()
358 {
359   Z_D();
360   d->_isRunning = true;
361
362   if ( d->_pendingDownloads.size() )
363     d->dequeuePending();
364 }
365
366 const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
367 {
368   return d_func()->_lastError;
369 }
370
371 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
372 {
373   return d_func()->_sigDownloadStarted;
374 }
375
376 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
377 {
378   return d_func()->_sigDownloadFinished;
379 }
380
381 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
382 {
383   return d_func()->_sigQueueFinished;
384 }
385
386 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
387 {
388   return d_func()->_sigError;
389 }
390
391 }