1 /*---------------------------------------------------------------------\
3 | |__ / \ / / . \ . \ |
8 \---------------------------------------------------------------------*/
9 /** \file zypp/media/MediaMultiCurl.cc
14 #include <sys/types.h>
18 #include <arpa/inet.h>
25 #include "zypp/ZConfig.h"
26 #include "zypp/base/Logger.h"
27 #include "zypp/media/MediaMultiCurl.h"
28 #include "zypp/media/MetaLinkParser.h"
31 using namespace zypp::base;
37 //////////////////////////////////////////////////////////////////////
40 class multifetchrequest;
42 // Hack: we derive from MediaCurl just to get the storage space for
43 // settings, url, curlerrors and the like
// One parallel-download worker: wraps a single curl easy handle that
// fetches byte ranges (blocks) of the target file on behalf of an
// owning multifetchrequest.  (Listing is fragmentary; members between
// the numbered lines are not visible here.)
45 class multifetchworker : MediaCurl {
46 friend class multifetchrequest;
49 multifetchworker(int no, multifetchrequest &request, const Url &url);
54 bool recheckChecksum();
55 void disableCompetition();
// DNS pre-check helpers: register the lookup pipe fd in a select() set
// and handle readiness events on it.
58 void adddnsfd(fd_set &rset, int &maxfd);
59 void dnsevent(fd_set &rset);
// curl write/header callbacks: the static _-prefixed versions are the
// C-style trampolines handed to libcurl, forwarding to the members.
83 size_t writefunction(void *ptr, size_t size);
84 static size_t _writefunction(void *ptr, size_t size, size_t nmemb, void *stream);
86 size_t headerfunction(char *ptr, size_t size);
87 static size_t _headerfunction(void *ptr, size_t size, size_t nmemb, void *stream);
// Back-pointer to the request this worker serves (not owned).
89 multifetchrequest *_request;
// Worker state machine values stored in _state.
100 #define WORKER_STARTING 0
101 #define WORKER_LOOKUP 1
102 #define WORKER_FETCH 2
103 #define WORKER_DISCARD 3
104 #define WORKER_DONE 4
105 #define WORKER_SLEEP 5
106 #define WORKER_BROKEN 6
// A single multi-source download: owns the curl multi handle loop state,
// the list of workers, and the progress/accounting counters.
110 class multifetchrequest {
112 multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize);
113 ~multifetchrequest();
// Drives the whole download; spawns workers over the given mirror list.
115 void run(std::vector<Url> &urllist);
118 friend class multifetchworker;
120 const MediaMultiCurl *_context;
121 const Pathname _filename;
// Progress report sink and optional metalink block list (both not owned).
125 callback::SendReport<DownloadProgressReport> *_report;
126 MediaBlockList *_blklist;
// All workers ever created for this request (owned; freed in dtor).
131 std::list<multifetchworker *> _workers;
// Worker bookkeeping: counts per state, earliest wakeup time of sleepers.
137 size_t _activeworkers;
138 size_t _lookupworkers;
139 size_t _sleepworkers;
140 double _minsleepuntil;
144 off_t _fetchedgoodsize;
// Timestamps/accumulators for progress reporting and speed averaging.
147 double _lastprogress;
149 double _lastperiodstart;
150 double _lastperiodfetched;
155 double _connect_timeout;
// Default block size (bytes) when splitting a plain file into ranges.
159 #define BLKSIZE 131072
164 //////////////////////////////////////////////////////////////////////
// Fragment of a wall-clock helper (presumably currentTime(), used
// throughout; signature not visible in this listing): returns seconds
// since the epoch with microsecond fraction.
170 if (gettimeofday(&tv, NULL))
172 return tv.tv_sec + tv.tv_usec / 1000000.;
// CURLOPT_WRITEFUNCTION handler: stores received body data at this
// worker's file offset and feeds it into the block digest.
176 multifetchworker::writefunction(void *ptr, size_t size)
179 if (_state == WORKER_BROKEN)
182 double now = currentTime();
// Clamp to the remaining size of the assigned block.
184 len = size > _size ? _size : size;
194 _request->_lastprogress = now;
196 if (_state == WORKER_DISCARD || !_request->_fp)
198 // block is no longer needed
199 // still calculate the checksum so that we can throw out bad servers
200 if (_request->_blklist)
201 _dig.update((const char *)ptr, len);
// Normal path: seek to the worker's offset and append the data.
206 if (fseeko(_request->_fp, _off, SEEK_SET))
208 cnt = fwrite(ptr, 1, len, _request->_fp);
211 _request->_fetchedsize += cnt;
212 if (_request->_blklist)
213 _dig.update((const char *)ptr, cnt);
// Static trampoline handed to libcurl; 'stream' is the worker itself
// (set via CURLOPT_WRITEDATA).
223 multifetchworker::_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
225 multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
226 return me->writefunction(ptr, size * nmemb);
// CURLOPT_HEADERFUNCTION handler: logs redirects and parses
// Content-Range to discover/verify the total file size.
230 multifetchworker::headerfunction(char *p, size_t size)
233 if (l > 9 && !strncasecmp(p, "Location:", 9))
235 string line(p + 9, l - 9);
// Strip a trailing CR from the header line before logging.
236 if (line[l - 10] == '\r')
237 line.erase(l - 10, 1);
238 DBG << "#" << _workerno << ": redirecting to" << line << endl;
// Only "Content-Range: bytes START-END/TOTAL" headers are of interest.
241 if (l <= 14 || l >= 128 || strncasecmp(p, "Content-Range:", 14) != 0)
245 while (l && (*p == ' ' || *p == '\t'))
247 if (l < 6 || strncasecmp(p, "bytes", 5))
254 unsigned long long start, off, filesize;
255 if (sscanf(buf, "%llu-%llu/%llu", &start, &off, &filesize) != 3)
// First server reply fixes the request's filesize (and totalsize when
// no metalink block list is present).
257 if (_request->_filesize == (off_t)-1)
259 WAR << "#" << _workerno << ": setting request filesize to " << filesize << endl;
260 _request->_filesize = filesize;
261 if (_request->_totalsize == 0 && !_request->_blklist)
262 _request->_totalsize = filesize;
// A mirror reporting a different size is broken and taken out of play.
264 if (_request->_filesize != (off_t)filesize)
266 DBG << "#" << _workerno << ": filesize mismatch" << endl;
267 _state = WORKER_BROKEN;
268 strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
// Static trampoline handed to libcurl (CURLOPT_HEADERDATA is the worker).
274 multifetchworker::_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
276 multifetchworker *me = reinterpret_cast<multifetchworker *>(stream);
277 return me->headerfunction((char *)ptr, size * nmemb);
// Construct worker #no for 'request', downloading from 'url': obtains a
// curl easy handle (reused from the context's pool when possible),
// wires up the callbacks, and copies auth settings when the mirror host
// matches the request host.
280 multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
281 : MediaCurl(url, Pathname())
285 _state = WORKER_STARTING;
287 _off = _blkstart = 0;
288 _size = _blksize = 0;
298 _maxspeed = _request->_maxspeed;
301 Url curlUrl( clearQueryString(url) );
302 _urlbuf = curlUrl.asString();
// Try to reuse a pooled easy handle for this host before creating one.
303 _curl = _request->_context->fromEasyPool(_url.getHost());
305 DBG << "reused worker from pool" << endl;
306 if (!_curl && !(_curl = curl_easy_init()))
308 _state = WORKER_BROKEN;
309 strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
// Setup failure: release the handle and mark the worker broken rather
// than propagating the exception.
316 catch (Exception &ex)
318 curl_easy_cleanup(_curl);
320 _state = WORKER_BROKEN;
321 strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
324 curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
325 curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
326 curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, &_writefunction);
327 curl_easy_setopt(_curl, CURLOPT_WRITEDATA, this);
// Header parsing is only needed while the filesize is still unknown or
// the first block has no checksum to verify against.
328 if (_request->_filesize == off_t(-1) || !_request->_blklist || !_request->_blklist->haveChecksum(0))
330 curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, &_headerfunction);
331 curl_easy_setopt(_curl, CURLOPT_HEADERDATA, this);
333 // if this is the same host copy authorization
334 // (the host check is also what curl does when doing a redirect)
335 // (note also that unauthorized exceptions are thrown with the request host)
336 if (url.getHost() == _request->_context->_url.getHost())
338 _settings.setUsername(_request->_context->_settings.username());
339 _settings.setPassword(_request->_context->_settings.password());
340 _settings.setAuthType(_request->_context->_settings.authType());
341 if ( _settings.userPassword().size() )
343 curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
344 string use_auth = _settings.authType();
345 if (use_auth.empty())
346 use_auth = "digest,basic"; // our default
347 long auth = CurlAuthData::auth_type_str2long(use_auth);
348 if( auth != CURLAUTH_NONE)
350 DBG << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
351 << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
352 curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
// Destructor: detach the easy handle from the multi handle if still
// attached, then either recycle it into the host pool (after clearing
// the callbacks that point at this dying object) or clean it up.
359 multifetchworker::~multifetchworker()
363 if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
364 curl_multi_remove_handle(_request->_multi, _curl);
365 if (_state == WORKER_DONE || _state == WORKER_SLEEP)
// Reset per-worker options so a pooled handle carries no stale pointers.
367 curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
368 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
369 curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
370 curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
371 curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
372 curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
373 _request->_context->toEasyPool(_url.getHost(), _curl);
376 curl_easy_cleanup(_curl);
// Reap a still-running DNS checker child, retrying on EINTR.
383 while (waitpid(_pid, &status, 0) == -1)
393 // the destructor in MediaCurl doesn't call disconnect() if
394 // the media is not attached, so we do it here manually
// True iff the environment variable 'name' is set to a non-empty value.
398 static inline bool env_isset(string name)
400 const char *s = getenv(name.c_str());
401 return s && *s ? true : false;
// Asynchronously verify that this worker's host resolves: forks a child
// that runs getaddrinfo() under an alarm() timeout and reports back via
// a pipe; skipped for numeric addresses, proxied connections, and hosts
// already known good.
405 multifetchworker::checkdns()
407 string host = _url.getHost();
412 if (_request->_context->isDNSok(host))
415 // no need to do dns checking for numeric hosts
417 if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
419 if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
422 // no need to do dns checking if we use a proxy
423 if (!_settings.proxy().empty())
425 if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
// Also honor scheme-specific proxy variables ("http_proxy" stays
// lowercase-only; others are checked uppercase as well).
427 string schemeproxy = _url.getScheme() + "_proxy";
428 if (env_isset(schemeproxy))
430 if (schemeproxy != "http_proxy")
432 std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
433 if (env_isset(schemeproxy))
437 DBG << "checking DNS lookup of " << host << endl;
441 _state = WORKER_BROKEN;
442 strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
446 if (_pid == pid_t(-1))
451 _state = WORKER_BROKEN;
452 strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
// ---- child process: do the blocking lookup ----
458 // XXX: close all other file descriptors
459 struct addrinfo *ai, aihints;
460 memset(&aihints, 0, sizeof(aihints));
461 aihints.ai_family = PF_UNSPEC;
// Probe for IPv6 support; fall back to IPv4-only lookups without it.
462 int tstsock = socket(PF_INET6, SOCK_DGRAM, 0);
464 aihints.ai_family = PF_INET;
467 aihints.ai_socktype = SOCK_STREAM;
468 aihints.ai_flags = AI_CANONNAME;
469 unsigned int connecttimeout = _request->_connect_timeout;
// SIGALRM with default disposition kills the child if the lookup hangs.
471 alarm(connecttimeout);
472 signal(SIGALRM, SIG_DFL);
473 if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
// ---- parent: remember the read end and wait in WORKER_LOOKUP ----
478 _dnspipe = pipefds[0];
479 _state = WORKER_LOOKUP;
// Add this worker's DNS pipe fd to the read set for select(), tracking
// the running maximum fd.  No-op unless a lookup is in flight.
483 multifetchworker::adddnsfd(fd_set &rset, int &maxfd)
485 if (_state != WORKER_LOOKUP)
487 FD_SET(_dnspipe, &rset);
488 if (maxfd < _dnspipe)
// Handle completion of the forked DNS checker: reap the child and mark
// the worker broken (lookup failed) or the host DNS-ok.
493 multifetchworker::dnsevent(fd_set &rset)
496 if (_state != WORKER_LOOKUP || !FD_ISSET(_dnspipe, &rset))
// Retry waitpid on EINTR.
499 while (waitpid(_pid, &status, 0) == -1)
510 if (!WIFEXITED(status))
512 _state = WORKER_BROKEN;
513 strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
514 _request->_activeworkers--;
517 int exitcode = WEXITSTATUS(status);
518 DBG << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
521 _state = WORKER_BROKEN;
522 strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
523 _request->_activeworkers--;
// Success: cache the host so sibling workers skip their own check.
526 _request->_context->setDNSok(_url.getHost());
// Verify the digest accumulated while streaming the current block
// against the block list.  Trivially true without a block/block list.
531 multifetchworker::checkChecksum()
533 // DBG << "checkChecksum block " << _blkno << endl;
534 if (!_blksize || !_request->_blklist)
536 return _request->_blklist->verifyDigest(_blkno, _dig);
// Re-verify the block by re-reading its bytes from the output file —
// needed when competing workers may have overwritten the region.
540 multifetchworker::recheckChecksum()
542 // DBG << "recheckChecksum block " << _blkno << endl;
543 if (!_request->_fp || !_blksize || !_request->_blklist)
545 if (fseeko(_request->_fp, _blkstart, SEEK_SET))
549 _request->_blklist->createDigest(_dig); // resets digest
// Read the block back in buffer-sized chunks and re-digest it.
552 size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
553 if (fread(buf, cnt, 1, _request->_fp) != 1)
555 _dig.update(buf, cnt);
558 return _request->_blklist->verifyDigest(_blkno, _dig);
// All fresh blocks are handed out: pick another worker's in-progress
// block to fetch in parallel ("stealing").  Chooses the best candidate
// by pass and projected finish time, may instead sleep if the current
// owner will finish sooner, and marks stolen blocks as competing.
563 multifetchworker::stealjob()
565 if (!_request->_stealing)
567 DBG << "start stealing!" << endl;
568 _request->_stealing = true;
570 multifetchworker *best = 0;
571 std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
// Scan all workers for a stealable block, refreshing stale speed data.
573 for (; workeriter != _request->_workers.end(); ++workeriter)
575 multifetchworker *worker = *workeriter;
578 if (worker->_pass == -1)
579 continue; // do not steal!
580 if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_blksize)
581 continue; // do not steal finished jobs
// Compute an average speed for workers that received data but have no
// average yet.
582 if (!worker->_avgspeed && worker->_blkreceived)
586 if (now > worker->_blkstarttime)
587 worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
589 if (!best || best->_pass > worker->_pass)
594 if (best->_pass < worker->_pass)
596 // if it is the same block, we want to know the best worker, otherwise the worst
// Compare projected remaining time: remaining_bytes/speed, cross-
// multiplied to avoid division.
597 if (worker->_blkstart == best->_blkstart)
599 if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed < (best->_blksize - best->_blkreceived) * worker->_avgspeed)
604 if ((worker->_blksize - worker->_blkreceived) * best->_avgspeed > (best->_blksize - best->_blkreceived) * worker->_avgspeed)
// Nothing to steal: this worker is done and possibly the whole request.
610 _state = WORKER_DONE;
611 _request->_activeworkers--;
612 _request->_finished = true;
615 // do not sleep twice
616 if (_state != WORKER_SLEEP)
618 if (!_avgspeed && _blkreceived)
622 if (now > _blkstarttime)
623 _avgspeed = _blkreceived / (now - _blkstarttime);
626 // lets see if we should sleep a bit
627 DBG << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_blksize << endl;
628 DBG << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_blksize - best->_blkreceived) << endl;
// If the owner is projected to finish before we could fetch the whole
// block, sleep for about twice its remaining time instead of competing.
629 if (_avgspeed && best->_avgspeed && (best->_blksize - best->_blkreceived) * _avgspeed < best->_blksize * best->_avgspeed)
633 double sl = (best->_blksize - best->_blkreceived) / best->_avgspeed * 2;
636 DBG << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
637 _sleepuntil = now + sl;
638 _state = WORKER_SLEEP;
639 _request->_sleepworkers++;
// Steal: duplicate the victim's block assignment and mark competition.
645 best->_competing = true;
646 _blkstart = best->_blkstart;
647 _blksize = best->_blksize;
650 _blkno = best->_blkno;
// This worker finished its block first: demote every competitor on the
// same block to DISCARD and exempt their block from further stealing.
655 multifetchworker::disableCompetition()
657 std::list<multifetchworker *>::iterator workeriter = _request->_workers.begin();
658 for (; workeriter != _request->_workers.end(); ++workeriter)
660 multifetchworker *worker = *workeriter;
663 if (worker->_blkstart == _blkstart)
665 if (worker->_state == WORKER_FETCH)
666 worker->_state = WORKER_DISCARD;
667 worker->_pass = -1; /* do not steal this one, we already have it */
// Assign the next block to this worker: either the next BLKSIZE-sized
// range of a plain file, or the next entry of the metalink block list.
// Falls through to stealing when the request is already in that phase.
674 multifetchworker::nextjob()
677 if (_request->_stealing)
683 MediaBlockList *blklist = _request->_blklist;
// No block list: carve fixed-size ranges out of [0, filesize).
687 if (_request->_filesize != off_t(-1))
689 if (_request->_blkoff >= _request->_filesize)
694 _blksize = _request->_filesize - _request->_blkoff;
695 if (_blksize > BLKSIZE)
// Block list: advance to the block containing the current offset.
701 MediaBlock blk = blklist->getBlock(_request->_blkno);
702 while (_request->_blkoff >= blk.off + blk.size)
704 if (++_request->_blkno == blklist->numBlocks())
709 blk = blklist->getBlock(_request->_blkno);
710 _request->_blkoff = blk.off;
712 _blksize = blk.off + blk.size - _request->_blkoff;
// Cap unchecksummed blocks at BLKSIZE so bad data stays cheap to redo.
713 if (_blksize > BLKSIZE && !blklist->haveChecksum(_request->_blkno))
716 _blkno = _request->_blkno;
717 _blkstart = _request->_blkoff;
718 _request->_blkoff += _blksize;
// Kick off the transfer of the currently assigned block: set the HTTP
// Range header for it and add the easy handle to the multi handle.
723 multifetchworker::run()
727 if (_state == WORKER_BROKEN || _state == WORKER_DONE)
728 return; // just in case...
// Open-ended range ("start-") vs. exact range ("start-end").
730 sprintf(rangebuf, "%llu-", (unsigned long long)_blkstart);
732 sprintf(rangebuf, "%llu-%llu", (unsigned long long)_blkstart, (unsigned long long)_blkstart + _blksize - 1);
733 DBG << "#" << _workerno << ": BLK " << _blkno << ":" << rangebuf << " " << _url << endl;
// For _noendrange workers starting at offset 0, drop the Range header
// entirely (servers that reject range requests).
734 if (curl_easy_setopt(_curl, CURLOPT_RANGE, !_noendrange || _blkstart != 0 ? rangebuf : (char *)0) != CURLE_OK)
736 _request->_activeworkers--;
737 _state = WORKER_BROKEN;
738 strncpy(_curlError, "curl_easy_setopt range failed", CURL_ERROR_SIZE);
741 if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK)
743 _request->_activeworkers--;
744 _state = WORKER_BROKEN;
745 strncpy(_curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE);
// Signal the request loop to call curl_multi_perform promptly.
748 _request->_havenewjob = true;
// Fresh digest for the new block; fetching starts now.
751 if (_request->_blklist)
752 _request->_blklist->createDigest(_dig); // resets digest
753 _state = WORKER_FETCH;
755 double now = currentTime();
761 //////////////////////////////////////////////////////////////////////
// Initialize request bookkeeping; _totalsize is the sum of block sizes
// when a block list exists, else the (possibly unknown) filesize.
764 multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) : _context(context), _filename(filename), _baseurl(baseurl)
769 _filesize = filesize;
// Start handing out blocks at the first block's offset.
775 _blkoff = _blklist->getBlock(0).off;
784 _fetchedgoodsize = 0;
786 _lastperiodstart = _lastprogress = _starttime = currentTime();
787 _lastperiodfetched = 0;
790 _connect_timeout = 0;
794 for (size_t blkno = 0; blkno < blklist->numBlocks(); blkno++)
796 MediaBlock blk = blklist->getBlock(blkno);
797 _totalsize += blk.size;
800 else if (filesize != off_t(-1))
801 _totalsize = filesize;
// Destructor: the request owns its workers and frees them all.
804 multifetchrequest::~multifetchrequest()
806 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
808 multifetchworker *worker = *workeriter;
// Main download loop: spawns workers over the mirror list, multiplexes
// DNS-check pipes and curl transfers via select(), collects finished
// transfers, verifies checksums, reschedules blocks, applies rate
// control, and reports progress until finished or a fatal error.
816 multifetchrequest::run(std::vector<Url> &urllist)
819 std::vector<Url>::iterator urliter = urllist.begin();
822 fd_set rset, wset, xset;
827 DBG << "finished!" << endl;
// Spawn additional workers while below the concurrency/mirror caps.
831 if (_activeworkers < MAXWORKERS && urliter != urllist.end() && _workers.size() < MAXURLS)
833 // spawn another worker!
834 multifetchworker *worker = new multifetchworker(workerno++, *this, *urliter);
835 _workers.push_back(worker);
836 if (worker->_state != WORKER_BROKEN)
839 if (worker->_state != WORKER_LOOKUP)
// Everything broken: surface one worker's curl error to the caller.
851 WAR << "No more active workers!" << endl;
852 // show the first worker error we find
853 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
855 if ((*workeriter)->_state != WORKER_BROKEN)
857 ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
// Build the fd sets: curl's own fds plus the workers' DNS pipes.
866 curl_multi_fdset(_multi, &rset, &wset, &xset, &maxfd);
869 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
870 (*workeriter)->adddnsfd(rset, maxfd);
873 // if we added a new job we have to call multi_perform once
874 // to make it show up in the fd set. do not sleep in this case.
876 tv.tv_usec = _havenewjob ? 0 : 200000;
// Shorten the select timeout to the earliest sleeper's wakeup time.
877 if (_sleepworkers && !_havenewjob)
879 if (_minsleepuntil == 0)
881 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
883 multifetchworker *worker = *workeriter;
884 if (worker->_state != WORKER_SLEEP)
886 if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
887 _minsleepuntil = worker->_sleepuntil;
890 double sl = _minsleepuntil - currentTime();
897 tv.tv_usec = sl * 1000000;
899 int r = select(maxfd + 1, &rset, &wset, &xset, &tv);
900 if (r == -1 && errno != EINTR)
901 ZYPP_THROW(MediaCurlException(_baseurl, "select() failed", "unknown error"));
// Dispatch DNS lookup completions.
902 if (r != 0 && _lookupworkers)
903 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
905 multifetchworker *worker = *workeriter;
906 if (worker->_state != WORKER_LOOKUP)
908 (*workeriter)->dnsevent(rset);
909 if (worker->_state != WORKER_LOOKUP)
// Drive all curl transfers; repeat while curl asks for another call.
919 mcode = curl_multi_perform(_multi, &tasks);
920 if (mcode == CURLM_CALL_MULTI_PERFORM)
922 if (mcode != CURLM_OK)
923 ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_perform", "unknown error"));
927 double now = currentTime();
// Maintain a smoothed per-period download rate for progress reporting.
930 if (now > _lastperiodstart + .5)
933 _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
935 _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
936 _lastperiodfetched = _fetchedsize;
937 _lastperiodstart = now;
// Wake up sleeping workers whose sleep time has elapsed.
943 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
945 multifetchworker *worker = *workeriter;
946 if (worker->_state != WORKER_SLEEP)
948 if (worker->_sleepuntil > now)
950 if (_minsleepuntil == worker->_sleepuntil)
952 DBG << "#" << worker->_workerno << ": sleep done, wake up" << endl;
954 // nextjob chnages the state
959 // collect all curl results, reschedule new jobs
961 while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
963 if (msg->msg != CURLMSG_DONE)
// Recover the worker from the easy handle's CURLOPT_PRIVATE pointer.
965 multifetchworker *worker;
966 if (curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &worker) != CURLE_OK)
967 ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
968 CURLcode cc = msg->data.result;
// Fold this block's throughput into the worker's running average.
969 if (worker->_blkreceived && now > worker->_blkstarttime)
971 if (worker->_avgspeed)
972 worker->_avgspeed = (worker->_avgspeed + worker->_blkreceived / (now - worker->_blkstarttime)) / 2;
974 worker->_avgspeed = worker->_blkreceived / (now - worker->_blkstarttime);
976 DBG << "#" << worker->_workerno << ": BLK " << worker->_blkno << " done code " << cc << " speed " << worker->_avgspeed << endl;
977 curl_multi_remove_handle(_multi, msg->easy_handle);
// HTTP-level failures: handle 416 range errors specially, retrying a
// worker once without an end range before giving up on it.
978 if (cc == CURLE_HTTP_RETURNED_ERROR)
981 (void)curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &statuscode);
982 DBG << "HTTP status " << statuscode << endl;
983 if (statuscode == 416 && !_blklist) /* Range error */
985 if (_filesize == off_t(-1))
987 if (!worker->_noendrange)
989 DBG << "#" << worker->_workerno << ": retrying with no end range" << endl;
990 worker->_noendrange = true;
994 worker->_noendrange = false;
998 if (worker->_blkstart >= _filesize)
// Successful transfer: verify the streamed digest before accepting.
1007 if (!worker->checkChecksum())
1009 WAR << "#" << worker->_workerno << ": checksum error, disable worker" << endl;
1010 worker->_state = WORKER_BROKEN;
1011 strncpy(worker->_curlError, "checksum error", CURL_ERROR_SIZE);
1015 if (worker->_state == WORKER_FETCH)
1017 if (worker->_competing)
1019 worker->disableCompetition();
1020 // multiple workers wrote into this block. We already know that our
1021 // data was correct, but maybe some other worker overwrote our data
1022 // with something broken. Thus we have to re-check the block.
1023 if (!worker->recheckChecksum())
1025 DBG << "#" << worker->_workerno << ": recheck checksum error, refetch block" << endl;
1026 // re-fetch! No need to worry about the bad workers,
1027 // they will now be set to DISCARD. At the end of their block
1028 // they will notice that they wrote bad data and go into BROKEN.
1033 _fetchedgoodsize += worker->_blksize;
1036 // make bad workers sleep a little
// Rank this worker's speed against its peers; slow ones are put to
// sleep proportionally to how far behind the fastest they are.
1038 int maxworkerno = 0;
1040 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1042 multifetchworker *oworker = *workeriter;
1043 if (oworker->_state == WORKER_BROKEN)
1045 if (oworker->_avgspeed > maxavg)
1047 maxavg = oworker->_avgspeed;
1048 maxworkerno = oworker->_workerno;
1050 if (oworker->_avgspeed > worker->_avgspeed)
1053 if (maxavg && !_stealing)
1055 double ratio = worker->_avgspeed / maxavg;
1057 if (numbetter < 3) // don't sleep that much if we're in the top two
1058 ratio = ratio * ratio;
1061 DBG << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
1062 worker->_sleepuntil = now + ratio;
1063 worker->_state = WORKER_SLEEP;
1069 // do rate control (if requested)
1070 // should use periodavg, but that's not what libcurl does
// Clamp each worker's CURLOPT_MAX_RECV_SPEED_LARGE so the aggregate
// stays near _maxspeed.
1071 if (_maxspeed && now > _starttime)
1073 double avg = _fetchedsize / (now - _starttime);
1074 avg = worker->_maxspeed * _maxspeed / avg;
1075 if (avg < _maxspeed / MAXWORKERS)
1076 avg = _maxspeed / MAXWORKERS;
1077 if (avg > _maxspeed)
1081 worker->_maxspeed = avg;
1082 curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
// Transfer failed: mark broken; if this was the last hope, rethrow the
// curl error as a proper media exception.
1089 worker->_state = WORKER_BROKEN;
1091 if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS))
1093 // end of workers reached! goodbye!
1094 worker->evaluateCurlCode(Pathname(), cc, false);
// Report progress (percent is computed over good + in-flight bytes);
// a report callback returning false aborts the download.
1102 int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
1104 if (now > _starttime)
1105 avg = _fetchedsize / (now - _starttime);
1106 if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
1107 ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
// Global stall detection: no progress within _timeout seconds.
1110 if (_timeout && now - _lastprogress > _timeout)
1115 ZYPP_THROW(MediaTimeoutException(_baseurl));
1117 // print some download stats
1118 WAR << "overall result" << endl;
1119 for (std::list<multifetchworker *>::iterator workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1121 multifetchworker *worker = *workeriter;
1122 WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
1127 //////////////////////////////////////////////////////////////////////
// MediaMultiCurl: MediaCurl variant that can fan a download out over
// multiple mirrors using metalink data and a curl multi handle.
1130 MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
1131 : MediaCurl(url_r, attach_point_hint_r)
1133 MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
1135 _customHeadersMetalink = 0;
// Destructor: release the metalink header list, the multi handle, and
// every pooled easy handle.
1138 MediaMultiCurl::~MediaMultiCurl()
1140 if (_customHeadersMetalink)
1142 curl_slist_free_all(_customHeadersMetalink);
1143 _customHeadersMetalink = 0;
1147 curl_multi_cleanup(_multi);
1150 std::map<std::string, CURL *>::iterator it;
1151 for (it = _easypool.begin(); it != _easypool.end(); it++)
1153 CURL *easy = it->second;
1156 curl_easy_cleanup(easy);
// Extend the base class easy-handle setup with a header list that also
// advertises metalink content types via Accept.
1162 void MediaMultiCurl::setupEasy()
1164 MediaCurl::setupEasy();
1166 if (_customHeadersMetalink)
1168 curl_slist_free_all(_customHeadersMetalink);
1169 _customHeadersMetalink = 0;
// Copy the base custom headers, then append the metalink Accept header.
1171 struct curl_slist *sl = _customHeaders;
1172 for (; sl; sl = sl->next)
1173 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
1174 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/metalink+xml, application/metalink4+xml");
// Download 'filename' to 'target': fetches into a mkstemp temp file,
// honoring If-Modified-Since; if the server answers with a metalink
// document, switches to the parallel multifetch() path (reusing blocks
// from the old target, a previous failed attempt, and the delta file),
// otherwise falls back to a plain MediaCurl download.
1178 void MediaMultiCurl::doGetFileCopy( const Pathname & filename , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
1180 Pathname dest = target.absolutename();
1182 if( assert_dir( dest.dirname() ) )
1183 DBG << "assert_dir " << dest.dirname() << " failed" << endl;
1184 Url url(getFileUrl(filename));
1185 ZYPP_THROW( MediaSystemException(url, "System error on " + dest.dirname().asString()) );
// Create a unique ".new.zypp.XXXXXX" temp file next to the target.
1187 string destNew = target.asString() + ".new.zypp.XXXXXX";
1188 char *buf = ::strdup( destNew.c_str());
1191 ERR << "out of memory for temp file name" << endl;
1192 Url url(getFileUrl(filename));
1193 ZYPP_THROW(MediaSystemException(url, "out of memory for temp file name"));
1196 int tmp_fd = ::mkstemp( buf );
1200 ERR << "mkstemp failed for file '" << destNew << "'" << endl;
1201 ZYPP_THROW(MediaWriteException(destNew));
1206 FILE *file = ::fdopen( tmp_fd, "w" );
1209 filesystem::unlink( destNew );
1210 ERR << "fopen failed for file '" << destNew << "'" << endl;
1211 ZYPP_THROW(MediaWriteException(destNew));
1213 DBG << "dest: " << dest << endl;
1214 DBG << "temp: " << destNew << endl;
1216 // set IFMODSINCE time condition (no download if not modified)
1217 if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
1219 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
1220 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
1224 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1225 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1227 // change header to include Accept: metalink
1228 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
1231 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options);
// On any failure restore the curl options and clean up the temp file.
1233 catch (Exception &ex)
1236 filesystem::unlink(destNew);
1237 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1238 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1239 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
1242 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1243 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1244 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
// 304 (or FTP 213) means the existing target is still current.
1245 long httpReturnCode = 0;
1246 CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
1247 if (infoRet == CURLE_OK)
1249 DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
1250 if ( httpReturnCode == 304
1251 || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
1253 DBG << "not modified: " << PathInfo(dest) << endl;
1259 WAR << "Could not get the reponse code." << endl;
// If the server returned a metalink document instead of the file,
// parse it and do a parallel blocked download.
1262 if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
1264 string ct = string(ptr);
1265 if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
1267 bool userabort = false;
1270 Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
1274 mlp.parse(Pathname(destNew));
1275 MediaBlockList bl = mlp.getBlockList();
1276 vector<Url> urls = mlp.getUrls();
// Reopen the temp file for the real payload, reusing any blocks we
// already have locally (old target, failed attempt, delta file).
1278 file = fopen(destNew.c_str(), "w+");
1280 ZYPP_THROW(MediaWriteException(destNew));
1281 if (PathInfo(target).isExist())
1283 DBG << "reusing blocks from file " << target << endl;
1284 bl.reuseBlocks(file, target.asString());
1287 if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
1289 DBG << "reusing blocks from file " << failedFile << endl;
1290 bl.reuseBlocks(file, failedFile.asString());
1292 filesystem::unlink(failedFile);
1294 Pathname df = deltafile();
1297 DBG << "reusing blocks from file " << df << endl;
1298 bl.reuseBlocks(file, df.asString());
1303 multifetch(filename, file, &urls, &report, &bl);
1305 catch (MediaCurlException &ex)
1307 userabort = ex.errstr() == "User abort";
// Multifetch failed: keep sizable partial data for next time, then
// fall back to a plain single-stream download.
1311 catch (Exception &ex)
1313 // something went wrong. fall back to normal download
1317 if (PathInfo(destNew).size() >= 63336)
1319 ::unlink(failedFile.asString().c_str());
1320 filesystem::hardlinkCopy(destNew, failedFile);
1324 filesystem::unlink(destNew);
1327 file = fopen(destNew.c_str(), "w+");
1329 ZYPP_THROW(MediaWriteException(destNew));
1330 MediaCurl::doGetFileCopyFile(filename, dest, file, report, options | OPTION_NO_REPORT_START);
// Finalize: fix permissions, flush/close, and atomically rename into
// place.
1334 if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
1336 ERR << "Failed to chmod file " << destNew << endl;
1340 filesystem::unlink(destNew);
1341 ERR << "Fclose failed for file '" << destNew << "'" << endl;
1342 ZYPP_THROW(MediaWriteException(destNew));
1344 if ( rename( destNew, dest ) != 0 )
1346 ERR << "Rename failed" << endl;
1347 ZYPP_THROW(MediaWriteException(dest));
1349 DBG << "done: " << PathInfo(dest) << endl;
// Run a parallel download of 'filename' into 'fp' using the mirror
// 'urllist' and optional metalink block list; verifies the final file
// digest afterwards.
1352 void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, callback::SendReport<DownloadProgressReport> *report, MediaBlockList *blklist, off_t filesize) const
1354 Url baseurl(getFileUrl(filename));
1355 if (blklist && filesize == off_t(-1) && blklist->haveFilesize())
1356 filesize = blklist->getFilesize();
1357 if (blklist && !blklist->haveBlocks() && filesize != 0)
// Nothing to fetch (empty file / no blocks): just verify and return.
1359 if (blklist && (filesize == 0 || !blklist->numBlocks()))
1361 checkFileDigest(baseurl, fp, blklist);
// Lazily create the shared multi handle.
1368 _multi = curl_multi_init();
1370 ZYPP_THROW(MediaCurlInitException(baseurl));
1372 multifetchrequest req(this, filename, baseurl, _multi, fp, report, blklist, filesize);
1373 req._timeout = _settings.timeout();
1374 req._connect_timeout = _settings.connectTimeout();
1375 req._maxspeed = _settings.maxDownloadSpeed();
// Keep only mirrors with protocols we can range-request over.
1376 std::vector<Url> myurllist;
1377 for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
1381 string scheme = urliter->getScheme();
1382 if (scheme == "http" || scheme == "https" || scheme == "ftp")
1384 checkProtocol(*urliter);
1385 myurllist.push_back(*urliter);
1392 if (!myurllist.size())
1393 myurllist.push_back(baseurl);
1395 checkFileDigest(baseurl, fp, blklist);
// Verify the whole downloaded file against the metalink file checksum;
// throws MediaCurlException on mismatch or seek failure.
1398 void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList *blklist) const
1400 if (!blklist || !blklist->haveFileChecksum())
1402 if (fseeko(fp, off_t(0), SEEK_SET))
1403 ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
1405 blklist->createFileDigest(dig);
1408 while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
1410 if (!blklist->verifyFileDigest(dig))
1411 ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
// Per-context cache of hosts whose DNS lookup already succeeded.
1414 bool MediaMultiCurl::isDNSok(const string &host) const
1416 return _dnsok.find(host) == _dnsok.end() ? false : true;
1419 void MediaMultiCurl::setDNSok(const string &host) const
1421 _dnsok.insert(host);
// Take a cached easy handle for 'host' out of the pool (caller owns
// it afterwards); returns the pool entry, removing it from the map.
1424 CURL *MediaMultiCurl::fromEasyPool(const string &host) const
1426 if (_easypool.find(host) == _easypool.end())
1428 CURL *ret = _easypool[host];
1429 _easypool.erase(host);
// Store an easy handle for later reuse with 'host'; an existing pooled
// handle for the same host is cleaned up and replaced.
1433 void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
1435 CURL *oldeasy = _easypool[host];
1436 _easypool[host] = easy;
1438 curl_easy_cleanup(oldeasy);
1441 } // namespace media